
Data Pipeline

Six data streams feeding the engine — NWP ensemble, satellite, observed ground truth, market prices, settlement, and engine output.

The engine requires six distinct data streams feeding a time-series store. Each stream serves a different role in the calibration and product pipeline.

Stream Architecture

Six Data Streams

Stream 1 — NWP Ensemble Ingest

| Attribute | Detail |
| --- | --- |
| Source | Open-Meteo API (3 endpoints) |
| Models | GFS 31-member, ECMWF 51-member, HRRR 3km (US) |
| Frequency | Every 6 hours (aligned to 00Z/06Z/12Z/18Z model runs, pulled ~3–4h after run) |
| Format | JSON via REST API |
| Purpose | Raw ensemble data for probability calibration and live signal generation |

Implemented endpoints:

| Endpoint | URL | Use | Variables |
| --- | --- | --- | --- |
| Ensemble (live) | ensemble-api.open-meteo.com/v1/ensemble | GFS 31-member + ECMWF IFS 51-member for production | temperature_2m_max, temperature_2m_min, precipitation_sum, wind_speed_10m_max, wind_gusts_10m_max, shortwave_radiation_sum |
| Historical Forecast | historical-forecast-api.open-meteo.com/v1/forecast | Archived GFS deterministic forecasts for backtesting | Same variables (deterministic, no ensemble members) |
| ERA5 Archive | archive-api.open-meteo.com/v1/archive | ERA5 reanalysis for irradiance ground truth | shortwave_radiation_sum |

The Historical Forecast API returns deterministic GFS output only (no ensemble members). This is why backtest mode uses CDF-based probability functions (Gaussian, Gamma, Weibull, Beta) rather than member counting. The live Ensemble API provides both GFS 31-member and ECMWF IFS 51-member forecasts for production signal generation, where member counting is used for all variables.
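The two probability modes described above can be sketched side by side. The functions below are illustrative, not the engine's actual implementation: member counting is the empirical fraction of ensemble members beyond a threshold (live mode), while backtest mode evaluates a fitted CDF (a Gaussian is shown here; the engine also supports Gamma, Weibull, and Beta).

```python
import math

def prob_exceeds_member_count(members, threshold):
    """Live mode: P(X > threshold) as the fraction of ensemble members above it."""
    return sum(1 for m in members if m > threshold) / len(members)

def prob_exceeds_gaussian(mean, std, threshold):
    """Backtest mode: P(X > threshold) under a fitted Gaussian CDF."""
    z = (threshold - mean) / std
    return 1 - 0.5 * (1 + math.erf(z / math.sqrt(2)))

# Hypothetical 8-member daily-high forecast (deg F)
members = [71.2, 73.5, 74.1, 72.8, 75.0, 70.9, 73.3, 74.6]
p_live = prob_exceeds_member_count(members, 73.0)  # 5 of 8 members above 73.0
```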

Implementation — src/data_ingestion/open_meteo.py:

| Function | Purpose | Parameters |
| --- | --- | --- |
| download_historical_forecasts() | Backtest forecasts for any variable | variables, unit_params, value_columns (defaults preserve backward compat) |
| download_ensemble_forecast() | Single-model live ensemble | value_col, unit_params (generalized from temperature-only) |
| download_multi_model_ensemble() | Multi-model ensemble (GFS + ECMWF) | Passes through variable params to single-model function |

Unit conversions applied on ingestion:

| Variable | Open-Meteo Unit | Engine Unit | Conversion |
| --- | --- | --- | --- |
| Temperature | °C | °F | * 9/5 + 32 |
| Precipitation | mm | inches | * 0.0393701 |
| Wind speed | km/h | mph | * 0.621371 |
| Irradiance | MJ/m² | MJ/m² | None (native) |
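The conversions in the table are direct scalar transforms; a minimal sketch (function names are illustrative, not the ingestion module's actual helpers):

```python
def c_to_f(celsius):
    """Temperature: degC -> degF."""
    return celsius * 9 / 5 + 32

def mm_to_inches(mm):
    """Precipitation: mm -> inches."""
    return mm * 0.0393701

def kmh_to_mph(kmh):
    """Wind speed: km/h -> mph."""
    return kmh * 0.621371
```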

Stream 2 — Observed Ground Truth (IEM + ERA5)

| Attribute | Detail |
| --- | --- |
| Sources | Iowa Environmental Mesonet (IEM) ASOS/AWOS + ERA5 Reanalysis |
| Frequency | Daily batch |
| Variables | Temperature (high/low), precipitation, wind speed, wind gust, irradiance |
| Purpose | Backtest validation, bias training, daily scoring |

IEM ASOS Daily API — src/data_ingestion/iem_observed.py:

| Function | IEM vars Param | Actual Column Returned | Engine Column |
| --- | --- | --- | --- |
| download_daily_highs() | max_tmpf | max_temp_f | max_temp_f |
| download_daily_lows() | min_tmpf | min_temp_f | min_temp_f |
| download_daily_precip() | precip | precip_in | precip_inches |
| download_daily_max_wind() | max_sknt | max_wind_speed_kts | max_wind_mph (knots * 1.15078) |
| download_daily_max_gust() | gust_sknt | max_wind_gust_kts | max_gust_mph (knots * 1.15078) |

IEM API quirks:

  • Requires state-specific network codes (e.g., NY_ASOS, IL_ASOS)
  • Uses 3-letter station codes (strip leading K from ICAO: KORD → ORD)
  • Returns ALL columns regardless of the vars parameter
  • Actual column names differ from the vars parameter names (e.g., precip_in not precip, max_wind_speed_kts not max_sknt)
  • URL: mesonet.agron.iastate.edu/cgi-bin/request/daily.py
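The quirks above can be handled at request-build time. This helper is an illustrative sketch, not the module's actual code; the network format and K-stripping follow the list above.

```python
IEM_URL = "https://mesonet.agron.iastate.edu/cgi-bin/request/daily.py"

def iem_request_params(icao, state, year):
    """Build query params for the IEM daily endpoint (illustrative sketch)."""
    # IEM wants 3-letter codes: strip the leading K from 4-letter US ICAO ids
    station = icao[1:] if len(icao) == 4 and icao.startswith("K") else icao
    return {
        "network": f"{state}_ASOS",   # state-specific network code, e.g. IL_ASOS
        "stations": station,
        "year1": year, "month1": 1, "day1": 1,
        "year2": year, "month2": 12, "day2": 31,
    }

# Returned column names differ from the vars parameter names, so rename on read
COLUMN_MAP = {
    "precip_in": "precip_inches",
    "max_wind_speed_kts": "max_wind_mph",  # still knots after rename; convert next
}
```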

ERA5 Reanalysis — src/data_ingestion/open_meteo_reanalysis.py:

| Function | Variable | Purpose |
| --- | --- | --- |
| download_era5_irradiance() | shortwave_radiation_sum | Irradiance ground truth (IEM has no solar data) |

ERA5 is a global atmospheric reanalysis that assimilates millions of observations. Its irradiance values are independent of GFS/ECMWF forecasts, making it a valid ground truth source for backtest evaluation.

Stream 3 — Satellite Ground Truth (SatSure Sparta)

| Attribute | Detail |
| --- | --- |
| Source | SatSure Sparta API |
| Frequency | Daily |
| Variables | LST, soil moisture, NDVI, cloud cover, irradiance |
| Purpose | Layer 1 observations for bias correction enhancement and settlement verification |

Satellite data provides independent ground truth that NWP models don't see. It feeds two functions: improving bias correction (what actually happened vs what was forecast) and providing verification data for warranty settlement.

Stream 4 — Market Price Feed (ForecastEx)

| Attribute | Detail |
| --- | --- |
| Source | IBKR API (ForecastEx markets) |
| Frequency | Near real-time (WebSocket or 30-second polling) |
| Data | Contract prices for binary weather contracts |
| Purpose | Edge calculation — calibrated probability vs market consensus |

ForecastEx prices represent market consensus probabilities. Comparing the engine's calibrated probability against market price reveals the edge — and provides an independent validation of calibration quality.
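For a binary contract priced in probability units, the edge is simply the calibrated probability minus the market price. The sketch below pairs it with the textbook Kelly fraction for a binary payout; the engine's exact sizing rule is not shown here, so treat this as an assumption.

```python
def edge_and_kelly(p_model, market_price):
    """Edge and Kelly fraction for a YES contract (illustrative sketch).

    A YES contract costing `market_price` pays 1 if the event occurs.
    Edge = p - price; textbook binary Kelly = edge / (1 - price).
    """
    edge = p_model - market_price
    kelly = edge / (1 - market_price) if market_price < 1 else 0.0
    return edge, kelly
```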

Stream 5 — Settlement / Verification

| Attribute | Detail |
| --- | --- |
| Source | NWS Climatological Reports + SatSure |
| Frequency | Daily batch (after observation period closes) |
| Data | Observed daily max/min temperature, rainfall, wind speed, irradiance |
| Purpose | Settlement data → Brier score calculation → Calibration tab updates |

This is the ground truth stream for ForecastEx settlement. After each observation period closes, verified outcomes are compared against the engine's predictions to calculate Brier scores and update the reliability diagram.
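The Brier score referenced here is the mean squared difference between forecast probabilities and verified binary outcomes (0 = perfect, 0.25 = an uninformative constant 0.5 forecast):

```python
def brier_score(probs, outcomes):
    """Mean squared error between forecast probabilities and 0/1 outcomes."""
    return sum((p - o) ** 2 for p, o in zip(probs, outcomes)) / len(probs)
```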

Stream 6 — Engine Output

| Attribute | Detail |
| --- | --- |
| Source | Internal processing pipeline |
| Frequency | After each NWP ingest (~4 times daily) + midday D+0 update |
| Data | Calibrated probabilities, risk scores, trade signals |
| Purpose | Feeds dashboard tabs, Risk API endpoints, signal logs |

Daily Automation Pipeline

scripts/daily_pipeline.sh — triggered via launchd, 8 steps:

Step 1: Score Yesterday's Predictions

| Sub-step | Script | Description |
| --- | --- | --- |
| 1a | daily_scoring.py --variable high | Score DH predictions against IEM observed daily highs |
| 1b | daily_scoring.py --variable low | Score NLL predictions against IEM observed daily lows |

Step 2: Generate Tomorrow's Temperature Signals

| Sub-step | Script | Description |
| --- | --- | --- |
| 2a | run_phase2.py --signal-only --variable high --models gfs_seamless ecmwf_ifs025 | DH multi-model ensemble → bias correct → calibrate → edge/Kelly/z-score |
| 2b | run_phase2.py --signal-only --variable low --models gfs_seamless ecmwf_ifs025 | NLL multi-model ensemble → same pipeline |

Step 3: D+0 METAR Blending

| Script | Description |
| --- | --- |
| run_d0_update.py | Fetch hourly METAR observations, Bayesian-blend with morning forecast for settlement-day contracts |
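The exact blend inside run_d0_update.py is not shown here; a standard precision-weighted Gaussian update is one plausible form, sketched below under that assumption. The morning forecast acts as the prior and the METAR-derived estimate as the observation, each weighted by inverse variance.

```python
def bayesian_blend(prior_mean, prior_var, obs_mean, obs_var):
    """Precision-weighted combination of two Gaussian estimates (sketch).

    Returns the posterior mean and variance; lower-variance inputs dominate.
    """
    w_prior = 1 / prior_var
    w_obs = 1 / obs_var
    mean = (w_prior * prior_mean + w_obs * obs_mean) / (w_prior + w_obs)
    var = 1 / (w_prior + w_obs)
    return mean, var
```

With equal variances the blend is a simple average and the posterior variance halves; as intraday METAR data accumulates, `obs_var` shrinks and the blend tracks observations.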

Steps 4–6: Non-Temperature Variable Signals

| Step | Script | Description |
| --- | --- | --- |
| 4 | run_variable_signals.py --variable rainfall --risk-scores | Rainfall probability curves + construction delay risk scores |
| 5 | run_variable_signals.py --variable wind_speed --risk-scores | Wind probability curves + operational risk scores |
| 6 | run_variable_signals.py --variable irradiance --risk-scores | Irradiance probability curves + solar shortfall risk scores |

run_variable_signals.py is the generic signal generator for non-temperature variables. It:

  1. Loads the variable via get_variable(name) from the registry
  2. Fetches ensemble forecasts via var.ingest_ensemble()
  3. Applies bias correction via var.bias_correct_ensemble()
  4. Computes probabilities via ensemble member counting
  5. Optionally generates risk scores via RiskScorer
  6. Saves signals to data/signals/{variable}/signals_YYYY-MM-DD.csv
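The core of that flow (steps 2–4) can be sketched with a stub in place of a real registry variable. The StubVariable class and its sample numbers are invented for illustration; only the method names (`ingest_ensemble`, `bias_correct_ensemble`) come from the list above.

```python
class StubVariable:
    """Minimal stand-in for a registry variable (real ones come from get_variable)."""

    def ingest_ensemble(self):
        # station -> ensemble members; hypothetical rainfall values in inches
        return {"ORD": [0.1, 0.0, 0.3, 0.2, 0.0]}

    def bias_correct_ensemble(self, members):
        # multiplicative bias correction, factor invented for the sketch
        return [m * 1.1 for m in members]

def generate_signals(var, threshold):
    """Fetch, bias-correct, and member-count each station's ensemble."""
    signals = {}
    for station, members in var.ingest_ensemble().items():
        corrected = var.bias_correct_ensemble(members)
        signals[station] = sum(1 for m in corrected if m > threshold) / len(corrected)
    return signals
```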

Step 7: Divergence Monitor

| Script | Description |
| --- | --- |
| run_divergence.py | Compare afternoon ensemble against morning baseline, flag >10pp probability shifts or direction reversals |

Output actions: HOLD (normal), FLAG (significant shift >= threshold), REVERSE (direction flipped YES↔NO).
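A sketch of that classification follows. The 10pp threshold comes from the text; interpreting "direction" as the probability crossing the 50% line is an assumption, as is the precedence of REVERSE over FLAG.

```python
def divergence_action(p_morning, p_afternoon, threshold_pp=10.0):
    """Classify afternoon-vs-morning probability drift (illustrative sketch)."""
    # REVERSE: the implied YES/NO direction flipped across the 50% line
    if (p_morning - 0.5) * (p_afternoon - 0.5) < 0:
        return "REVERSE"
    # FLAG: same direction, but the shift meets the percentage-point threshold
    if abs(p_afternoon - p_morning) * 100 >= threshold_pp:
        return "FLAG"
    return "HOLD"
```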

Step 8: Dashboard Export

| Script | Description |
| --- | --- |
| export_dashboard_data.py | Export all signals, scores, track record, calibration, variable signals, and system status to dashboard JSON |

Export outputs (written to dashboard/public/data/):

| File | Contents |
| --- | --- |
| signals_latest.json | DH + NLL temperature trade signals |
| scores_latest.json | Scored predictions vs observations |
| track_record.json | Daily Brier scores and direction accuracy |
| calibration.json | Reliability diagram + Phase 1 summary per variable |
| variable_signals.json | Rainfall, wind, irradiance probability signals |
| system_status.json | Variable registry, pipeline status, config dump, data freshness |

Midday Automation Pipeline

scripts/midday_pipeline.sh — triggered via LaunchAgent at noon local time:

| Step | Script | Description |
| --- | --- | --- |
| 1 | run_d0_update.py --variable high | Bayesian-blend METAR observations with morning DH forecast |
| 2 | run_d0_update.py --variable low | Same for NLL contracts |
| 3 | run_divergence.py --variable high | Flag >10pp shifts or direction reversals vs morning baseline |
| 4 | run_divergence.py --variable low | Same for NLL contracts |

Generic Backtest Pipeline

scripts/run_phase_generic.py — unified Phase 0/1 orchestrator for all variables:

```shell
python scripts/run_phase_generic.py --variable rainfall --phase 0+1
python scripts/run_phase_generic.py --variable wind_speed --phase 0+1
python scripts/run_phase_generic.py --variable irradiance --phase 0+1
python scripts/run_phase_generic.py --variable temperature_high --phase 0+1
```

Phase 0 (Baseline):

  1. var.ingest_historical() — download historical forecasts from Open-Meteo
  2. var.observe() — download observations from IEM or ERA5
  3. Merge on [station, date]
  4. Compute MAE, bias, RMSE per city and overall
  5. Save paired data to data/backtest/{variable}/forecast_vs_observed.csv
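Step 4's baseline metrics over the merged forecast/observed pairs are standard definitions:

```python
import math

def baseline_metrics(forecast, observed):
    """MAE, mean bias (forecast - observed), and RMSE over paired series."""
    errors = [f - o for f, o in zip(forecast, observed)]
    mae = sum(abs(e) for e in errors) / len(errors)
    bias = sum(errors) / len(errors)
    rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
    return mae, bias, rmse
```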

Phase 1 (Calibration):

  1. Split train/test using TRAIN_END_DATE / TEST_START_DATE
  2. Train bias parameters (additive or multiplicative based on var.config.bias_method)
  3. Generate backtest probabilities via generate_backtest_probabilities_generic()
  4. Train isotonic calibration on raw probabilities
  5. Compute Brier scores (raw and calibrated)
  6. Save artifacts: bias_params.json, calibration_model.pkl, phase1_summary.json
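The isotonic fit in step 4 can be sketched from scratch with the pool-adjacent-violators algorithm; the production code presumably uses a library implementation, so this stand-alone version is illustrative only.

```python
def pav_isotonic(probs, outcomes):
    """Pool-Adjacent-Violators fit of outcomes against raw probabilities.

    Returns a step function as (upper_raw_prob, calibrated_value) pairs
    with non-decreasing calibrated values.
    """
    pairs = sorted(zip(probs, outcomes))
    merged = []  # each block: [outcome_sum, count, last_raw_prob]
    for x, y in pairs:
        merged.append([y, 1, x])
        # pool adjacent blocks whenever the running means decrease
        while len(merged) > 1 and merged[-2][0] / merged[-2][1] > merged[-1][0] / merged[-1][1]:
            s, c, x_last = merged.pop()
            merged[-1][0] += s
            merged[-1][1] += c
            merged[-1][2] = x_last
    return [(x_last, s / c) for s, c, x_last in merged]
```

A violating pair of points gets pooled into a single level, which is what makes the resulting probability mapping monotone.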

Storage: TimescaleDB

The engine uses TimescaleDB — a PostgreSQL extension with native time-series support.

Why TimescaleDB:

  • Standard SQL (no proprietary query language)
  • Automatic time-partitioning (hypertables)
  • Efficient compression for historical data
  • Native continuous aggregates for roll-up calculations
  • PostgreSQL ecosystem (PostGIS for spatial queries, standard tooling)

Schema design stores data in time-series hypertables per stream type, with location and variable as indexed dimensions. Each prediction record includes full provenance metadata for audit trail compliance.

Data Directories

| Path | Contents |
| --- | --- |
| data/backtest/ | Temperature (DH) paired data, bias params, calibration model |
| data/backtest/ecmwf/ | ECMWF-specific DH bias params and calibration |
| data/backtest/low/ | Temperature (NLL) bias params and calibration |
| data/backtest/ecmwf_low/ | ECMWF-specific NLL bias params |
| data/backtest/rainfall/ | Rainfall paired data, multiplicative bias, calibration |
| data/backtest/irradiance/ | Irradiance paired data, multiplicative bias, calibration |
| data/backtest/wind/ | Wind paired data, multiplicative bias, calibration |
| data/signals/ | DH trade signals (daily CSV) |
| data/signals/low/ | NLL trade signals |
| data/signals/rainfall/ | Rainfall probability signals |
| data/signals/wind/ | Wind probability signals |
| data/signals/irradiance/ | Irradiance probability signals |
| data/logs/ | Daily pipeline logs |

Deployment

Initial deployment runs on a single VPS with Docker containers:

  • Ingestion service container
  • TimescaleDB container
  • Engine pipeline container (scheduled via launchd/cron)
  • FastAPI backend container (Risk API + probability endpoints)
  • Nginx reverse proxy

This is sufficient for initial scale — NWP data volume is modest (megabytes per ingest cycle, not gigabytes). Kubernetes migration is deferred until multi-project scale justifies the operational complexity.