pray-calc-ml/scripts/washetdonker_process.py
Aric Camarata ada08e7ec4 data: expand dataset from 5.9k to 91k records via 6 new SQM sources
Add 6 new data collection pipelines and their processed outputs:

Sources added:
- TESS/Stars4All photometer network: 37 months (Jun 2017-Aug 2020),
  ~40k raw events from 100+ European stations via Zenodo archives
- Globe at Night citizen science: 26k twilight observations (2006-2024),
  filtered from 308k total observations for solar depression 6-22 deg
- GaN-MN continuous monitoring: 45 months (Jan 2022-Sep 2025),
  ~12.5k twilight events from 88 stations across 20+ countries
- Galicia SQM network: 14 stations, 1-min resolution, 7.5k events
- Madrid/Majadahonda SQM: multi-year continuous monitoring, 3.1k events
- washetdonker.nl Netherlands: 7 stations, 3.3k morning events
- Academic papers: Jordan (Abed 2015), Fayum Egypt, India photometer

Pipeline changes:
- ingest.py: add all new files to APPROVED_RAW_CSVS allowlist,
  fix filter to use allowlist instead of hardcoded exclusions
- .gitignore: exclude bulk raw data directories (BSRN, TESS, GaN-MN,
  washetdonker, Globe at Night downloads)

Final dataset: 56,668 Fajr + 34,763 Isha = 91,431 total records
Previous: 5,871 Fajr + 46 Isha = 5,917 total records
2026-03-22 16:39:29 -04:00

474 lines
17 KiB
Python

"""
Process washetdonker.nl SQM .dat files to extract twilight events.
Each .dat file covers one night (noon-to-noon UTC) for one station.
The SQM measures sky brightness in MSAS (mag/arcsec^2):
- Day/bright sky: MSAS = -0.000 (saturated, unusable)
- Twilight: MSAS rises from -0.000 to ~5-8 (civil), ~10-12 (nautical), ~17-19 (astro)
- Dark night: MSAS ~19-21 at pristine sites, ~17-19 in the Netherlands
Twilight event extraction strategy
-----------------------------------
For morning twilight (Fajr territory):
1. Find the dark-night plateau after midnight UTC.
2. Detect when MSAS begins monotonically declining after the plateau.
3. The first sustained drop is astronomical twilight.
4. We record the time when MSAS crosses several thresholds and compute the
solar depression angle at each threshold.
We do NOT attempt to identify true Fajr from SQM data alone — that requires
human observation. Instead, we extract:
- The time when the sky starts brightening (astronomical twilight onset)
- The solar depression angle at that moment
- Times and angles at MSAS = 17.0, 15.0, 12.0, 10.0, 8.0, 6.0, 4.0
These become calibration points: if a human says "Fajr was at 12°", we can look
at what the SQM was reading at that location on nearby nights at 12° depression
and see if the brightness threshold is consistent.
Output CSVs
-----------
data/raw/washetdonker/events_morning.csv — morning twilight events
data/raw/washetdonker/events_evening.csv — evening twilight events
data/raw/raw_sightings/washetdonker_astronomical_morning.csv — pipeline-ready
Usage:
python scripts/washetdonker_process.py [--station Texel] [--year 2024]
python scripts/washetdonker_process.py --all
"""
from __future__ import annotations
import argparse
import csv
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Generator, Optional
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT))
from src.angle_calc import depression_angle # noqa: E402
RAW_ROOT = REPO_ROOT / "data" / "raw" / "washetdonker"
SIGHTINGS_DIR = REPO_ROOT / "data" / "raw" / "raw_sightings"
# MSAS thresholds at which we record a crossing event.
# Lower MSAS = brighter sky. Morning = sky gets brighter = MSAS decreasing.
MSAS_THRESHOLDS = [18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0, 9.0, 8.0, 6.0]
# Minimum MSAS for a "valid dark night" reading (filters daytime saturation)
MSAS_MIN_VALID = 4.0
# Dark-night plateau: readings must reach this to be a genuine astronomical night
MSAS_DARK_THRESHOLD = 14.0
# A twilight event is real if MSAS changes by at least this much from plateau
MSAS_SIGNIFICANT_CHANGE = 2.0
def parse_header(lines: list[str]) -> dict:
"""Extract lat, lng, elevation, station name, timezone from header lines."""
meta = {"lat": None, "lng": None, "elevation_m": 0.0, "station": "", "utc_offset": 0.0}
for line in lines:
if not line.startswith("#"):
break
line = line[1:].strip()
if line.startswith("Position:"):
parts = line.split(":", 1)[1].strip().split(",")
try:
meta["lat"] = float(parts[0].strip())
meta["lng"] = float(parts[1].strip())
if len(parts) > 2:
meta["elevation_m"] = float(parts[2].strip())
except (ValueError, IndexError):
pass
elif line.startswith("Location name:"):
meta["station"] = line.split(":", 1)[1].strip()
elif line.startswith("Local timezone:"):
tz_str = line.split(":", 1)[1].strip()
# e.g. "UTC+1" or "UTC+2" or "UTC+0"
m = re.search(r"UTC([+-]\d+)", tz_str)
if m:
meta["utc_offset"] = float(m.group(1))
return meta
def read_dat_file(path: Path) -> tuple[dict, list[tuple[datetime, float]]]:
"""
Parse a washetdonker .dat file.
Returns (metadata_dict, [(utc_datetime, msas_value), ...])
Only rows with MSAS > MSAS_MIN_VALID are included (filters daylight saturation).
"""
lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
# Find header end
header_lines = []
data_start = 0
for i, line in enumerate(lines):
if line.startswith("#"):
header_lines.append(line)
else:
data_start = i
break
meta = parse_header(header_lines)
rows: list[tuple[datetime, float]] = []
for line in lines[data_start:]:
line = line.strip()
if not line:
continue
parts = line.split(";")
if len(parts) < 6:
continue
try:
utc_str = parts[0].strip()
msas = float(parts[5].strip())
except (ValueError, IndexError):
continue
if msas < MSAS_MIN_VALID:
continue # daytime saturation reading
try:
utc_dt = datetime.strptime(utc_str, "%Y-%m-%dT%H:%M:%S.%f").replace(
tzinfo=timezone.utc
)
except ValueError:
try:
utc_dt = datetime.strptime(utc_str, "%Y-%m-%dT%H:%M:%S").replace(
tzinfo=timezone.utc
)
except ValueError:
continue
rows.append((utc_dt, msas))
return meta, rows
def find_plateau(rows: list[tuple[datetime, float]]) -> Optional[float]:
"""
Find the median MSAS of the dark-night plateau.
Uses the darkest 30-minute window in the night.
Returns None if no valid dark night.
"""
if not rows:
return None
# Only look at readings with MSAS > MSAS_DARK_THRESHOLD
dark_rows = [msas for _, msas in rows if msas >= MSAS_DARK_THRESHOLD]
if len(dark_rows) < 5:
return None
dark_rows.sort(reverse=True)
# Use 75th percentile of dark readings as plateau estimate
idx = max(0, int(len(dark_rows) * 0.25))
return dark_rows[idx]
def find_morning_twilight_events(
rows: list[tuple[datetime, float]],
meta: dict,
) -> list[dict]:
"""
Find the morning twilight onset and threshold crossings.
Strategy:
1. Find the dark night plateau MSAS.
2. Find the minimum-MSAS (darkest point) in the second half of the night.
3. From there, scan forward to find when MSAS starts dropping.
4. For each MSAS threshold, find the first crossing time and compute solar angle.
"""
if not rows or meta["lat"] is None:
return []
plateau = find_plateau(rows)
if plateau is None:
return []
# Only look at rows after midnight UTC (morning half)
# The file covers one noon-to-noon period; morning is after UTC midnight
midnight_rows = [(dt, msas) for dt, msas in rows if dt.hour >= 0 and dt.hour < 12]
if len(midnight_rows) < 10:
return []
# Find the darkest point (highest MSAS) in morning half
darkest_idx = max(range(len(midnight_rows)), key=lambda i: midnight_rows[i][1])
darkest_msas = midnight_rows[darkest_idx][1]
# If darkest MSAS is too low, cloudy/moon/bad night — skip
if darkest_msas < MSAS_DARK_THRESHOLD:
return []
# Scan forward from darkest point to find threshold crossings
events = []
threshold_crossed = set()
for i in range(darkest_idx, len(midnight_rows)):
dt, msas = midnight_rows[i]
for threshold in MSAS_THRESHOLDS:
if threshold in threshold_crossed:
continue
if msas <= threshold <= darkest_msas:
threshold_crossed.add(threshold)
# Compute solar depression angle at this moment
try:
angle = depression_angle(
dt.replace(tzinfo=None),
meta["lat"],
meta["lng"],
meta["elevation_m"],
)
except Exception as exc:
log.debug("angle_calc error: %s", exc)
continue
events.append({
"station": meta["station"],
"lat": meta["lat"],
"lng": meta["lng"],
"elevation_m": meta["elevation_m"],
"utc_dt": dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
"msas": round(msas, 3),
"msas_threshold": threshold,
"plateau_msas": round(darkest_msas, 3),
"solar_depression_deg": round(angle, 4),
"direction": "morning",
"source": f"washetdonker.nl/{meta['station']}",
})
return events
def find_evening_twilight_events(
rows: list[tuple[datetime, float]],
meta: dict,
) -> list[dict]:
"""
Find evening twilight threshold crossings.
The file starts at noon UTC. Evening twilight is in the first part,
when MSAS rises from -0.000 (saturated) to dark values.
We find the first time MSAS exceeds each threshold.
"""
if not rows or meta["lat"] is None:
return []
plateau = find_plateau(rows)
if plateau is None:
return []
# Evening half: from file start until midnight UTC
evening_rows = [(dt, msas) for dt, msas in rows if dt.hour >= 12 or dt.hour < 0]
# Actually the file noon-to-noon. Rows before midnight are evening.
# UTC rows with hour 12-23 are evening.
evening_rows = [(dt, msas) for dt, msas in rows if dt.hour >= 12]
if len(evening_rows) < 10:
return []
darkest_msas = max(msas for _, msas in rows)
if darkest_msas < MSAS_DARK_THRESHOLD:
return []
events = []
threshold_crossed = set()
# Evening: MSAS increasing (sky getting darker)
# Find each threshold crossing from below (first time MSAS exceeds threshold)
for dt, msas in evening_rows:
for threshold in MSAS_THRESHOLDS:
if threshold in threshold_crossed:
continue
if msas >= threshold:
threshold_crossed.add(threshold)
try:
angle = depression_angle(
dt.replace(tzinfo=None),
meta["lat"],
meta["lng"],
meta["elevation_m"],
)
except Exception as exc:
log.debug("angle_calc error: %s", exc)
continue
events.append({
"station": meta["station"],
"lat": meta["lat"],
"lng": meta["lng"],
"elevation_m": meta["elevation_m"],
"utc_dt": dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
"msas": round(msas, 3),
"msas_threshold": threshold,
"plateau_msas": round(darkest_msas, 3),
"solar_depression_deg": round(angle, 4),
"direction": "evening",
"source": f"washetdonker.nl/{meta['station']}",
})
return events
EVENT_FIELDNAMES = [
"station", "lat", "lng", "elevation_m",
"utc_dt", "msas", "msas_threshold", "plateau_msas",
"solar_depression_deg", "direction", "source",
]
# Pipeline-compatible fieldnames for morning events at the 12.0 MSAS threshold
# (approximate astronomical twilight boundary)
PIPELINE_FIELDNAMES = [
"prayer", "date_local", "time_local", "utc_offset",
"lat", "lng", "elevation_m", "source", "notes",
]
def event_to_pipeline_row(event: dict, utc_offset: float) -> dict:
"""Convert an event dict to the pipeline CSV format."""
utc_dt = datetime.strptime(event["utc_dt"], "%Y-%m-%dT%H:%M:%SZ")
from datetime import timedelta
local_dt = utc_dt + timedelta(hours=utc_offset)
return {
"prayer": "fajr",
"date_local": local_dt.strftime("%Y-%m-%d"),
"time_local": local_dt.strftime("%H:%M"),
"utc_offset": utc_offset,
"lat": event["lat"],
"lng": event["lng"],
"elevation_m": event["elevation_m"],
"source": event["source"],
"notes": (
f"SQM MSAS={event['msas']} at threshold={event['msas_threshold']} "
f"plateau={event['plateau_msas']} "
f"solar_depression={event['solar_depression_deg']}deg "
f"direction=morning"
),
}
def process_all(station_filter: Optional[str], year_filter: Optional[int]) -> dict:
"""Process all downloaded .dat files. Returns counts dict."""
morning_events: list[dict] = []
evening_events: list[dict] = []
dat_files = list(RAW_ROOT.rglob("*.dat"))
if not dat_files:
log.warning("No .dat files found in %s", RAW_ROOT)
return {"files": 0, "morning": 0, "evening": 0}
if station_filter:
dat_files = [f for f in dat_files if station_filter in str(f)]
if year_filter:
dat_files = [f for f in dat_files if f"/{year_filter}/" in str(f)]
log.info("Processing %d .dat files ...", len(dat_files))
for i, path in enumerate(sorted(dat_files)):
if i % 100 == 0 and i > 0:
log.info(" %d / %d files (morning=%d evening=%d)",
i, len(dat_files), len(morning_events), len(evening_events))
try:
meta, rows = read_dat_file(path)
except Exception as exc:
log.debug("Parse error %s: %s", path.name, exc)
continue
if not rows:
continue
m_events = find_morning_twilight_events(rows, meta)
e_events = find_evening_twilight_events(rows, meta)
morning_events.extend(m_events)
evening_events.extend(e_events)
log.info("Extraction complete. Morning events: %d Evening events: %d",
len(morning_events), len(evening_events))
# Write full events CSVs
RAW_ROOT.mkdir(parents=True, exist_ok=True)
morning_out = RAW_ROOT / "events_morning.csv"
with open(morning_out, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=EVENT_FIELDNAMES)
writer.writeheader()
writer.writerows(morning_events)
log.info("Written: %s", morning_out)
evening_out = RAW_ROOT / "events_evening.csv"
with open(evening_out, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=EVENT_FIELDNAMES)
writer.writeheader()
writer.writerows(evening_events)
log.info("Written: %s", evening_out)
# Write pipeline-compatible CSV for morning events at the astronomical threshold
# Pick the threshold closest to known astronomical twilight (~18°)
# For Netherlands stations (high latitude, low MSAS ceilings), use MSAS=17.0
# We export ALL threshold crossings for the pipeline to pick the best one
SIGHTINGS_DIR.mkdir(parents=True, exist_ok=True)
pipeline_out = SIGHTINGS_DIR / "washetdonker_morning.csv"
# Use the MSAS=15.0 threshold as a reasonable proxy for deep twilight onset
# (roughly maps to astronomical twilight at Dutch latitudes)
pipeline_rows = []
utc_offset_cache: dict[str, float] = {}
for ev in morning_events:
if ev["msas_threshold"] != 15.0:
continue
if ev["solar_depression_deg"] < 7.0:
continue # too close to horizon — unreliable
# Use stored utc_offset from the station's most-recently-seen offset
# (we don't store it in the event; approximate from known Netherlands offset)
# Netherlands is UTC+1 (winter) / UTC+2 (summer). Approximate from date.
utc_dt = datetime.strptime(ev["utc_dt"], "%Y-%m-%dT%H:%M:%SZ")
month = utc_dt.month
utc_offset = 2.0 if 3 <= month <= 10 else 1.0
pipeline_rows.append(event_to_pipeline_row(ev, utc_offset))
with open(pipeline_out, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=PIPELINE_FIELDNAMES)
writer.writeheader()
writer.writerows(pipeline_rows)
log.info("Written pipeline CSV: %s (%d rows)", pipeline_out, len(pipeline_rows))
return {
"files": len(dat_files),
"morning": len(morning_events),
"evening": len(evening_events),
"pipeline_rows": len(pipeline_rows),
}
def main():
ap = argparse.ArgumentParser(description="Process washetdonker SQM .dat files")
ap.add_argument("--station", help="Filter to one station name")
ap.add_argument("--year", type=int, help="Filter to one year")
ap.add_argument("--all", dest="all", action="store_true",
help="Process all downloaded files")
args = ap.parse_args()
counts = process_all(args.station, args.year)
print(f"\nSummary:")
print(f" Files processed: {counts['files']}")
print(f" Morning events: {counts['morning']}")
print(f" Evening events: {counts['evening']}")
print(f" Pipeline rows: {counts.get('pipeline_rows', 0)}")
if __name__ == "__main__":
main()