"""
Master data pipeline.

Runs all collectors, back-calculates solar depression angles for each
verified sighting, optionally looks up missing elevations, and writes
two clean CSVs:

    data/processed/fajr_angles.csv
    data/processed/isha_angles.csv

Each row represents ONE confirmed human-verified sighting.

Columns (in output order):
    date         - YYYY-MM-DD (local calendar date)
    utc_dt       - ISO 8601 UTC datetime of the sighting
    lat          - decimal degrees (north positive)
    lng          - decimal degrees (east positive)
    elevation_m  - metres above sea level
    day_of_year  - 1-366 (for seasonality / TOY analysis), taken from utc_dt
    fajr_angle   - solar depression angle at moment of Fajr sighting
                   (degrees; fajr_angles.csv only)
    isha_angle   - solar depression angle at moment of Isha sighting
                   (degrees; isha_angles.csv only)
    source       - citation string
    notes        - observer notes

Usage:
    python -m src.pipeline [--no-elevation-lookup]

    --no-elevation-lookup : skip Open-Elevation API calls (use 0 for unknowns)
"""

import argparse
import logging
import sys
import os
from pathlib import Path
from datetime import datetime, timedelta, timezone

import pandas as pd

# Add project root to path
ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT))

from src.angle_calc import depression_angle
from src.collect.openfajr import fetch_openfajr
from src.collect.verified_sightings import load_verified_sightings
from src.elevation import get_elevations_batch
from src.ingest import ingest_all_raw_csvs

PROCESSED_DIR = ROOT / "data" / "processed"

# Plausibility floors for back-calculated depression angles.
# Floor thresholds based on the full body of peer-reviewed sighting research:
#   Fajr: no confirmed genuine sighting below 7° depression
#   Isha: no confirmed genuine sighting below 10° depression
# These also catch: sun-above-horizon (negative), DST clock-change artifacts,
# and mis-estimated observation times that ended up too close to
# sunrise/sunset.
FAJR_MIN_DEG = 7.0
ISHA_MIN_DEG = 10.0

logger = logging.getLogger(__name__)


def _raw_to_df(records: list[dict]) -> pd.DataFrame:
    """Convert a list of standardized raw record dicts to a DataFrame.

    Each record must provide ``date_local`` (YYYY-MM-DD), ``time_local``
    (HH:MM), ``prayer``, ``lat`` and ``lng``.  ``utc_offset`` (hours,
    default 0), ``elevation_m`` (default 0), ``source`` and ``notes`` are
    optional.  The local timestamp is shifted by ``utc_offset`` to obtain a
    naive UTC datetime.

    Records that cannot be parsed are skipped with a logged warning rather
    than aborting the whole run.

    Returns:
        DataFrame with columns prayer, date, utc_dt, lat, lng, elevation_m,
        source, notes (no columns at all if every record was skipped).
    """
    rows = []
    for record in records:
        try:
            local_dt = datetime.strptime(
                f"{record['date_local']} {record['time_local']}",
                "%Y-%m-%d %H:%M",
            )
            offset_hours = float(record.get("utc_offset", 0))
            rows.append({
                "prayer": record["prayer"],
                "date": record["date_local"],
                "utc_dt": local_dt - timedelta(hours=offset_hours),
                "lat": float(record["lat"]),
                "lng": float(record["lng"]),
                # Blank / None elevation is stored as 0, the "unknown" marker
                # that the elevation lookup step keys on.
                "elevation_m": float(record.get("elevation_m") or 0),
                "source": record.get("source", ""),
                "notes": record.get("notes", ""),
            })
        except Exception as exc:
            # Best-effort ingestion: one malformed row must not sink the rest.
            logger.warning("Skipping raw record: %s — %s", record, exc)
    return pd.DataFrame(rows)


def _drop_cross_source_duplicates(all_df: pd.DataFrame) -> pd.DataFrame:
    """Remove records that repeat the same prayer, date and location.

    Same prayer + same date + same lat/lng (rounded to 3 decimal places,
    ~111m) should produce identical angles.  The first occurrence is kept
    and every removed record is printed so cross-source overlaps are visible.
    """
    # Rounded coordinates live on a throwaway copy so the working frame never
    # carries scratch columns.
    keyed = all_df.assign(
        _lat_r=all_df["lat"].round(3),
        _lng_r=all_df["lng"].round(3),
    )
    dup_mask = keyed.duplicated(
        subset=["prayer", "date", "_lat_r", "_lng_r"], keep="first"
    )
    if not dup_mask.any():
        return all_df

    print(f" Deduplicating {dup_mask.sum()} cross-source duplicate(s) "
          f"(same prayer+date+location):")
    for _, row in all_df[dup_mask].iterrows():
        print(f" {row['prayer'].upper()} {row['date']} "
              f"lat={row['lat']:.3f} lng={row['lng']:.3f} — {row['source']}")
    return all_df[~dup_mask].copy()


def _fill_missing_elevations(all_df: pd.DataFrame) -> None:
    """Look up elevations for records with elevation_m == 0, in place."""
    missing_mask = all_df["elevation_m"] == 0.0
    n_missing = missing_mask.sum()
    if n_missing == 0:
        return

    print(f"Looking up elevations for {n_missing} records...")
    locs = list(zip(
        all_df.loc[missing_mask, "lat"],
        all_df.loc[missing_mask, "lng"],
    ))
    all_df.loc[missing_mask, "elevation_m"] = get_elevations_batch(locs)
    print(" Elevation lookup complete")


def _compute_angles(all_df: pd.DataFrame) -> list[float]:
    """Back-calculate the depression angle for every row.

    A row whose calculation raises gets NaN (and is therefore dropped by the
    plausibility filter); the failure is logged so its cause is not lost.
    """
    angles = []
    columns = zip(
        all_df["utc_dt"], all_df["lat"], all_df["lng"], all_df["elevation_m"]
    )
    for utc_dt, lat, lng, elevation_m in columns:
        try:
            angle = depression_angle(utc_dt, lat, lng, elevation_m)
        except Exception as exc:
            logger.warning(
                "Depression angle failed for %s lat=%s lng=%s: %s",
                utc_dt, lat, lng, exc,
            )
            angle = float("nan")
        angles.append(angle)
    return angles


def _drop_implausible_angles(all_df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose angle is NaN or below the per-prayer floor.

    These are treated as data entry / timing errors.  Every dropped row is
    printed.
    """
    fajr_bad = (all_df["prayer"] == "fajr") & (all_df["angle"] < FAJR_MIN_DEG)
    isha_bad = (all_df["prayer"] == "isha") & (all_df["angle"] < ISHA_MIN_DEG)
    bad = fajr_bad | isha_bad | all_df["angle"].isna()
    if not bad.any():
        return all_df

    print(f" Dropping {bad.sum()} record(s) with implausible angles "
          f"(< {FAJR_MIN_DEG}° Fajr / < {ISHA_MIN_DEG}° Isha):")
    for _, row in all_df[bad].iterrows():
        print(f" {row['prayer'].upper()} {row['date']} {row['utc_dt']} "
              f"lat={row['lat']:.2f} angle={row['angle']:.2f}° — {row['source']}")
    return all_df[~bad].copy()


def build_dataset(
    lookup_elevation: bool = True,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Run all collectors, compute depression angles, return (fajr_df, isha_df).

    Steps: load the three sources, concatenate, remove cross-source
    duplicates, optionally fill unknown (0) elevations, back-calculate the
    depression angle per row, drop implausible angles, add ``day_of_year``,
    then split by prayer.

    Args:
        lookup_elevation: when True, elevations equal to 0 are replaced with
            values from ``get_elevations_batch``; when False they are left
            as 0.

    Returns:
        ``(fajr_df, isha_df)``, each sorted by latitude then day of year,
        with the angle column named ``fajr_angle`` / ``isha_angle``.
    """
    print("Loading OpenFajr Birmingham iCal feed...")
    openfajr_df = fetch_openfajr()
    print(f" {len(openfajr_df)} Fajr records from OpenFajr")

    print("Loading manually verified sightings...")
    manual_df = load_verified_sightings()
    print(f" {len(manual_df)} manually compiled records")

    print("Loading ingested raw CSV sightings...")
    # Elevation lookup is deferred to the single batch call further down.
    raw_df = _raw_to_df(ingest_all_raw_csvs(lookup_elevation=False))
    if len(raw_df) > 0:
        print(f" {len(raw_df)} records from raw CSVs")
    else:
        print(" 0 raw CSV records found")

    all_df = pd.concat([openfajr_df, manual_df, raw_df], ignore_index=True)
    all_df = _drop_cross_source_duplicates(all_df)

    # Elevation lookup for records with elevation_m == 0
    if lookup_elevation:
        _fill_missing_elevations(all_df)
    else:
        print("Skipping elevation lookup (--no-elevation-lookup)")

    # Back-calculate depression angle for each sighting
    print("Computing solar depression angles...")
    all_df["angle"] = _compute_angles(all_df)

    all_df = _drop_implausible_angles(all_df)

    # Add seasonality feature
    all_df["day_of_year"] = all_df["utc_dt"].apply(
        lambda dt: dt.timetuple().tm_yday
    )

    # Split into Fajr and Isha datasets
    fajr_df = all_df[all_df["prayer"] == "fajr"].copy()
    isha_df = all_df[all_df["prayer"] == "isha"].copy()
    fajr_df = fajr_df.rename(columns={"angle": "fajr_angle"})
    isha_df = isha_df.rename(columns={"angle": "isha_angle"})

    # Final column order for ML
    fajr_cols = ["date", "utc_dt", "lat", "lng", "elevation_m",
                 "day_of_year", "fajr_angle", "source", "notes"]
    isha_cols = ["date", "utc_dt", "lat", "lng", "elevation_m",
                 "day_of_year", "isha_angle", "source", "notes"]
    fajr_df = fajr_df[fajr_cols].sort_values(["lat", "day_of_year"])
    isha_df = isha_df[isha_cols].sort_values(["lat", "day_of_year"])

    return fajr_df, isha_df


def main():
    """Command-line entry point: build both datasets, write CSVs, print stats."""
    parser = argparse.ArgumentParser(description="Build Fajr/Isha angle datasets")
    parser.add_argument(
        "--no-elevation-lookup",
        action="store_true",
        help="Skip Open-Elevation API calls",
    )
    args = parser.parse_args()

    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

    fajr_df, isha_df = build_dataset(
        lookup_elevation=not args.no_elevation_lookup,
    )

    fajr_path = PROCESSED_DIR / "fajr_angles.csv"
    isha_path = PROCESSED_DIR / "isha_angles.csv"
    fajr_df.to_csv(fajr_path, index=False)
    isha_df.to_csv(isha_path, index=False)

    print(f"\nFajr dataset: {len(fajr_df)} records → {fajr_path}")
    print(f"Isha dataset: {len(isha_df)} records → {isha_path}")

    print("\nFajr angle stats:")
    print(fajr_df["fajr_angle"].describe().to_string())

    print("\nIsha angle stats:")
    if len(isha_df) > 0:
        print(isha_df["isha_angle"].describe().to_string())

    print("\nFajr geographic coverage:")
    print(f" Latitude range: {fajr_df['lat'].min():.1f}° to {fajr_df['lat'].max():.1f}°")
    print(f" Unique locations: {len(fajr_df.groupby(['lat','lng']))}")
    dates = fajr_df["date"].astype(str)
    print(f" Date range: {dates.min()} to {dates.max()}")


if __name__ == "__main__":
    main()