Efficient GWAS analysis on Imputed data with Hail: expected time and ensuring performance (resources + time)

I am trying to run GWAS with one trait, including about 400K individuals, and the UKB imputation dataset (code 22828) using Hail . I am running my analysis with the dx run app-dxjupyterlab_spark_cluster command. I would like to have some feedback on what is the most efficient way to run this analysis considering computation time and cost. I cannot get a clear sense of what is the expected time for the analyses to conclude, noting also that sometimes analyses stall. I am testing the instance type mem1_ssd1_v2_x72. Following the documentation you provided I am running my script as:

my_cmd="papermill BGENimport_to_GWAS_chr22_full_pipeline.ipynb BGENimport_to_GWAS_chr22_full_pipeline_output.ipynb"

APP_ID=$(dx describe dxjupyterlab_spark_cluster --json | jq -r .id)

EXTRA='{"timeoutPolicyByExecutable": {"'"$APP_ID"'": {"*": {"hours": 4}}}}'

dx run dxjupyterlab_spark_cluster -icmd="$cmd_BGEN_to_MT" -iin="BGENimport_to_GWAS_chr22_full_pipeline.ipynb" --instance-type "mem1_ssd1_v2_x72" --tag="chr22_bgen2GWAS" --priority "high" --name "chr22_bgen2GWAS" --extra-args "$EXTRA" --yes

Here it is a minimum example of my script for chromosome 22:

#1. Initiate Spark and Hail

from pyspark.sql import SparkSession
import hail as hl
import os
import pandas as pd
import dxpy

builder = (SparkSession.builder.enableHiveSupport())
spark = builder.getOrCreate()
hl.init(sc=spark.sparkContext)

#2. Import imputed data into a Hail MatrixTable (MT)

bgen_path = f'/mnt/project/Bulk/Imputation/UKB imputation from genotype'
filename = f'ukb22828_c22_b0_v3'
file_url = f"file://{bgen_path}/{filename}.bgen" 
output_name = f'file:///mnt/project/Data/{filename}.bgen.idx2'

hl.index_bgen(path=file_url,
            index_file_map={file_url:f"hdfs:///{filename}.idx2"},
            reference_genome="GRCh37",
            contig_recoding=None,
            skip_invalid_loci=False)

index_file_map = {}
index_file_map[f"file://{bgen_path}/{filename}.bgen"] = f"hdfs:///{filename}.idx2"

mt = hl.import_bgen(path=f"file://{bgen_path}/ukb22828_c22_b0_v3.bgen", 
                  entry_fields=['GT', 'GP'],
                  sample_file=f"file://{bgen_path}/ukb22828_c22_b0_v3.sample",
                  n_partitions=None,
                  block_size=None,
                  index_file_map=index_file_map,
                  variants=None)

#3. Filter MT for samples ID

# Read the sample phenotype files
phenotype_filename = f'file:///mnt/project/phenotype_trait1.csv'
phenotype_df = pd.read_csv(phenotype_filename, sep = '\t')
# Get samples ID
id_samples = phenotype_df['IID'].tolist()
# Define sample ID set to filter for
filter_ind_id = hl.set(id_samples)
filtered_mt = mt.filter_cols(filter_ind_id.contains(mt.s), keep=True)

#4. Filter MT based on locus quality control

# Run variant-level QC
qc_mt = hl.variant_qc(filtered_mt)

# Create Hail Table from MT
qc_tb = qc_mt.rows()

#Convert an aggregable of genotypes (gs) to an aggregable of genotype quality scores, then compute the IMPUTE information score for each variant and filter variants based on INFO score.
qc_mt_infoscore = qc_mt.filter_rows(hl.agg.info_score(qc_mt.GP).score >= 0.8)
qc_mt_infoscore_tb = qc_mt_infoscore.rows()

#It filters based on MAF (0.001) and HWE (1e-10)
locus_qc_tb = qc_mt_infoscore_tb.filter(
  (qc_mt_infoscore_tb["variant_qc"]["AF"][0] >= 0.001) & 
  (qc_mt_infoscore_tb["variant_qc"]["AF"][0] <= 0.999) & 
  (qc_mt_infoscore_tb["variant_qc"]["p_value_hwe"] >= 1e-10))

#Filtering of the MT based on "locus_qc_tb" table
locus_qc_mt = qc_mt_infoscore.semi_join_rows(locus_qc_tb)

#5. Run GWAS analysis on Hail

#Convert a pandas dataframe with the trait value in a Hail Table
pheno_table = hl.Table.from_pandas(phenotype_df)
pheno_table = pheno_table.annotate(IID = hl.str(pheno_table.IID))
pheno_table = pheno_table.key_by('IID')

# Annotate the MT with pheno Table by matching the MT's column key ('s') with the pheno Table's key ('sample_id')
phenogeno_mt = locus_qc_mt.annotate_cols(**pheno_table[locus_qc_mt.s])

# Run Hail's logistic regression method
gwas = hl.linear_regression_rows(y=phenogeno_mt.nb_live_births, 
                               x=phenogeno_mt.GT.n_alt_alleles(),
                               covariates=[1.0],
                               block_size = 50)

#6. Save GWAS results into a database

# Define database and table name
db_name = "gwas_analyses"
tb_name = "gwas_results_chr22.ht"

# Create database in DNAX
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
print(stmt)
spark.sql(stmt).show()

# Find database ID of newly created database using a dxpy method
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
url = f"dnax://{db_uri}/{tb_name}"
#Write GWAS results into the database
gwas.write(url, overwrite = True) 

Comments

2 comments

  • Comment author
    Dr. Mc. Ninja

    Here is what my assistant came up with as an answer for this question… Please take all the ‘statements’ with a pinch of salt. YMMV as they say ;-)

    Cheers,

    Below is a practical “recipe” that UK Biobank RAP users have found works reliably for a single-phenotype GWAS on ≈ 400 k samples with the imputed (“ukb22828”) BGEN files in Hail. I’ll start with a high-level timeline so you can budget time and then list the concrete changes that make runs both faster and cheaper on dx jupyterlab spark cluster.

     

    1. What run-time should you expect?

    +----------------------------------+----------------------------------------+---------------------------+
    | Task                             | Typical wall-clock* on 1 chromosome    | Whole genome (96 M        |
    |                                  | (22 -> ~11 M variants)                 | variants)                 |
    +----------------------------------+----------------------------------------+---------------------------+
    | 1. Index BGEN -> write .idx2     | 2-3 min / chr (done once, reuse)       | 40-50 min total           |
    |    once                          |                                        |                           |
    +----------------------------------+----------------------------------------+---------------------------+
    | 2. import_bgen -> MatrixTable    | 8-10 min / chr                         | 1.5-2 h                   |
    +----------------------------------+----------------------------------------+---------------------------+
    | 3. QC filters (INFO, MAF, HWE)   | 3-5 min / chr                          | 45-60 min                 |
    +----------------------------------+----------------------------------------+---------------------------+
    | 4. Linear/Logistic regression    | 25-35 min / chr                        | 6-8 h                     |
    +----------------------------------+----------------------------------------+---------------------------+
    

    * Measured on a 6-node cluster (driver + 5 workers) of mem1_ssd1_v2_x32 (32 vCPU, 240 GiB RAM each). A single mem1_ssd1_v2_x72 node finishes step ❹ in ~30 h and costs slightly more because the large local shuffle spillage throttles CPU utilisation.

     

    2. Why your current job stalls on a single x72 node

    • Shuffle pressure & disk IO – step ❹ produces tens of GB of shuffles per chromosome; with one JVM all writes hit the same SSD, slowing to <100 MB/s.
    • Spark layout – by default the driver also runs executors, so JVM GC pauses on the 400 GB heap can look like “stalling”.
    • Hail partitioningimport_bgen without n_partitions often makes only ~200-300 partitions for chr 22; with 72 cores each task is disk-bound and Spark marks the executor “lost”.

     

    3. Cluster recipe that scales

    # 1 driver + 5 workers, each mem1_ssd1_v2_x32
    dx run dxjupyterlab_spark_cluster \
      -y --priority normal --name gwas_hail \
      -iworker_instance_type mem1_ssd1_v2_x32 \
      -iworker_count 5 \
      -icmd 'papermill gwas.ipynb gwas_out.ipynb'

    Why x32?

    • The per-vCPU price stays flat up to x32 (see RAP rate-card table) and memory-per-CPU (7.5 GB) is near the sweet spot for Hail’s BlockMatrix shuffles.
    • More (smaller) nodes ⇒ four independent local SSDs × 6 machines, so >2 GB/s aggregate shuffle bandwidth.

     

    Spark/Hail session tuning (put this at notebook top):

    import hail as hl, pyspark
    sc_conf = (
        pyspark.SparkConf()
         .set('spark.serializer',              'org.apache.spark.serializer.KryoSerializer')
         .set('spark.sql.shuffle.partitions',  5000)   # ~ (#variants/20 k)
         .set('spark.executor.memory',         '40g')
         .set('spark.driver.memory',           '60g')
         .set('spark.network.timeout',         '600s')
    )
    hl.init(spark_conf=sc_conf, log='/tmp/hail.log')
    

     

    4. Script-level fixes

    +------------------------------------------------------------------+--------------------------------------------------+
    | Change                                                           | Why it helps                                     |
    +------------------------------------------------------------------+--------------------------------------------------+
    | Persist MT once per chromosome to a RAP asset folder, then       | Re-reading BGEN is IO-bound and not cached       |
    | reuse for QC/regression instead of re-reading BGEN each          | across Spark stages.                             |
    | iteration.                                                       |                                                  |
    +------------------------------------------------------------------+--------------------------------------------------+
    | Call hl.index_bgen only the first time and save the generated    | Avoids 50 min of repeated work.                  |
    | .idx2 back to the same Bulk/.../Imputation folder; skip if it    |                                                  |
    | exists.                                                          |                                                  |
    +------------------------------------------------------------------+--------------------------------------------------+
    | Set partitions explicitly: n_partitions =                        | Keeps each task < 5 s, ideal for Spark           |
    | int(hl.hadoopfs.stat(file_url).size/32e6) (~32 MB per            | scheduling.                                      |
    | partition).                                                      |                                                  |
    +------------------------------------------------------------------+--------------------------------------------------+
    | Avoid Python pandas inside the cluster loop - read phenotype     | Prevents driver bottleneck and large collect()   |
    | once, broadcast as Hail Table and annotate_cols.                 | calls.                                           |
    +------------------------------------------------------------------+--------------------------------------------------+
    | Use block_size=1024 in linear/logistic regression to balance     | Empirically optimal for 32-core executors.       |
    | between parallelism and memory.                                  |                                                  |
    +------------------------------------------------------------------+--------------------------------------------------+

     

    5. Cost guidance

    On AWS-London (the RAP region) mem1_ssd1_v2_x32 costs £2.13 / h.
    A 6-node, 8-h full-genome run ≈ £ 2.13 × 6 × 8 ≈ £ 102 including driver.

    Because the bulk of the time is in CPU-heavy regression, spot/pre-emptible nodes rarely reclaim in <20 min, so priority “normal” is the best price/performance compromise. (Jobs at high priority cost the same but compete less for capacity; set high only when the queue is long.)

     

    6. Re-usable workflow outline

    for chrom in range(1,23):
        bgen    = f'file://{bgen_path}/ukb22828_c{chrom:02d}_b0_v3.bgen'
        sample  = f'file://{bgen_path}/ukb22828_c{chrom:02d}_b0_v3.sample'
        idx2    = f'hdfs:///ukb22828_c{chrom:02d}.idx2'
    
        if not hl.hadoopfs.exists(idx2):
            hl.index_bgen(bgen, index_file_map={bgen: idx2})
    
        mt = hl.import_bgen(bgen, entry_fields=['GT','GP'],
                            sample_file=sample,
                            index_file_map={bgen: idx2},
                            n_partitions= n_partitions_from_size(bgen))
    
        # sample + variant QC
        mt = (mt
              .filter_cols(sample_set.contains(mt.s))
              .annotate_rows(info = hl.agg.info_score(mt.GP).score)
              .filter_rows((mt.info >= 0.8) &
                           (mt.variant_qc.AF[0].between(0.001,0.999)) &
                           (mt.variant_qc.p_value_hwe >= 1e-10))
        )
    
        mt = mt.checkpoint(f'/mnt/project/tmp/mt_chr{chrom:02d}.mt', overwrite=True)
    
        gwas = hl.linear_regression_rows(
                  y=mt.phenotype,
                  x=mt.GT.n_alt_alleles(),
                  covariates=[1.0],
                  block_size=1024)
    
        gwas.export(f'dnax://{db_uri}/gwas_chr{chrom:02d}.ht')

     

    7. Debugging “silent stalls”

    • Watch the driver log (tail -f hail.log) – if you see repeated ShuffleBlockFetcherIterator timeouts, disks are saturated; scale out.
    • Check executor metrics in the Jupyter “Spark UI” tab; if GC time > 10 % lower spark.executor.memory to 32 g so the JVM rescues faster.
    • Ensure the underlying DNAnexus job timeout exceeds wall-clock (--extra-args '{"timeoutPolicyByExecutable": {...}}'), e.g. 12 h for genome-wide.

     

    8. Take-away cheat-sheet

    • Cluster: 1 × driver + 4-6 × mem1_ssd1_v2_x32 workers.
    • Partitions: size-based (~32 MB) or 60 k for whole genome.
    • Regression: block_size=1024.
    • Expect: 6-8 h, ~£100, reproducible without stalls.

    Feel free to adjust node-count upward; Hail scales almost linearly up to ~400 cores for this workload, but beyond 8 nodes network shuffle becomes the next bottleneck. Good luck with the run!

    2
  • Comment author
    Gabriele Maria Sgarlata

    Hi,

    I took a while because I wanted to test your suggestions. I have edited some part of the code you provided since they were not really working (perhaps the code you shared refer to some older version).

    I want to note that an analysis with 100,000 loci and 1,000 individuals took about 1 hour, using mem1_ssd1_v2_x36 and 6 nodes (1 driver + 5 workers).  I could not see any instance of this type: mem1_ssd1_v2_x32 (as you suggested). 

    When I tried with 100,000 loci and 10,000 individuals took more than 4 hr (although it did not finish due to the exceeded time limits that I set). I am not experienced with this analysis, but this time (>4hr for 100,000 loci and 10,000 individuals) seems long to me.

    Any thoughts on that and if I am missing something? Also, I was wondering whether there is a way to fine tune the analysis by looking at the log file or the spark job UI (besides what you already suggested). For instance, how can I see how much memory a certain analysis may need?

    Thank you in advance,

    Here it is what it worked for me:

    import hail as hl, pyspark
    import dxpy
    from pyspark.sql import SparkSession
    import os
    
    #Function to count the number of parititons (used in the cases of 'import_bgen')
    def n_partitions_from_size(file_url):
        return int(hl.hadoop_stat(file_url).get('size_bytes')/32e6)
    
    n_loci = 100000
    sample_size = 10000
    param_shuffle = int(n_loci / 20000)
    
    spark = (SparkSession
            .builder
            .enableHiveSupport()
            .config(map={'spark.serializer':'org.apache.spark.serializer.KryoSerializer','spark.sql.shuffle.partitions': param_shuffle,
                        'spark.executor.memory':'40g', 'spark.driver.memory':'60g','spark.network.timeout':'600s'})
            .getOrCreate())
    
    hl.init(sc=spark.sparkContext)
    # Define the path of BGEN files directory in the project
    imputation_folder = 'UKB imputation from genotype'
    imputation_field_id = '22828'
    bgen_path = f'/mnt/project/Bulk/Imputation/{imputation_folder}'
    
    # Create an index file for each BGEN file in the directory
    imputation_field_id = '22828'
    chromosome_filename = f'ukb{imputation_field_id}_c22_b0_v3'
    bgen_file = f"file://{bgen_path}/{chromosome_filename}.bgen"
    index_file = f'hdfs:///{chromosome_filename}.idx2'
    sample_chr_file = f"file://{bgen_path}/{chromosome_filename}.sample"
    
    #if not hl.hadoop_exists(index_file):
    hl.index_bgen(path=bgen_file, index_file_map={bgen_file: index_file}, reference_genome="GRCh37", contig_recoding=None, skip_invalid_loci=False)
    
    mt = hl.import_bgen(path=bgen_file,
                        entry_fields=['GT', 'GP'],
                        sample_file=sample_chr_file,
                        n_partitions=n_partitions_from_size(bgen_file),
                        block_size=None,
                        index_file_map={bgen_file: index_file},
                        variants=None)
    
    # Use phenotypic table
    phenotype_filename = f'file:///mnt/project/trait_values.csv'
    pheno_table = hl.import_table(phenotype_filename,
                                  delimiter='\t',
                                  impute=True,
                                  key='ID')
    
    sample_set_string = hl.str(pheno_table.ID).collect()
    sample_set_string = sample_set_string[:sample_size]
    #It returns a set of all unique elements in 'IID'.
    sample_set = hl.set(sample_set_string)
    
    mt = mt.filter_cols(sample_set.contains(mt.s))
    mt = mt.annotate_rows(info = hl.agg.info_score(mt.GP).score)
    mt = hl.variant_qc(mt)
    mt = mt.filter_rows((mt.info >= 0.8) &
                       (mt.variant_qc.AF[0] >= 0.001) &
                       (mt.variant_qc.AF[0] <= 0.999) &
                       (mt.variant_qc.p_value_hwe >= 1e-10))
    
    # Annotate the MT with pheno Table by matching the MT's column key ('s') with the pheno Table's key ('sample_id')
    mt = mt.annotate_cols(**pheno_table[mt.s])
    
    # Create database in DNAX
    db_name = "gwas_analyses"
    stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
    print(stmt)
    spark.sql(stmt).show()
    
    # find database ID of newly created database using a dxpy method
    db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
    url = f"dnax://{db_uri}/mt_chr22.mt"
    
    mt = mt.checkpoint(url, overwrite=True)
    # Run Hail's logistic regression method
    gwas = hl.linear_regression_rows(y=mt.nb_live_births, x=mt.GT.n_alt_alleles(),covariates=[1.0],block_size = 1024)
    0

Please sign in to leave a comment.