shahar1 opened a new pull request, #68749:
URL: https://github.com/apache/airflow/pull/68749

   # Human Summary
   
   closes: #39456
   related: #39603
   
   This PR fixes a bug where a new asset-scheduled Dag consumes historical asset 
events that occurred before the Dag started referencing those assets.
   
   # AI Summary
   
   <details><summary>Click here</summary>
   
   **Bug.** In `SchedulerJobRunner._create_dag_runs_asset_triggered`, the set 
of asset events attached to a run is bounded below by the previous 
asset-triggered run's `run_after`, falling back to `date.min` when there is no 
previous run (`func.coalesce(cte.c.previous_dag_run_run_after, date.min)`). For 
a Dag's very first run, this means every asset event ever recorded for its 
assets is consumed, so a Dag that starts scheduling on assets that already have 
history reprocesses the entire backlog on its first run.
   
   **Fix.** Add the Dag's earliest schedule-reference `created_at` to the 
`coalesce` fallback chain, before `date.min`:
   
   ```python
   AssetEvent.timestamp > func.coalesce(
       cte.c.previous_dag_run_run_after,
       select(func.min(DagScheduleAssetReference.created_at))
       .where(DagScheduleAssetReference.dag_id == dag.dag_id)
       .scalar_subquery(),
       date.min,
   )
   ```
   
   On the first run the window is bounded at the moment the Dag started 
scheduling on its assets; subsequent runs are unchanged (still bounded by the 
previous run's `run_after`). `created_at` is reliable for this because schedule 
references are updated in place during parsing 
(`dag_processing/collection.py`), not deleted and recreated, so it survives 
re-serialization.
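
A minimal pure-Python sketch of the fallback order described above, for illustration only. The helper names (`lower_bound`, `events_for_run`) and the sample timestamps are hypothetical and not part of Airflow; the real logic lives in the SQL query, and the upper bound of the window is not modelled here.

```python
from datetime import datetime

# Hypothetical stand-in for the `date.min` fallback used in the query.
DATE_MIN = datetime.min


def lower_bound(previous_run_after, earliest_reference_created_at):
    """Mimic SQL COALESCE: return the first candidate that is not None."""
    for candidate in (previous_run_after, earliest_reference_created_at, DATE_MIN):
        if candidate is not None:
            return candidate


def events_for_run(event_timestamps, previous_run_after, earliest_reference_created_at):
    """Keep only events strictly newer than the lower bound."""
    bound = lower_bound(previous_run_after, earliest_reference_created_at)
    return [ts for ts in event_timestamps if ts > bound]


events = [datetime(2024, 1, 1), datetime(2024, 6, 1)]
reference_created = datetime(2024, 3, 1)

# First run: no previous run, so the reference's created_at bounds the window.
first_run = events_for_run(events, None, reference_created)

# Later run: the previous run's run_after takes precedence.
later_run = events_for_run(events, datetime(2024, 7, 1), reference_created)

# Pre-fix behaviour: no intermediate fallback, so the whole backlog is consumed.
pre_fix = events_for_run(events, None, None)
```

In this sketch the first run picks up only the event recorded after the reference was created, while the pre-fix fallback picks up both.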
   
   **Scope.** Direct asset references only. The asset-**alias** path is 
deliberately left on `date.min`: an alias consumer is expected to pick up 
events attached to its alias regardless of timing (covered by 
`test_create_dag_runs_asset_alias_with_asset_event_attached`).
   
   This is the same fix idea as the stale 2.x PR #39603, re-expressed for the 
current SQL-native query (a single `coalesce` over a CTE rather than a Python 
`if/else` over a joined reference table; a scalar subquery is used instead of a 
bare correlated column to avoid an implicit cartesian join).
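
The query shape can be tried outside Airflow with the standard-library `sqlite3` module. This is a rough sketch under stated assumptions: the two tables and their columns are hypothetical, heavily simplified stand-ins rather than Airflow's schema, and timestamps are ISO date strings so that string comparison orders them correctly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Hypothetical, simplified tables; not Airflow's real schema.
    CREATE TABLE asset_event (id INTEGER PRIMARY KEY, ts TEXT);
    CREATE TABLE schedule_reference (dag_id TEXT, created_at TEXT);
    INSERT INTO asset_event (ts) VALUES ('2024-01-01'), ('2024-06-01');
    INSERT INTO schedule_reference VALUES ('consumer', '2024-03-01');
    INSERT INTO schedule_reference VALUES ('other', '2023-01-01');
    """
)

# The scalar subquery yields a single value, so it never multiplies the
# rows of asset_event the way an unconstrained join against the reference
# table would.
QUERY = """
    SELECT ts FROM asset_event
    WHERE ts > COALESCE(
        :previous_run_after,
        (SELECT MIN(created_at) FROM schedule_reference WHERE dag_id = :dag_id),
        '0001-01-01'
    )
    ORDER BY ts
"""


def consumed(dag_id, previous_run_after=None):
    """Return the event timestamps that a run for dag_id would consume."""
    params = {"previous_run_after": previous_run_after, "dag_id": dag_id}
    return [row[0] for row in conn.execute(QUERY, params)]


first_run = consumed("consumer")
later_run = consumed("consumer", previous_run_after="2024-07-01")
no_reference = consumed("unknown")
```

`MIN` over zero rows evaluates to `NULL`, so a Dag with no reference rows in this toy schema falls through to the final fallback value, mirroring the role of `date.min` in the chain.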
   
   **Validation.**
   - New regression test 
`test_new_asset_triggered_dag_ignores_events_before_creation` — fails on `main` 
(consumes 2 events), passes here (consumes 1).
   - Updated `test_create_dag_runs_assets`, which previously encoded the buggy 
behavior.
   - All asset-related scheduler tests pass.
   
   </details>
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes — Claude Code (Opus 4.8, 1M context)
   
   Generated-by: Claude Code (Opus 4.8, 1M context) following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)

