import pandas as pd
from DOLPHIN.preprocess import gtfpy
import numpy as np
import math
from tqdm import tqdm
from intervaltree import Interval, IntervalTree
import glob, os
pd.set_option('display.max_columns',500)
pd.set_option('display.max_rows',100)
[docs]def prepare_exon_gtf(input_gtf_path, output_dir="./"):
"""
Load an Ensembl GTF file and extract exon-level annotations with unique start/end per gene.
Parameters
----------
input_gtf_path : str
Path to the original Ensembl .gtf file.
output_dir : str, optional
Directory to save intermediate results (default: './dolphin_exon_gtf/').
Returns
-------
df_exon_nodup : pandas.DataFrame
Filtered exon annotation table with duplicates (same gene_id, start, end) removed.
"""
# 1. Ensure output directory exists
os.makedirs(os.path.join(output_dir, "dolphin_exon_gtf"), exist_ok=True)
print(f"[Step] Reading GTF file from: {input_gtf_path}")
# 2. Load GTF and parse
df_gtf = gtfpy.readGTF(input_gtf_path)
GTFpa = gtfpy.parseGTF(df_gtf)
print(f"[Status] GTF loaded and parsed with {GTFpa.shape[0]} total entries.")
# 3. Filter exon features and keep necessary columns
# df_exon = GTFpa[GTFpa["feature"] == "exon"][[
# 'seqname', 'source', 'feature', 'start', 'end', 'score',
# 'strand', 'frame', 'gene_id', 'gene_version', 'gene_name',
# 'gene_source', 'gene_biotype', 'exon_number'
# ]]
required_cols = [
'seqname', 'source', 'feature', 'start', 'end', 'score',
'strand', 'frame', 'gene_id', 'gene_name', 'exon_number'
]
optional_cols = ['gene_version', 'gene_source', 'gene_biotype', 'gene_type']
missing_required = [col for col in required_cols if col not in GTFpa.columns]
if missing_required:
raise ValueError(
f"GTFpa is missing required columns: {missing_required}. "
"Please check your GTF or preprocessing step."
)
final_cols = required_cols + [col for col in optional_cols if col in GTFpa.columns]
df_exon = GTFpa[GTFpa["feature"] == "exon"][final_cols]
# 4. Sort and convert coordinates
df_exon[["start", "end"]] = df_exon[["start", "end"]].apply(pd.to_numeric)
df_exon = df_exon.sort_values(by=["seqname", "start", "end"], ascending=[True, True, True])
# 5. Remove duplicate exons (same gene, start, end)
df_exon_nodup = df_exon.drop_duplicates(subset=["gene_id", "start", "end"], keep="first")
print(f"[Status] Removed duplicates: {df_exon_nodup.shape[0]} unique exon entries remain.")
return df_exon_nodup
# def exon_uniq(df_exon_nodup, gene):
# # Subset exons for the given gene
# _df_exon = df_exon_nodup.loc[df_exon_nodup["gene_id"] == gene].reset_index(drop=True).copy()
# # Create interval tree from exon start/end
# # tree = IntervalTree(Interval(row["start"], row["end"]) for _, row in _df_exon.iterrows())
# tree = IntervalTree(
# Interval(row["start"], row["end"])
# for _, row in _df_exon.iterrows()
# if row["start"] < row["end"]
# )
# # Merge overlapping exons
# tree.merge_overlaps()
# # Get sorted list of unique intervals
# merged_intervals = sorted([(iv.begin, iv.end) for iv in tree])
# # Assign merged coordinates to exons
# _df_exon["_start"] = math.nan
# _df_exon["_end"] = math.nan
# for k in range(_df_exon.shape[0]):
# for start, end in merged_intervals:
# if start <= _df_exon.at[k, "start"] and end >= _df_exon.at[k, "end"]:
# _df_exon.at[k, "_start"] = start
# _df_exon.at[k, "_end"] = end
# break # stop at first match
# # Warn if any exon did not get matched
# for i in range(_df_exon.shape[0]):
# if math.isnan(_df_exon.at[i, '_start']):
# print(f"Attention: {gene}, start={_df_exon.at[i,'start']}, end={_df_exon.at[i,'end']} was not assigned.")
# # Keep only unique exons based on merged coordinates
# _df_exon_out = _df_exon.drop_duplicates(subset=["_start", "_end"], keep="first").copy()
# _df_exon_out["start"] = _df_exon_out["_start"].astype(int)
# _df_exon_out["end"] = _df_exon_out["_end"].astype(int)
# _df_exon_out.drop(columns=["_start", "_end"], inplace=True)
# # Re-index exon numbers
# _df_exon_out = _df_exon_out.reset_index(drop=True)
# _df_exon_out["exon_number"] = _df_exon_out.index + 1
# return _df_exon_out
[docs]def exon_uniq(df_exon_nodup: pd.DataFrame, gene: str) -> pd.DataFrame:
"""
Merge overlapping exons for a single gene using interval trees.
Parameters
----------
df_exon_nodup : pandas.DataFrame
DataFrame containing all exons (from `prepare_exon_gtf`), including gene IDs and coordinates.
gene : str
The gene ID whose exons will be processed.
Returns
-------
pandas.DataFrame
A cleaned exon DataFrame for the given gene, where overlapping exons are merged,
exon coordinates are updated, and exon numbers are reindexed. Exons that are
invalid or cannot be matched to any merged region are excluded.
"""
# Subset exons for the given gene
_df_exon = df_exon_nodup.loc[df_exon_nodup["gene_id"] == gene].reset_index(drop=True).copy()
# Return empty DataFrame if the gene has no exons
if _df_exon.empty:
print(f"[Warning] Gene {gene} has no exon entries.")
return pd.DataFrame(columns=df_exon_nodup.columns)
# Filter out invalid exons where start >= end
_df_exon = _df_exon[_df_exon["start"] < _df_exon["end"]].reset_index(drop=True)
if _df_exon.empty:
print(f"[Warning] Gene {gene} has only invalid exon coordinates (start >= end).")
return pd.DataFrame(columns=df_exon_nodup.columns)
# Create IntervalTree from valid exon intervals
try:
valid_intervals = [
Interval(row["start"], row["end"])
for _, row in _df_exon.iterrows()
if row["start"] < row["end"]
]
tree = IntervalTree(valid_intervals)
tree.merge_overlaps()
except Exception as e:
print(f"[Error] IntervalTree failed for gene {gene}: {e}")
return pd.DataFrame(columns=df_exon_nodup.columns)
# Extract merged, sorted non-overlapping intervals
merged_intervals = sorted([(iv.begin, iv.end) for iv in tree])
# Initialize columns for matched merged coordinates
_df_exon["_start"] = math.nan
_df_exon["_end"] = math.nan
# Assign each exon to the first merged interval that fully contains it
for k in range(_df_exon.shape[0]):
for start, end in merged_intervals:
if start <= _df_exon.at[k, "start"] and end >= _df_exon.at[k, "end"]:
_df_exon.at[k, "_start"] = start
_df_exon.at[k, "_end"] = end
break # stop after the first match
# Warn about unmatched exons
unmatched = _df_exon[_df_exon["_start"].isna()]
for _, row in unmatched.iterrows():
print(f"[Warning] Gene {gene}: exon (start={row['start']}, end={row['end']}) was not assigned to any merged region.")
# Drop exons that failed to match any merged interval
_df_exon_out = _df_exon.dropna(subset=["_start", "_end"]).copy()
# If no exons remain after matching, return empty
if _df_exon_out.empty:
print(f"[Warning] Gene {gene} has no exons remaining after merging.")
return pd.DataFrame(columns=df_exon_nodup.columns)
# Remove duplicate merged intervals, keeping only one exon per interval
_df_exon_out = _df_exon_out.drop_duplicates(subset=["_start", "_end"], keep="first").copy()
# Convert merged coordinates to integer and replace original coordinates
_df_exon_out["start"] = _df_exon_out["_start"].astype(int)
_df_exon_out["end"] = _df_exon_out["_end"].astype(int)
_df_exon_out.drop(columns=["_start", "_end"], inplace=True)
# Re-index exon numbers
_df_exon_out = _df_exon_out.reset_index(drop=True)
_df_exon_out["exon_number"] = _df_exon_out.index + 1
return _df_exon_out
[docs]def save_by_batch(df_exon_nodup, save_num=10000, output_dir="./"):
"""
Process exon annotations for each gene in batches and save results as serialized .pkl files.
This function applies `exon_uniq()` to each gene in the input DataFrame and saves the processed
exon data in batches. Each batch contains up to `save_num` genes and is written to a pickle file.
A log file is generated to record processing status and potential errors.
Parameters
----------
df_exon_nodup : pandas.DataFrame
DataFrame containing filtered exon annotations (typically from `prepare_exon_gtf`).
save_num : int, optional
Number of genes to include per output batch file (default is 10,000).
output_dir : str, optional
Path to the output directory where batch `.pkl` files and the log file will be stored
(default is "./").
Returns
-------
None
This function writes intermediate results to disk but does not return any object.
"""
print(f"[Step] Start processing and saving exons by batch...")
# 1. Create temp folder
temp_dir = os.path.join(output_dir, "dolphin_exon_gtf", "temp")
os.makedirs(temp_dir, exist_ok=True)
# delete all .pkl 文件
for f in glob.glob(os.path.join(temp_dir, "df_exon_gtf_*.pkl")):
os.remove(f)
# delete all log 文件
if os.path.exists(os.path.join(temp_dir,"process_log.txt")):
os.remove("temp/process_log.txt")
# 2. Gene list
gene_list = df_exon_nodup["gene_id"].unique().tolist()
total_genes = len(gene_list)
# 3. Initialize list for all batch DataFrames (optional, for further processing)
list_of_df = []
log_path = os.path.join(temp_dir, "process_log.txt")
# 4. Track current batch number
current_batch = 0
df_out = pd.DataFrame()
# 5. Process all genes with a unified tqdm progress bar
for i, gene in enumerate(tqdm(gene_list, total=total_genes, desc="Processing all genes")):
try:
_temp = exon_uniq(df_exon_nodup, gene)
df_out = pd.concat([df_out, _temp], ignore_index=True)
with open(log_path, "a") as f:
f.write(f"Batch {current_batch}, Processed gene: {gene}\n")
except Exception as e:
with open(log_path, "a") as f:
f.write(f"Batch {current_batch}, Error processing gene {gene}: {e}\n")
continue # skip gene with error
# Save this batch every save_num genes or at the end
if (i + 1) % save_num == 0 or (i + 1 == total_genes):
output_path = os.path.join(temp_dir, f"df_exon_gtf_{current_batch}.pkl")
df_out.to_pickle(output_path)
list_of_df.append(df_out)
df_out = pd.DataFrame() # reset for next batch
current_batch += 1
print(f"[Done] Finished saving all exon batches.")
[docs]def combine_saved_batches(folder="./", prefix="df_exon_gtf_"):
"""
Combine multiple saved exon batch files into a single concatenated DataFrame.
This function reads all `.pkl` files in the specified folder that start with the given prefix,
concatenates them in order, and returns a single DataFrame containing all exon records.
Parameters
----------
folder : str, optional
Directory where batch `.pkl` files are stored.
Default is "./", which typically points to the parent of "dolphin_exon_gtf/temp".
prefix : str, optional
Filename prefix used to identify batch `.pkl` files.
Default is ``df_exon_gtf_``.
Returns
-------
pandas.DataFrame
A single DataFrame containing concatenated exon entries from all batch files.
The rows are ordered according to batch and file sorting.
"""
temp_dir = os.path.join(folder, "dolphin_exon_gtf", "temp")
# List all matching .pkl files in the folder
pkl_files = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir)
if f.startswith(prefix) and f.endswith(".pkl")]
# Sort to ensure batch order is respected
pkl_files.sort()
if not pkl_files:
raise FileNotFoundError(f"No .pkl files found in '{temp_dir}' with prefix '{prefix}'.")
# Read and concatenate
dataframes = [pd.read_pickle(f) for f in pkl_files]
combined_df = pd.concat(dataframes, ignore_index=True)
print(f"Successfully combined {len(pkl_files)} files into a single DataFrame with {combined_df.shape[0]} rows.")
return combined_df
[docs]def check_exon_overlap(gtf_df, expected_gene_list=None):
"""
Check for overlapping adjacent exon intervals within each gene.
This function checks whether any exons within the same gene have overlapping intervals,
based on their `start` and `end` positions. Optionally, it compares the set of gene IDs
in the provided DataFrame with an expected list to detect any missing or extra genes.
Parameters
----------
gtf_df : pandas.DataFrame
A DataFrame containing exon annotations with at least the columns: 'gene_id', 'start', and 'end'.
expected_gene_list : list of str, optional
A list of expected gene IDs used to validate that all genes were processed and included in `gtf_df`.
Returns
-------
pandas.DataFrame
A DataFrame containing exon entries that overlap with their adjacent exons within the same gene.
The result may be empty if no overlaps are detected.
"""
df_check = gtf_df.copy()
# Sort by gene and start for correct order
df_check = df_check.sort_values(by=["gene_id", "start"]).reset_index(drop=True)
# Get the start of the next exon within each gene
df_check['_next_start'] = df_check.groupby('gene_id')['start'].shift(-1)
# Initialize overlap check column
df_check['_overlap'] = True
# Check if next exon starts after or at the end of current exon
for i in range(df_check.shape[0]):
if math.isnan(df_check.loc[i, '_next_start']):
df_check.loc[i, '_overlap'] = True # Last exon of gene
else:
df_check.loc[i, '_overlap'] = df_check.loc[i, '_next_start'] >= df_check.loc[i, 'end']
# Filter exons that overlap with the next one
overlap_issues = df_check[df_check['_overlap'] == False]
print(f"Found {overlap_issues.shape[0]} overlapping exon entries.")
# If expected gene list is provided, validate gene count
if expected_gene_list is not None:
unique_genes_in_df = set(gtf_df["gene_id"].unique())
expected_genes = set(expected_gene_list)
missing_genes = expected_genes - unique_genes_in_df
extra_genes = unique_genes_in_df - expected_genes
if not missing_genes and not extra_genes:
print(f"All {len(expected_genes)} expected genes are present in the merged DataFrame.")
else:
print(f"Gene count mismatch:")
if missing_genes:
print(f" - {len(missing_genes)} gene(s) missing: {list(missing_genes)[:5]} ...")
if extra_genes:
print(f" - {len(extra_genes)} unexpected gene(s): {list(extra_genes)[:5]} ...")
return overlap_issues
[docs]def save_gtf_outputs(gtf_df, output_dir="./", base_name="dolphin.exon"):
"""
Save the final exon DataFrame to both GTF and Pickle formats.
This function writes the given exon annotation table to two output files:
one in standard GTF format, and the other as a serialized Python pickle (.pkl).
Parameters
----------
gtf_df : pandas.DataFrame
The exon annotation DataFrame to be saved.
output_dir : str, optional
Directory where the output files will be saved (default is the current directory).
base_name : str, optional
Filename prefix used for both output files (default is "dolphin.exon").
Returns
-------
<output_dir>/dolphin_exon_gtf/<base_name>.gtf : GTF-format annotation file
<output_dir>/dolphin_exon_gtf/<base_name>.pkl : Pickle-serialized DataFrame
"""
# Build full output paths
gtf_path = os.path.join(os.path.join(output_dir, "dolphin_exon_gtf", f"{base_name}.gtf"))
pkl_path = os.path.join(os.path.join(output_dir, "dolphin_exon_gtf", output_dir, f"{base_name}.pkl"))
# Write to GTF
gtfpy.writeGTF(gtf_df, gtf_path)
# Write to Pickle
gtf_df.to_pickle(pkl_path)
print(f"GTF file saved to: {gtf_path}")
print(f"Pickle file saved to: {pkl_path}")
[docs]def generate_nonoverlapping_exons(input_gtf_path: str, output_dir: str = "./", batch_size: int = 10000):
"""
End-to-end pipeline to process an Ensembl GTF file and generate non-overlapping exons per gene.
This function performs the following steps:
1. Load and filter exon features from a GTF file.
2. Remove duplicate exons (by gene_id, start, end).
3. Process each gene to merge overlapping exons using IntervalTree.
4. Save intermediate results in batches.
5. Combine all batches into a final exon DataFrame.
6. Optionally check for residual overlaps.
7. Save the final results in GTF and Pickle formats.
Parameters
----------
input_gtf_path : str
Path to the input Ensembl-format GTF file.
output_dir : str
Directory to save intermediate and final output files.
batch_size : int
Number of genes to process and save per batch (default: 10000).
Returns
-------
gtf_all : pd.DataFrame
Final merged and cleaned exon annotation table.
overlap_issues : pd.DataFrame
DataFrame of overlapping exons detected post-processing (if any).
"""
# Step 1: Load exon entries from GTF and remove duplicates
df_exon_nodup = prepare_exon_gtf(input_gtf_path, output_dir=output_dir)
# Step 2: Process and save exons by gene in batches
save_by_batch(df_exon_nodup, save_num=batch_size, output_dir=output_dir)
# Step 3: Combine saved batches
gtf_all = combine_saved_batches(folder=output_dir)
# Step 4: Check for residual overlaps
overlap_issues = check_exon_overlap(gtf_all, expected_gene_list=df_exon_nodup["gene_id"].unique().tolist())
# Step 5: Save final GTF and pickle files
save_gtf_outputs(gtf_all, output_dir=output_dir)
print(f"[Success] Exon GTF processing pipeline completed.")
return gtf_all, overlap_issues