Python Pipeline Example

This example shows how to run the SOL pipeline directly in Python, using functions imported from the GRiD CLI.

This is useful for local debugging, Jupyter notebooks, or workflow integration.

“SOL.py - Python-based pipeline”
  1#!/usr/bin/env python3
  2"""
  3SOL Pipeline (Python version)
  4=============================
  5
  6Usage:
  7    python sol_pipeline.py        # runs steps 1–9
  8    python sol_pipeline.py 5      # runs only step 5
  9    python sol_pipeline.py 3-5    # runs steps 3–5
 10
 11This script mirrors the functionality of the original SLURM script,
 12but calls GRiD functions directly from Python.
 13"""
 14
 15import sys
 16import re
 17from pathlib import Path
 18from rich.console import Console
 19from grid.cli import (
 20    crai,
 21    count_reads,
 22    mosdepth,
 23    normalize_mosdepth,
 24    find_neighbors,
 25    extract_reference,
 26    lpa_realign,
 27    compute_dipcn,
 28    estimate_kiv,
 29)
 30
 31console = Console()
 32
 33# ============================================================
 34# Step Range Parsing
 35# ============================================================
 36def parse_range(arg: str):
 37    """Parse step range input (e.g., '3-5', '5', or '')."""
 38    if not arg:
 39        return 1, 9
 40    match = re.match(r"^(\d+)-(\d+)$", arg)
 41    if match:
 42        return int(match.group(1)), int(match.group(2))
 43    elif re.match(r"^\d+$", arg):
 44        step = int(arg)
 45        return step, step
 46    else:
 47        console.print(f"[red]❌ Invalid range argument: {arg}[/red]")
 48        sys.exit(1)
 49
 50
 51# ============================================================
 52# Environment Setup
 53# ============================================================
 54WORK = Path("/work/users/c/a/catererz/grid")
 55GRID = Path("/nas/longleaf/home/catererz/epi/GRiD")
 56SOL = Path("/proj/epi/Genetic_Data_Center/SOL")
 57CRAM = SOL / "cram"
 58FILES = GRID / "files"
 59
 60WORK.mkdir(parents=True, exist_ok=True)
 61
 62REGIONS_FILE = FILES / "734_possible_coding_vntr_regions.IBD2R_gt_0.25.uniq.txt"
 63
 64# Parse LPA region
 65with REGIONS_FILE.open() as f:
 66    for line in f:
 67        cols = line.split()
 68        if len(cols) >= 7 and cols[6] == "LPA":
 69            CHR, START, END = cols[0], int(cols[1]), int(cols[2])
 70            break
 71
 72
 73# ============================================================
 74# Helper: should we run this step?
 75# ============================================================
 76def should_run(step, start, end):
 77    return start <= step <= end
 78
 79
 80# ============================================================
 81# Main pipeline
 82# ============================================================
 83def main():
 84    arg = sys.argv[1] if len(sys.argv) > 1 else ""
 85    start_step, end_step = parse_range(arg)
 86    console.rule(f"Running SOL Pipeline Steps {start_step}{end_step}")
 87
 88    # STEP 1 — Ensure CRAI Indexes
 89    if should_run(1, start_step, end_step):
 90        console.rule("[bold cyan]Step 1: Ensuring CRAM Indexes[/bold cyan]")
 91        for cram_file in CRAM.glob("*.cram"):
 92            crai.callback(cram=cram_file, reference=str(FILES / "hg38.fa"))
 93
 94    # STEP 2 — Count Reads
 95    if should_run(2, start_step, end_step):
 96        console.rule("[bold cyan]Step 2: Counting Reads[/bold cyan]")
 97        count_reads.callback(
 98            cram_dir=str(CRAM),
 99            output_file="count-reads.tsv",
100            ref_fasta=str(FILES / "hg38.fa"),
101            chrom=f"chr{CHR}",
102            start=START,
103            end=END,
104            config=str(GRID / "grid/config.yaml"),
105            threads=4,
106        )
107
108    # STEP 3 — Mosdepth
109    if should_run(3, start_step, end_step):
110        console.rule("[bold cyan]Step 3: Running Mosdepth[/bold cyan]")
111        mosdepth.callback(
112            cram_dir=str(CRAM),
113            output_file="mosdepth.tsv",
114            ref_fasta=str(FILES / "hg38.fa"),
115            chrom=f"chr{CHR}",
116            start=START,
117            end=END,
118            region_name="LPA",
119            work_dir=str(WORK / "mosdepth"),
120            by=1000,
121            fast=True,
122            threads=4,
123        )
124
125    # STEP 4 — Normalize Mosdepth
126    if should_run(4, start_step, end_step):
127        console.rule("[bold cyan]Step 4: Normalizing Mosdepth[/bold cyan]")
128        normalize_mosdepth.callback(
129            mosdepth_dir=str(WORK / "mosdepth"),
130            output_file="normalized_mosdepth.txt.gz",
131            repeat_mask=str(FILES / "repeat_mask_list.hg38.ucsc_bed"),
132            chrom=f"chr{CHR}",
133            start=START,
134            end=END,
135            min_depth=20,
136            max_depth=100,
137            top_frac=0.1,
138            threads=4,
139        )
140
141    # STEP 5 — Find Neighbors
142    if should_run(5, start_step, end_step):
143        console.rule("[bold cyan]Step 5: Finding Neighbors[/bold cyan]")
144        find_neighbors.callback(
145            input_file="normalized_mosdepth.txt.gz",
146            output_file="neighbors.txt",
147            zmax=2.0,
148            n_neighbors=500,
149            sigma2_max=1000.0,
150        )
151
152    # STEP 6 — Extract Reference
153    if should_run(6, start_step, end_step):
154        console.rule("[bold cyan]Step 6: Extracting LPA Reference[/bold cyan]")
155        extract_reference.callback(
156            reference_fa=str(FILES / "hg38.fa"),
157            bed_file=str(FILES / "ref_hg38.bed"),
158            output_dir=str(WORK),
159            output_prefix="lpa_reference",
160        )
161
162    # STEP 7 — Realign Reads
163    if should_run(7, start_step, end_step):
164        console.rule("[bold cyan]Step 7: Realigning Reads to LPA[/bold cyan]")
165        lpa_realign.callback(
166            cram_dir=str(CRAM),
167            output_file="lpa_realigned.tsv",
168            ref_fasta=str(FILES / "hg38.fa"),
169            lpa_ref_fasta="lpa_reference.fasta",
170            positions_file=str(FILES / "hardcoded_positions.txt"),
171            genome_build="hg38",
172            chrom=f"chr{CHR}",
173            start=START,
174            end=END,
175            threads=4,
176        )
177
178    # STEP 8 — Compute DIPCN
179    if should_run(8, start_step, end_step):
180        console.rule("[bold cyan]Step 8: Computing LPA DIPCN[/bold cyan]")
181        compute_dipcn.callback(
182            count_file="lpa_realigned.tsv",
183            neighbor_file="neighbors.txt",
184            output_prefix="lpa_dipcn",
185            n_neighbors=500,
186        )
187
188    # STEP 9 — Compute KIV2 CN
189    if should_run(9, start_step, end_step):
190        console.rule("[bold cyan]Step 9: Computing LPA KIV2 CN[/bold cyan]")
191        estimate_kiv.callback(
192            exon1a="lpa_dipcn.exon1A.dipCN.txt",
193            exon1b="lpa_dipcn.exon1B.dipCN.txt",
194            output="lpa_kiv2_cn.txt",
195            format="txt",
196        )
197
198    console.rule(f"[bold green]Pipeline complete for steps {start_step}{end_step}[/bold green]")
199
200
201if __name__ == "__main__":
202    main()

Usage

python SOL.py        # run all steps (1–9)
python SOL.py 5      # run only step 5
python SOL.py 3-7    # run steps 3 through 7