Data Pipeline
Six data streams feeding the engine — NWP ensemble, satellite, observed ground truth, market prices, settlement, and engine output.
All six streams feed a shared time-series store; each serves a distinct role in the calibration and product pipeline.
Stream Architecture
Six Data Streams
Stream 1 — NWP Ensemble Ingest
| Attribute | Detail |
|---|---|
| Source | Open-Meteo API (3 endpoints) |
| Models | GFS 31-member, ECMWF 51-member, HRRR 3km (US) |
| Frequency | Every 6 hours (aligned to 00Z/06Z/12Z/18Z model runs, pulled ~3–4h after run) |
| Format | JSON via REST API |
| Purpose | Raw ensemble data for probability calibration and live signal generation |
Implemented endpoints:
| Endpoint | URL | Use | Variables |
|---|---|---|---|
| Ensemble (live) | ensemble-api.open-meteo.com/v1/ensemble | GFS 31-member + ECMWF IFS 51-member for production | temperature_2m_max, temperature_2m_min, precipitation_sum, wind_speed_10m_max, wind_gusts_10m_max, shortwave_radiation_sum |
| Historical Forecast | historical-forecast-api.open-meteo.com/v1/forecast | Archived GFS deterministic forecasts for backtesting | Same variables (deterministic, no ensemble members) |
| ERA5 Archive | archive-api.open-meteo.com/v1/archive | ERA5 reanalysis for irradiance ground truth | shortwave_radiation_sum |
The Historical Forecast API returns deterministic GFS output only (no ensemble members). This is why backtest mode uses CDF-based probability functions (Gaussian, Gamma, Weibull, Beta) rather than member counting. The live Ensemble API provides both GFS 31-member and ECMWF IFS 51-member forecasts for production signal generation, where member counting is used for all variables.
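The member-counting approach described above can be sketched as follows — a minimal illustration (function name and synthetic data are mine, not from the codebase) of how pooled GFS + ECMWF members become a raw binary probability:

```python
import numpy as np

def member_count_probability(members: np.ndarray, threshold: float) -> float:
    """Fraction of ensemble members at or above the contract threshold.

    With GFS (31 members) + ECMWF IFS (51 members) pooled, 82 members
    vote on the binary outcome; the raw probability is the YES fraction.
    """
    members = np.asarray(members, dtype=float)
    return float(np.mean(members >= threshold))

# Example: 82 pooled members forecasting a daily high, contract strike 85°F
rng = np.random.default_rng(0)
members = rng.normal(loc=86.0, scale=2.5, size=82)
p_yes = member_count_probability(members, threshold=85.0)
```

In backtest mode this counting step is replaced by evaluating a fitted CDF (Gaussian, Gamma, Weibull, or Beta) at the threshold, since the archived forecasts are deterministic.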
Implementation — src/data_ingestion/open_meteo.py:
| Function | Purpose | Parameters |
|---|---|---|
| download_historical_forecasts() | Backtest forecasts for any variable | variables, unit_params, value_columns (defaults preserve backward compat) |
| download_ensemble_forecast() | Single-model live ensemble | value_col, unit_params (generalized from temperature-only) |
| download_multi_model_ensemble() | Multi-model ensemble (GFS + ECMWF) | Passes through variable params to single-model function |
Unit conversions applied on ingestion:
| Variable | Open-Meteo Unit | Engine Unit | Conversion |
|---|---|---|---|
| Temperature | °C | °F | * 9/5 + 32 |
| Precipitation | mm | inches | * 0.0393701 |
| Wind speed | km/h | mph | * 0.621371 |
| Irradiance | MJ/m^2 | MJ/m^2 | None (native) |
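The conversions in the table reduce to three small helpers — a sketch of what the ingestion step applies (helper names are illustrative, not the actual codebase functions):

```python
def c_to_f(celsius: float) -> float:
    """Temperature: °C to °F."""
    return celsius * 9 / 5 + 32

def mm_to_inches(mm: float) -> float:
    """Precipitation: millimeters to inches."""
    return mm * 0.0393701

def kmh_to_mph(kmh: float) -> float:
    """Wind speed: km/h to mph."""
    return kmh * 0.621371

# Irradiance needs no conversion — MJ/m^2 is the native engine unit.
```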
Stream 2 — Observed Ground Truth (IEM + ERA5)
| Attribute | Detail |
|---|---|
| Sources | Iowa Environmental Mesonet (IEM) ASOS/AWOS + ERA5 Reanalysis |
| Frequency | Daily batch |
| Variables | Temperature (high/low), precipitation, wind speed, wind gust, irradiance |
| Purpose | Backtest validation, bias training, daily scoring |
IEM ASOS Daily API — src/data_ingestion/iem_observed.py:
| Function | IEM vars Param | Actual Column Returned | Engine Column |
|---|---|---|---|
| download_daily_highs() | max_tmpf | max_temp_f | max_temp_f |
| download_daily_lows() | min_tmpf | min_temp_f | min_temp_f |
| download_daily_precip() | precip | precip_in | precip_inches |
| download_daily_max_wind() | max_sknt | max_wind_speed_kts | max_wind_mph (knots × 1.15078) |
| download_daily_max_gust() | gust_sknt | max_wind_gust_kts | max_gust_mph (knots × 1.15078) |
IEM API quirks:
- Requires state-specific network codes (e.g., `NY_ASOS`, `IL_ASOS`)
- Uses 3-letter station codes (strip leading `K` from ICAO: `KORD` → `ORD`)
- Returns ALL columns regardless of the `vars` parameter
- Actual column names differ from the `vars` parameter names (e.g., `precip_in` not `precip`, `max_wind_speed_kts` not `max_sknt`)
- URL: `mesonet.agron.iastate.edu/cgi-bin/request/daily.py`
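The quirks above translate directly into request construction. A minimal sketch (the helper and the exact parameter set are illustrative — consult the IEM service for the full parameter list):

```python
from urllib.parse import urlencode

IEM_DAILY_URL = "https://mesonet.agron.iastate.edu/cgi-bin/request/daily.py"

def iem_daily_request(icao: str, state: str, year: int) -> str:
    """Build an IEM daily-summary request URL (illustrative parameters).

    Strips the leading K from the ICAO code and uses the state-specific
    network code, per the quirks listed above. Note the response will
    contain all columns regardless of the variable requested.
    """
    station = icao[1:] if icao.upper().startswith("K") else icao
    params = {
        "network": f"{state}_ASOS",   # e.g. IL_ASOS
        "stations": station,          # KORD -> ORD
        "var": "max_tmpf",            # response column may be named differently
        "year1": year, "month1": 1, "day1": 1,
        "year2": year, "month2": 12, "day2": 31,
        "format": "csv",
    }
    return f"{IEM_DAILY_URL}?{urlencode(params)}"

url = iem_daily_request("KORD", "IL", 2024)
```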
ERA5 Reanalysis — src/data_ingestion/open_meteo_reanalysis.py:
| Function | Variable | Purpose |
|---|---|---|
| download_era5_irradiance() | shortwave_radiation_sum | Irradiance ground truth (IEM has no solar data) |
ERA5 is a global atmospheric reanalysis that assimilates millions of observations. Its irradiance values are independent of GFS/ECMWF forecasts, making it a valid ground truth source for backtest evaluation.
Stream 3 — Satellite Ground Truth (SatSure Sparta)
| Attribute | Detail |
|---|---|
| Source | SatSure Sparta API |
| Frequency | Daily |
| Variables | LST, soil moisture, NDVI, cloud cover, irradiance |
| Purpose | Layer 1 observations for bias correction enhancement and settlement verification |
Satellite data provides independent ground truth that NWP models don't see. It feeds two functions: improving bias correction (what actually happened vs what was forecast) and providing verification data for warranty settlement.
Stream 4 — Market Price Feed (ForecastEx)
| Attribute | Detail |
|---|---|
| Source | IBKR API (ForecastEx markets) |
| Frequency | Near real-time (WebSocket or 30-second polling) |
| Data | Contract prices for binary weather contracts |
| Purpose | Edge calculation — calibrated probability vs market consensus |
ForecastEx prices represent market consensus probabilities. Comparing the engine's calibrated probability against market price reveals the edge — and provides an independent validation of calibration quality.
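The edge and sizing arithmetic this implies can be sketched as below — a minimal illustration assuming binary contracts priced in [0, 1] with a payoff of 1 on YES (function names are mine; the engine's actual sizing logic may differ):

```python
def edge(p_model: float, market_price: float) -> float:
    """Edge = calibrated probability minus market-implied probability.

    A binary contract's price in [0, 1] is directly the market's
    consensus probability of YES.
    """
    return p_model - market_price

def kelly_fraction(p_model: float, market_price: float) -> float:
    """Full-Kelly stake for a binary contract bought at market_price.

    For a payoff of 1 on YES and 0 on NO, f* = (p - price) / (1 - price),
    clipped at 0 (no bet without positive edge).
    """
    if market_price >= 1.0:
        return 0.0
    return max(0.0, (p_model - market_price) / (1.0 - market_price))

# Engine says 62% YES, market trades at 55 cents -> 7pp edge
e = edge(0.62, 0.55)
f = kelly_fraction(0.62, 0.55)
```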
Stream 5 — Settlement / Verification
| Attribute | Detail |
|---|---|
| Source | NWS Climatological Reports + SatSure |
| Frequency | Daily batch (after observation period closes) |
| Data | Observed daily max/min temperature, rainfall, wind speed, irradiance |
| Purpose | Settlement data → Brier score calculation → Calibration tab updates |
This is the ground truth stream for ForecastEx settlement. After each observation period closes, verified outcomes are compared against the engine's predictions to calculate Brier scores and update the reliability diagram.
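The Brier score computed in this step is just the mean squared error between forecast probabilities and binary outcomes — a minimal sketch:

```python
import numpy as np

def brier_score(probs, outcomes) -> float:
    """Mean squared error between forecast probabilities and 0/1 outcomes.

    0 is a perfect score; 0.25 is what an uninformative constant 50%
    forecast earns on a balanced outcome set.
    """
    probs = np.asarray(probs, dtype=float)
    outcomes = np.asarray(outcomes, dtype=float)
    return float(np.mean((probs - outcomes) ** 2))

# Three settled contracts: two YES outcomes, one NO
score = brier_score([0.9, 0.2, 0.7], [1, 0, 1])
```

Binning these scored predictions by forecast probability and plotting observed frequency per bin yields the reliability diagram mentioned above.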
Stream 6 — Engine Output
| Attribute | Detail |
|---|---|
| Source | Internal processing pipeline |
| Frequency | After each NWP ingest (~4 times daily) + midday D+0 update |
| Data | Calibrated probabilities, risk scores, trade signals |
| Purpose | Feeds dashboard tabs, Risk API endpoints, signal logs |
Daily Automation Pipeline
scripts/daily_pipeline.sh — triggered via launchd, 8 steps:
Step 1: Score Yesterday's Predictions
| Sub-step | Script | Description |
|---|---|---|
| 1a | daily_scoring.py --variable high | Score DH predictions against IEM observed daily highs |
| 1b | daily_scoring.py --variable low | Score NLL predictions against IEM observed daily lows |
Step 2: Generate Tomorrow's Temperature Signals
| Sub-step | Script | Description |
|---|---|---|
| 2a | run_phase2.py --signal-only --variable high --models gfs_seamless ecmwf_ifs025 | DH multi-model ensemble → bias correct → calibrate → edge/Kelly/z-score |
| 2b | run_phase2.py --signal-only --variable low --models gfs_seamless ecmwf_ifs025 | NLL multi-model ensemble → same pipeline |
Step 3: D+0 METAR Blending
| Script | Description |
|---|---|
| run_d0_update.py | Fetch hourly METAR observations, Bayesian-blend with morning forecast for settlement-day contracts |
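One common form of such a blend is a precision-weighted Gaussian update — a sketch of the idea only, with the morning forecast as prior and a METAR-derived running estimate as likelihood (the actual scheme in run_d0_update.py may differ):

```python
def bayes_blend(prior_mean: float, prior_sd: float,
                obs_mean: float, obs_sd: float) -> tuple[float, float]:
    """Precision-weighted Gaussian update of a forecast with observations.

    The posterior mean is the precision-weighted average of prior and
    observation; the posterior variance is the inverse summed precision.
    """
    w_prior = 1.0 / prior_sd**2
    w_obs = 1.0 / obs_sd**2
    post_var = 1.0 / (w_prior + w_obs)
    post_mean = post_var * (w_prior * prior_mean + w_obs * obs_mean)
    return post_mean, post_var**0.5

# Morning forecast: high of 88°F ± 3; METARs by noon suggest 91°F ± 1.5
mean, sd = bayes_blend(88.0, 3.0, 91.0, 1.5)
```

As the day progresses and the observation uncertainty shrinks, the blend pulls increasingly toward what the station has actually recorded.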
Steps 4–6: Non-Temperature Variable Signals
| Step | Script | Description |
|---|---|---|
| 4 | run_variable_signals.py --variable rainfall --risk-scores | Rainfall probability curves + construction delay risk scores |
| 5 | run_variable_signals.py --variable wind_speed --risk-scores | Wind probability curves + operational risk scores |
| 6 | run_variable_signals.py --variable irradiance --risk-scores | Irradiance probability curves + solar shortfall risk scores |
`run_variable_signals.py` is the generic signal generator for non-temperature variables. It:
- Loads the variable via `get_variable(name)` from the registry
- Fetches ensemble forecasts via `var.ingest_ensemble()`
- Applies bias correction via `var.bias_correct_ensemble()`
- Computes probabilities via ensemble member counting
- Optionally generates risk scores via `RiskScorer`
- Saves signals to `data/signals/{variable}/signals_YYYY-MM-DD.csv`
Step 7: Divergence Monitor
| Script | Description |
|---|---|
| run_divergence.py | Compare afternoon ensemble against morning baseline, flag >10pp probability shifts or direction reversals |
Output actions: HOLD (normal), FLAG (significant shift >= threshold), REVERSE (direction flipped YES↔NO).
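The three-way classification can be sketched as follows — an illustrative implementation assuming probabilities are compared around the 0.5 direction boundary (function name and exact rule are mine, not lifted from run_divergence.py):

```python
def divergence_action(p_morning: float, p_afternoon: float,
                      threshold_pp: float = 10.0) -> str:
    """Classify the afternoon-vs-morning probability move.

    REVERSE when the signal direction flips across 0.5 (YES <-> NO),
    FLAG when the shift is at least threshold_pp percentage points,
    HOLD otherwise.
    """
    if (p_morning - 0.5) * (p_afternoon - 0.5) < 0:
        return "REVERSE"
    if abs(p_afternoon - p_morning) * 100 >= threshold_pp:
        return "FLAG"
    return "HOLD"

actions = [
    divergence_action(0.62, 0.64),  # small drift, same side
    divergence_action(0.62, 0.75),  # 13pp shift, same side
    divergence_action(0.62, 0.45),  # crossed 0.5: direction flipped
]
```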
Step 8: Dashboard Export
| Script | Description |
|---|---|
| export_dashboard_data.py | Export all signals, scores, track record, calibration, variable signals, and system status to dashboard JSON |
Export outputs (written to dashboard/public/data/):
| File | Contents |
|---|---|
| signals_latest.json | DH + NLL temperature trade signals |
| scores_latest.json | Scored predictions vs observations |
| track_record.json | Daily Brier scores and direction accuracy |
| calibration.json | Reliability diagram + Phase 1 summary per variable |
| variable_signals.json | Rainfall, wind, irradiance probability signals |
| system_status.json | Variable registry, pipeline status, config dump, data freshness |
Midday Automation Pipeline
scripts/midday_pipeline.sh — triggered via LaunchAgent at noon local time:
| Step | Script | Description |
|---|---|---|
| 1 | run_d0_update.py --variable high | Bayesian-blend METAR observations with morning DH forecast |
| 2 | run_d0_update.py --variable low | Same for NLL contracts |
| 3 | run_divergence.py --variable high | Flag >10pp shifts or direction reversals vs morning baseline |
| 4 | run_divergence.py --variable low | Same for NLL contracts |
Generic Backtest Pipeline
scripts/run_phase_generic.py — unified Phase 0/1 orchestrator for all variables:
```bash
python scripts/run_phase_generic.py --variable rainfall --phase 0+1
python scripts/run_phase_generic.py --variable wind_speed --phase 0+1
python scripts/run_phase_generic.py --variable irradiance --phase 0+1
python scripts/run_phase_generic.py --variable temperature_high --phase 0+1
```
Phase 0 (Baseline):
- `var.ingest_historical()` — download historical forecasts from Open-Meteo
- `var.observe()` — download observations from IEM or ERA5
- Merge on `[station, date]`
- Compute MAE, bias, RMSE per city and overall
- Save paired data to `data/backtest/{variable}/forecast_vs_observed.csv`
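The merge-and-score core of Phase 0 can be sketched in a few lines of pandas — column names here are illustrative, since the real pipeline pulls them from the variable registry:

```python
import pandas as pd

def phase0_metrics(forecast: pd.DataFrame, observed: pd.DataFrame) -> pd.DataFrame:
    """Pair forecasts with observations on [station, date], then compute
    per-station MAE, bias, and RMSE."""
    paired = forecast.merge(observed, on=["station", "date"], how="inner")
    paired["error"] = paired["forecast"] - paired["observed"]
    return paired.groupby("station").agg(
        mae=("error", lambda e: e.abs().mean()),
        bias=("error", "mean"),
        rmse=("error", lambda e: (e**2).mean() ** 0.5),
    )

fc = pd.DataFrame({"station": ["ORD", "ORD"],
                   "date": ["2024-07-01", "2024-07-02"],
                   "forecast": [90.0, 85.0]})
ob = pd.DataFrame({"station": ["ORD", "ORD"],
                   "date": ["2024-07-01", "2024-07-02"],
                   "observed": [88.0, 86.0]})
stats = phase0_metrics(fc, ob)
```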
Phase 1 (Calibration):
- Split train/test using `TRAIN_END_DATE` / `TEST_START_DATE`
- Train bias parameters (additive or multiplicative based on `var.config.bias_method`)
- Generate backtest probabilities via `generate_backtest_probabilities_generic()`
- Train isotonic calibration on raw probabilities
- Compute Brier scores (raw and calibrated)
- Save artifacts: `bias_params.json`, `calibration_model.pkl`, `phase1_summary.json`
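The isotonic calibration step above is well served by scikit-learn's IsotonicRegression — a minimal sketch on synthetic, deliberately miscalibrated data (the synthetic setup is illustrative, not the engine's training data):

```python
import numpy as np
from sklearn.isotonic import IsotonicRegression

# Synthetic raw probabilities and 0/1 outcomes that are miscalibrated:
# outcomes occur less often than the raw probability suggests.
rng = np.random.default_rng(42)
raw_p = rng.uniform(0, 1, 500)
outcomes = (rng.uniform(0, 1, 500) < raw_p**1.3).astype(int)

# Fit a monotone mapping from raw probability to observed frequency.
iso = IsotonicRegression(out_of_bounds="clip", y_min=0.0, y_max=1.0)
iso.fit(raw_p, outcomes)

calibrated = iso.predict([0.2, 0.5, 0.8])
```

Isotonic regression only assumes the correction is monotone, which is why it works for both additive- and multiplicative-bias variables; the fitted model is what gets pickled to calibration_model.pkl.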
Storage: TimescaleDB
The engine uses TimescaleDB — a PostgreSQL extension with native time-series support.
Why TimescaleDB:
- Standard SQL (no proprietary query language)
- Automatic time-partitioning (hypertables)
- Efficient compression for historical data
- Native continuous aggregates for roll-up calculations
- PostgreSQL ecosystem (PostGIS for spatial queries, standard tooling)
Schema design stores data in time-series hypertables per stream type, with location and variable as indexed dimensions. Each prediction record includes full provenance metadata for audit trail compliance.
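An illustrative hypertable for the ensemble stream might look like the following — table and column names are hypothetical, not the engine's actual schema:

```sql
-- One row per (valid time, model run, station, variable, member)
CREATE TABLE ensemble_forecast (
    time      TIMESTAMPTZ NOT NULL,  -- valid time of the forecast
    run_time  TIMESTAMPTZ NOT NULL,  -- model run (00Z/06Z/12Z/18Z)
    station   TEXT        NOT NULL,
    variable  TEXT        NOT NULL,
    member    SMALLINT    NOT NULL,  -- ensemble member index
    value     DOUBLE PRECISION
);

-- Convert to a hypertable partitioned on time
SELECT create_hypertable('ensemble_forecast', 'time');

-- Location and variable as indexed dimensions, per the schema design above
CREATE INDEX ON ensemble_forecast (station, variable, time DESC);
```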
Data Directories
| Path | Contents |
|---|---|
| data/backtest/ | Temperature (DH) paired data, bias params, calibration model |
| data/backtest/ecmwf/ | ECMWF-specific DH bias params and calibration |
| data/backtest/low/ | Temperature (NLL) bias params and calibration |
| data/backtest/ecmwf_low/ | ECMWF-specific NLL bias params |
| data/backtest/rainfall/ | Rainfall paired data, multiplicative bias, calibration |
| data/backtest/irradiance/ | Irradiance paired data, multiplicative bias, calibration |
| data/backtest/wind/ | Wind paired data, multiplicative bias, calibration |
| data/signals/ | DH trade signals (daily CSV) |
| data/signals/low/ | NLL trade signals |
| data/signals/rainfall/ | Rainfall probability signals |
| data/signals/wind/ | Wind probability signals |
| data/signals/irradiance/ | Irradiance probability signals |
| data/logs/ | Daily pipeline logs |
Deployment
Initial deployment runs on a single VPS with Docker containers:
- Ingestion service container
- TimescaleDB container
- Engine pipeline container (scheduled via launchd/cron)
- FastAPI backend container (Risk API + probability endpoints)
- Nginx reverse proxy
This is sufficient for initial scale — NWP data volume is modest (megabytes per ingest cycle, not gigabytes). Kubernetes migration is deferred until multi-project scale justifies the operational complexity.