1fanwang opened a new issue, #66907:
URL: https://github.com/apache/airflow/issues/66907

   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   The UI fix in #66552 closed the user-facing symptom from #66462, but the 
underlying drift mechanism still runs every parse cycle and the values it 
computes still surface through the REST API. On the LinkedIn DI Airflow side we 
have CLIs and dashboards that read `next_dagrun_logical_date` and 
`next_dagrun_run_after` from `/api/v2/dags/{id}/details`, and they see exactly 
the same misleading drift the UI fix hid for the web view. Separately, the 
scheduler-side recompute is unconditional, so every parse cycle pays for it 
even for Dags that have been paused for weeks.
   
   The mechanism is the same one #66462 described. 
`DagModel.calculate_dagrun_date_fields` 
(`airflow-core/src/airflow/models/dag.py:757`) is called for every Dag on every 
parse cycle from `airflow-core/src/airflow/dag_processing/collection.py:638`, 
including paused ones. It calls `dag.timetable.next_dagrun_info_v2(...)`, and 
for `catchup=False` timetables the `_skip_to_latest` branch reads the wall 
clock and snaps the proposed start to the 
most-recent-scheduled-interval-before-now. The DagModel fields get overwritten 
with that newly-computed value each cycle, even when nothing has changed about 
the Dag.
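
To make the snapping concrete, here is a simplified pure-Python model of the behavior described above. It assumes the daily `0 1 * * *` UTC schedule used in the repro script in this report. The helper names (`align_to_prev`, `skip_to_latest`, `next_run_fields`) are hypothetical, and the bodies are a sketch of the described behavior, not Airflow's actual `_skip_to_latest` implementation.

```python
from __future__ import annotations

import datetime as dt

UTC = dt.timezone.utc
PERIOD = dt.timedelta(days=1)  # assumption: daily schedule, as in the repro


def align_to_prev(now: dt.datetime) -> dt.datetime:
    """Most recent 01:00 UTC tick at or before `now` (models cron "0 1 * * *")."""
    tick = now.replace(hour=1, minute=0, second=0, microsecond=0)
    return tick if tick <= now else tick - PERIOD


def skip_to_latest(now: dt.datetime, earliest: dt.datetime) -> dt.datetime:
    """Start of the latest interval that has fully ended by `now`."""
    return max(align_to_prev(now) - PERIOD, earliest)


def next_run_fields(
    last_interval_end: dt.datetime, now: dt.datetime, earliest: dt.datetime
) -> tuple[dt.datetime, dt.datetime]:
    """Return (interval start, interval end) for the next run with catchup off.

    The wall clock (`now`) is an input, so the result moves whenever `now`
    moves, even if `last_interval_end` never changes.
    """
    start = max(last_interval_end, skip_to_latest(now, earliest))
    return start, start + PERIOD


if __name__ == "__main__":
    day0 = dt.datetime(2026, 1, 1, 1, 0, tzinfo=UTC)
    last_end = day0 + PERIOD  # end of the single run that already happened
    for offset_days in (1, 3, 7, 14, 30):
        now = day0 + dt.timedelta(days=offset_days, hours=21)
        start, end = next_run_fields(last_end, now, day0)
        print(now.isoformat(), start.isoformat(), end.isoformat())
```

The only input that varies between iterations is `now`, which is the point: the computed fields depend on the wall clock, not only on Dag state.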
   
   ### How to reproduce
   
   Self-contained script (does not need a DB or a running scheduler), runnable 
from any airflow worktree on `main`:
   
   ```python
   """
   Reproduce next_dagrun_* field drift on paused Dags.
   
   `DagModel.calculate_dagrun_date_fields` is called every parse cycle for every
   Dag (including paused ones). For `catchup=False` timetables, the timetable's
   `_skip_to_latest` reads the wall clock and snaps the proposed start to the
   most-recent-scheduled-interval-before-now. The DagModel fields therefore
   advance every parse cycle even when the Dag has been paused for weeks and no
   new run will fire.
   
   Run via `uv run --project airflow-core python /tmp/paused_next_run_drift_repro.py`
   from any airflow worktree on upstream/main.
   """
   
   from __future__ import annotations
   
   import datetime as dt
   
   import time_machine
   from pendulum import timezone as pendulum_tz
   
   from airflow.models.dag import DagModel
   from airflow.timetables.base import DagRunInfo
   from airflow.timetables.interval import CronDataIntervalTimetable
   
   
   class _FakeDag:
       """Stand-in for SerializedDAG/LazyDeserializedDAG.
   
       `DagModel.calculate_dagrun_date_fields` only touches `dag.timetable` and
       `dag.next_dagrun_info`. The latter normally forwards into
       `timetable.next_dagrun_info_v2`, which is all this repro needs.
       """
   
       def __init__(
           self, timetable: CronDataIntervalTimetable, start_date: dt.datetime
       ) -> None:
           self.timetable = timetable
           self._start_date = start_date
   
       def next_dagrun_info(
           self, *, last_automated_run_info: DagRunInfo | None
       ) -> DagRunInfo | None:
           from airflow.timetables.base import TimeRestriction
   
           restriction = TimeRestriction(
               earliest=self._start_date, latest=None, catchup=False
           )
           return self.timetable.next_dagrun_info_v2(
               last_dagrun_info=last_automated_run_info, restriction=restriction
           )
   
   
   class _FakeDagRun:
       """Stand-in for DagRun for `timetable.run_info_from_dag_run`.
   
       The interval timetable's `run_info_from_dag_run` reads `run_after`,
       `partition_date`, `partition_key`, then calls
       `airflow.models.dag.get_run_data_interval(dag, dag_run)`. The latter
       reads `data_interval_start` / `data_interval_end` from the run.
       """
   
       def __init__(self, logical_date: dt.datetime, period: dt.timedelta) -> None:
           self.logical_date = logical_date
           self.run_after = logical_date + period
           self.data_interval_start = logical_date
           self.data_interval_end = logical_date + period
           self.partition_date = None
           self.partition_key = None
   
   
   def main() -> None:
       utc = pendulum_tz("UTC")
       timetable = CronDataIntervalTimetable("0 1 * * *", utc)
   
       day0 = dt.datetime(2026, 1, 1, 1, 0, tzinfo=utc)
       period = dt.timedelta(days=1)
       last_run = _FakeDagRun(logical_date=day0, period=period)
   
       dag = _FakeDag(timetable=timetable, start_date=day0)
       dm = DagModel(dag_id="repro_paused_drift", is_paused=True)
   
       header = (
           f"{'parse cycle wall clock':<28} {'next_dagrun':<28} "
           f"{'next_dagrun_create_after':<28}"
       )
       print(header)
       print("-" * len(header))
   
       for offset_days in (1, 3, 7, 14, 30):
           now = day0 + dt.timedelta(days=offset_days, hours=21)
           with time_machine.travel(now, tick=False):
               dm.calculate_dagrun_date_fields(dag, last_automated_run=last_run)
               print(
                   f"{now.isoformat():<28} "
                   f"{str(dm.next_dagrun):<28} "
                   f"{str(dm.next_dagrun_create_after):<28}"
               )
   
   
   if __name__ == "__main__":
       main()
   ```
   
   Output (one entry per simulated parse cycle, all with the same paused Dag 
and the same `last_automated_run`):
   
   ```
   parse cycle wall clock       next_dagrun                  next_dagrun_create_after
   --------------------------------------------------------------------------------------
   2026-01-02T22:00:00+00:00    2026-01-02 01:00:00+00:00    2026-01-03 01:00:00+00:00
   2026-01-04T22:00:00+00:00    2026-01-03 01:00:00+00:00    2026-01-04 01:00:00+00:00
   2026-01-08T22:00:00+00:00    2026-01-07 01:00:00+00:00    2026-01-08 01:00:00+00:00
   2026-01-15T22:00:00+00:00    2026-01-14 01:00:00+00:00    2026-01-15 01:00:00+00:00
   2026-01-31T22:00:00+00:00    2026-01-30 01:00:00+00:00    2026-01-31 01:00:00+00:00
   ```
   
   The Dag is paused and `last_automated_run` does not change, but 
`next_dagrun` keeps advancing toward "now" and `next_dagrun_create_after` keeps 
advancing with it, one interval later. Neither will ever fire because the Dag 
is paused. In every row after the first, both values are already in the past 
relative to `now`, which is what #66462 originally flagged.
   
   ### Affected surfaces left over from #66552
   
   1. **REST API.** `GET /api/v2/dags/{id}/details` returns 
`next_dagrun_logical_date` and `next_dagrun_run_after` straight from the 
DagModel. External consumers (CLIs, dashboards, Terraform providers, our 
internal LinkedIn DI tooling) get the same misleading drift the UI is now 
hiding. The UI fix paints over the symptom rather than removing it from the 
data model.
   
   2. **Scheduler-side recompute.** 
`airflow-core/src/airflow/dag_processing/collection.py:638` invokes 
`DagModel.calculate_dagrun_date_fields` unconditionally on every parse cycle 
for every Dag, regardless of `is_paused`. In deployments with thousands of Dags 
and a non-trivial fraction paused, that's wasted CPU on a value that will not 
be used until unpause. This also keeps churning the rows in the metadata DB 
even though nothing meaningful changed.
   
   ### What you think should happen instead
   
   Either or both of options (B) and (C) from #66462. (C) — skip the recompute 
for paused Dags — addresses both surfaces at once and is the recommendation 
here. The data model would stop drifting, the REST API would stop returning the 
drifting value, and the parse-cycle CPU cost would drop.
   
   The caveat is that today's behavior preserves one useful side effect: when a 
Dag is unpaused, the scheduler already has a current value of 
`next_dagrun_create_after` and fires "the missed interval if any" without an 
extra step. If (C) is taken, the unpause path needs to recompute these fields 
before the scheduler reads them, otherwise unpause-after-long-pause stops 
firing the catch-up run. The natural hook is wherever `is_paused` flips from 
`True` to `False` (the REST API endpoint, the CLI, and the bulk update); option 
(C) should land together with that recompute.
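
A minimal sketch of how option (C) and the unpause recompute could fit together. `DagRow`, `on_parse_cycle`, and `unpause` are hypothetical stand-ins invented for illustration, not Airflow classes or functions. The `compute` callable stands in for whatever `calculate_dagrun_date_fields` would derive from the timetable.

```python
from __future__ import annotations

import datetime as dt
from dataclasses import dataclass
from typing import Callable, Optional, Tuple

# (next_dagrun, next_dagrun_create_after)
Fields = Tuple[Optional[dt.datetime], Optional[dt.datetime]]


@dataclass
class DagRow:
    """Hypothetical stand-in for the DagModel columns discussed here."""

    dag_id: str
    is_paused: bool
    next_dagrun: Optional[dt.datetime] = None
    next_dagrun_create_after: Optional[dt.datetime] = None


def on_parse_cycle(row: DagRow, compute: Callable[[], Fields]) -> bool:
    """Recompute the next-run fields unless the Dag is paused.

    Returns True if the row was updated, False if the recompute was skipped.
    """
    if row.is_paused:
        return False  # option (C): paused Dags are a no-op
    row.next_dagrun, row.next_dagrun_create_after = compute()
    return True


def unpause(row: DagRow, compute: Callable[[], Fields]) -> None:
    """Flip is_paused to False and refresh the fields in the same step.

    Without this refresh the scheduler would read values frozen at pause time.
    """
    was_paused = row.is_paused
    row.is_paused = False
    if was_paused:
        row.next_dagrun, row.next_dagrun_create_after = compute()
```

Doing the refresh inside the same code path that flips `is_paused` is the design choice that matters here: every entry point that unpauses a Dag would need to go through it.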
   
   (B) alone — nulling the fields in the API serializer for paused Dags — fixes 
the external-consumer drift but leaves the wasted scheduler CPU in place, so 
it's strictly weaker than (C).
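
For comparison, option (B) amounts to something like the following sketch, applied to an already-serialized payload. The function name `mask_next_run_fields` is hypothetical, and the flat dict shape with an `is_paused` key is an assumption. Only the two field names mentioned in this report are masked.

```python
from __future__ import annotations

from typing import Any, Dict

# Field names taken from this report; other next-run fields are out of scope.
NEXT_RUN_KEYS = ("next_dagrun_logical_date", "next_dagrun_run_after")


def mask_next_run_fields(details: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `details` with next-run fields nulled for paused Dags.

    The input dict is left untouched; active Dags pass through unchanged.
    """
    masked = dict(details)
    if masked.get("is_paused"):
        for key in NEXT_RUN_KEYS:
            if key in masked:
                masked[key] = None
    return masked
```

External consumers could apply the same masking on their side as a stopgap, but the stored values would still drift underneath.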
   
   This is related to #59618, which already documents that 
`calculate_dagrun_date_fields` is being called with an unreliable 
`last_automated_run` argument; the changes here would narrow that function's 
contract further by making paused Dags a no-op for it.
   
   ### Operating System
   
   n/a (Python-only repro)
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Reproduced against `apache/airflow` `main` (commit `0cf6462459`). The same 
drift mechanism exists on the 3.x line.
   
   ### Anything else?
   
   Cross-references:
   
   - #66462 — original report, closed by #66552.
   - #66552 — UI-only fix, merged. Covered three UI surfaces (`DagCard.tsx`, 
`DagsList.tsx`, `Header.tsx`) but did not touch the API serializer or the 
scheduler's recompute path.
   - #59618 — existing scheduler-side issue about 
`calculate_dagrun_date_fields` being called with an unreliable 
`last_automated_run`.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
