Efficient GWAS analysis on Imputed data with Hail: expected time and ensuring performance (resources + time)
I am trying to run GWAS with one trait, including about 400K individuals, and the UKB imputation dataset (code 22828) using Hail . I am running my analysis with the dx run app-dxjupyterlab_spark_cluster command. I would like to have some feedback on what is the most efficient way to run this analysis considering computation time and cost. I cannot get a clear sense of what is the expected time for the analyses to conclude, noting also that sometimes analyses stall. I am testing the instance type mem1_ssd1_v2_x72. Following the documentation you provided I am running my script as:
my_cmd="papermill BGENimport_to_GWAS_chr22_full_pipeline.ipynb BGENimport_to_GWAS_chr22_full_pipeline_output.ipynb"
APP_ID=$(dx describe dxjupyterlab_spark_cluster --json | jq -r .id)
EXTRA='{"timeoutPolicyByExecutable": {"'"$APP_ID"'": {"*": {"hours": 4}}}}'
dx run dxjupyterlab_spark_cluster -icmd="$cmd_BGEN_to_MT" -iin="BGENimport_to_GWAS_chr22_full_pipeline.ipynb" --instance-type "mem1_ssd1_v2_x72" --tag="chr22_bgen2GWAS" --priority "high" --name "chr22_bgen2GWAS" --extra-args "$EXTRA" --yesHere it is a minimum example of my script for chromosome 22:
#1. Initiate Spark and Hail
from pyspark.sql import SparkSession
import hail as hl
import os
import pandas as pd
import dxpy
builder = (SparkSession.builder.enableHiveSupport())
spark = builder.getOrCreate()
hl.init(sc=spark.sparkContext)#2. Import imputed data into a Hail MatrixTable (MT)
bgen_path = f'/mnt/project/Bulk/Imputation/UKB imputation from genotype'
filename = f'ukb22828_c22_b0_v3'
file_url = f"file://{bgen_path}/{filename}.bgen"
output_name = f'file:///mnt/project/Data/{filename}.bgen.idx2'
hl.index_bgen(path=file_url,
index_file_map={file_url:f"hdfs:///{filename}.idx2"},
reference_genome="GRCh37",
contig_recoding=None,
skip_invalid_loci=False)
index_file_map = {}
index_file_map[f"file://{bgen_path}/{filename}.bgen"] = f"hdfs:///{filename}.idx2"
mt = hl.import_bgen(path=f"file://{bgen_path}/ukb22828_c22_b0_v3.bgen",
entry_fields=['GT', 'GP'],
sample_file=f"file://{bgen_path}/ukb22828_c22_b0_v3.sample",
n_partitions=None,
block_size=None,
index_file_map=index_file_map,
variants=None)#3. Filter MT for samples ID
# Read the sample phenotype files
phenotype_filename = f'file:///mnt/project/phenotype_trait1.csv'
phenotype_df = pd.read_csv(phenotype_filename, sep = '\t')
# Get samples ID
id_samples = phenotype_df['IID'].tolist()
# Define sample ID set to filter for
filter_ind_id = hl.set(id_samples)
filtered_mt = mt.filter_cols(filter_ind_id.contains(mt.s), keep=True)#4. Filter MT based on locus quality control
# Run variant-level QC
qc_mt = hl.variant_qc(filtered_mt)
# Create Hail Table from MT
qc_tb = qc_mt.rows()
#Convert an aggregable of genotypes (gs) to an aggregable of genotype quality scores, then compute the IMPUTE information score for each variant and filter variants based on INFO score.
qc_mt_infoscore = qc_mt.filter_rows(hl.agg.info_score(qc_mt.GP).score >= 0.8)
qc_mt_infoscore_tb = qc_mt_infoscore.rows()
#It filters based on MAF (0.001) and HWE (1e-10)
locus_qc_tb = qc_mt_infoscore_tb.filter(
(qc_mt_infoscore_tb["variant_qc"]["AF"][0] >= 0.001) &
(qc_mt_infoscore_tb["variant_qc"]["AF"][0] <= 0.999) &
(qc_mt_infoscore_tb["variant_qc"]["p_value_hwe"] >= 1e-10))
#Filtering of the MT based on "locus_qc_tb" table
locus_qc_mt = qc_mt_infoscore.semi_join_rows(locus_qc_tb)#5. Run GWAS analysis on Hail
#Convert a pandas dataframe with the trait value in a Hail Table
pheno_table = hl.Table.from_pandas(phenotype_df)
pheno_table = pheno_table.annotate(IID = hl.str(pheno_table.IID))
pheno_table = pheno_table.key_by('IID')
# Annotate the MT with pheno Table by matching the MT's column key ('s') with the pheno Table's key ('sample_id')
phenogeno_mt = locus_qc_mt.annotate_cols(**pheno_table[locus_qc_mt.s])
# Run Hail's logistic regression method
gwas = hl.linear_regression_rows(y=phenogeno_mt.nb_live_births,
x=phenogeno_mt.GT.n_alt_alleles(),
covariates=[1.0],
block_size = 50)#6. Save GWAS results into a database
# Define database and table name
db_name = "gwas_analyses"
tb_name = "gwas_results_chr22.ht"
# Create database in DNAX
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
print(stmt)
spark.sql(stmt).show()
# Find database ID of newly created database using a dxpy method
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
url = f"dnax://{db_uri}/{tb_name}"
#Write GWAS results into the database
gwas.write(url, overwrite = True)
Comments
2 comments
Here is what my assistant came up with as an answer for this question… Please take all the ‘statements’ with a pinch of salt. YMMV as they say ;-)
Cheers,
Below is a practical “recipe” that UK Biobank RAP users have found works reliably for a single-phenotype GWAS on ≈ 400 k samples with the imputed (“ukb22828”) BGEN files in Hail. I’ll start with a high-level timeline so you can budget time and then list the concrete changes that make runs both faster and cheaper on dx jupyterlab spark cluster.
1. What run-time should you expect?
* Measured on a 6-node cluster (driver + 5 workers) of
mem1_ssd1_v2_x32(32 vCPU, 240 GiB RAM each). A singlemem1_ssd1_v2_x72node finishes step ❹ in ~30 h and costs slightly more because the large local shuffle spillage throttles CPU utilisation.2. Why your current job stalls on a single x72 node
import_bgenwithoutn_partitionsoften makes only ~200-300 partitions for chr 22; with 72 cores each task is disk-bound and Spark marks the executor “lost”.3. Cluster recipe that scales
Why x32?
BlockMatrixshuffles.Spark/Hail session tuning (put this at notebook top):
4. Script-level fixes
5. Cost guidance
On AWS-London (the RAP region)
mem1_ssd1_v2_x32costs £2.13 / h.A 6-node, 8-h full-genome run ≈ £ 2.13 × 6 × 8 ≈ £ 102 including driver.
Because the bulk of the time is in CPU-heavy regression, spot/pre-emptible nodes rarely reclaim in <20 min, so priority “normal” is the best price/performance compromise. (Jobs at high priority cost the same but compete less for capacity; set high only when the queue is long.)
6. Re-usable workflow outline
7. Debugging “silent stalls”
tail -f hail.log) – if you see repeatedShuffleBlockFetcherIteratortimeouts, disks are saturated; scale out.spark.executor.memoryto 32 g so the JVM rescues faster.--extra-args '{"timeoutPolicyByExecutable": {...}}'), e.g. 12 h for genome-wide.8. Take-away cheat-sheet
mem1_ssd1_v2_x32workers.60 kfor whole genome.block_size=1024.Feel free to adjust node-count upward; Hail scales almost linearly up to ~400 cores for this workload, but beyond 8 nodes network shuffle becomes the next bottleneck. Good luck with the run!
Hi,
I took a while because I wanted to test your suggestions. I have edited some part of the code you provided since they were not really working (perhaps the code you shared refer to some older version).
I want to note that an analysis with 100,000 loci and 1,000 individuals took about 1 hour, using mem1_ssd1_v2_x36 and 6 nodes (1 driver + 5 workers). I could not see any instance of this type: mem1_ssd1_v2_x32 (as you suggested).
When I tried with 100,000 loci and 10,000 individuals took more than 4 hr (although it did not finish due to the exceeded time limits that I set). I am not experienced with this analysis, but this time (>4hr for 100,000 loci and 10,000 individuals) seems long to me.
Any thoughts on that and if I am missing something? Also, I was wondering whether there is a way to fine tune the analysis by looking at the log file or the spark job UI (besides what you already suggested). For instance, how can I see how much memory a certain analysis may need?
Thank you in advance,
Here it is what it worked for me:
Please sign in to leave a comment.