1fanwang commented on code in PR #66773:
URL: https://github.com/apache/airflow/pull/66773#discussion_r3232486967
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2938,7 +2944,10 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
.join(DM, TI.dag_id == DM.dag_id)
.where(
TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
- TI.last_heartbeat_at < limit_dttm,
+ or_(
+ TI.last_heartbeat_at < limit_dttm,
Review Comment:
**Update 2026-05-19:** the timing assumption in this argument was wrong.
A live repro on a freshly migrated Airflow 3 deployment showed that
`adopt_or_reset_orphaned_tasks` fires within ~14 ms of scheduler startup and
rotates the NULL-heartbeat TIs into `task_instance_history`. By the time
`_find_task_instances_without_heartbeats` runs, it only sees the fresh rows,
which already have `last_heartbeat_at` populated by `/run`. The cleanup query
never observes the NULL state, so the predicate this PR adds is unreachable on
a normal restart path. See the
[close-comment](https://github.com/apache/airflow/pull/66773#discussion_r3269906184)
for the trace and the real-bug hypothesis on #58307.
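A toy, in-memory model of that startup ordering may make the argument easier to follow. Everything in it is hypothetical: `TI`, `adopt_or_reset`, `run_endpoint` and `rows_seen_by_cleanup` are illustrative stand-ins rather than Airflow APIs, and resetting a row to `state=None` simplifies what the real adoption pass does. The model only shows that once the adoption pass has archived the NULL-heartbeat rows, the cleanup scan has no NULL row left to see.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class TI:
    # Hypothetical, simplified task-instance row (not Airflow's model).
    key: str
    state: Optional[str]
    last_heartbeat_at: Optional[datetime]


def adopt_or_reset(rows: List[TI], history: List[TI]) -> List[TI]:
    # Hypothetical stand-in for the startup adoption pass: NULL-heartbeat
    # RUNNING rows are copied into `history` and reset to state=None.
    out = []
    for ti in rows:
        if ti.state == "running" and ti.last_heartbeat_at is None:
            history.append(ti)
            out.append(replace(ti, state=None))
        else:
            out.append(ti)
    return out


def run_endpoint(ti: TI, now: datetime) -> TI:
    # Hypothetical /run stand-in: state and heartbeat are written together.
    return replace(ti, state="running", last_heartbeat_at=now)


def rows_seen_by_cleanup(rows: List[TI]) -> List[TI]:
    # The heartbeat cleanup only considers RUNNING/RESTARTING rows.
    return [ti for ti in rows if ti.state in ("running", "restarting")]


now = datetime(2026, 5, 19, tzinfo=timezone.utc)
history: List[TI] = []
rows = adopt_or_reset([TI("legacy", "running", None)], history)  # adoption fires first
rows = [run_endpoint(ti, now) for ti in rows]  # re-queue/scheduling elided; task re-runs via /run
print([ti.key for ti in history])  # → ['legacy']
print([ti.last_heartbeat_at is None for ti in rows_seen_by_cleanup(rows)])  # → [False]
```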
<details>
<summary>Original argument, preserved for thread continuity</summary>
Fair point, let me walk through it. The `/run` endpoint you linked does set
`state=RUNNING` and `last_heartbeat_at=utcnow()` atomically in the same
`UPDATE`, so in steady-state Airflow 3 a freshly-running TI is not NULL on that
path.
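The atomicity point can be illustrated with a minimal sketch against an in-memory SQLite table. The table and the `mark_running` helper are hypothetical simplifications, not Airflow's schema or the real `/run` implementation. The sketch only shows that when one `UPDATE` writes both columns, no reader can observe `state='running'` with a NULL heartbeat on this path.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT, last_heartbeat_at TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('ti-1', 'queued', NULL)")


def mark_running(conn: sqlite3.Connection, ti_id: str, now: datetime) -> None:
    # Hypothetical helper: one UPDATE sets both columns inside one transaction,
    # so state='running' is never visible with a NULL heartbeat on this path.
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE task_instance SET state = 'running', last_heartbeat_at = ? WHERE id = ?",
            (now.isoformat(), ti_id),
        )


mark_running(conn, "ti-1", datetime.now(timezone.utc))
row = conn.execute(
    "SELECT state, last_heartbeat_at IS NOT NULL FROM task_instance WHERE id = 'ti-1'"
).fetchone()
print(row)  # → ('running', 1)
```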
The case I'm targeting is the legacy state left by an Airflow 2 → 3 upgrade. Migration
`0045_3_0_0_add_last_heartbeat_at_directly_to_ti` adds the column as
`nullable=True` without a backfill. Any TI that was already RUNNING at upgrade
time has `last_heartbeat_at IS NULL` until something writes to it. The codebase
already acknowledges this exact state inside `adopt_or_reset_orphaned_tasks`:
```python
# scheduler_job_runner.py:2854-2856
# If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
if ti.last_heartbeat_at is None:
    ti.last_heartbeat_at = timezone.utcnow()
```
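The migration state itself is easy to reproduce in miniature. This sketch uses an in-memory SQLite table with a simplified, hypothetical schema (it is not migration `0045` itself): a row that was already RUNNING before the upgrade ends up with NULL in a nullable column that is added without a default or a backfill `UPDATE`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT)")
# A row that was already RUNNING before the upgrade.
conn.execute("INSERT INTO task_instance VALUES ('legacy', 'running')")
# Nullable column added with no default and no backfill UPDATE.
conn.execute("ALTER TABLE task_instance ADD COLUMN last_heartbeat_at TEXT")
rows = conn.execute("SELECT id, state, last_heartbeat_at FROM task_instance").fetchall()
print(rows)  # → [('legacy', 'running', None)]
```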
`adopt_or_reset` only runs at scheduler startup and on the scheduler-lock
timer. `_find_task_instances_without_heartbeats` runs on a tighter loop and
currently has no matching fallback, so a TI in the migration state is invisible
to the cleanup query and stays RUNNING forever. This PR is the
heartbeat-cleanup counterpart to the existing `adopt_or_reset` fallback.
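The reason such a row is invisible is SQL's three-valued logic: `NULL < limit` evaluates to NULL rather than true, and `WHERE` drops every row whose predicate is not true. A minimal sketch with SQLite and a simplified, hypothetical table (not Airflow's schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT, last_heartbeat_at TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [
        ("stale", "running", "2026-01-01T00:00:00+00:00"),  # old heartbeat
        ("legacy", "running", None),  # migration state: NULL heartbeat
    ],
)
limit = "2026-05-01T00:00:00+00:00"

# The comparison alone: NULL < limit is NULL (unknown), so "legacy" is dropped.
matched = conn.execute(
    "SELECT id FROM task_instance WHERE state = 'running' AND last_heartbeat_at < ?",
    (limit,),
).fetchall()
print(matched)  # → [('stale',)]
print(conn.execute("SELECT NULL < ?", (limit,)).fetchone())  # → (None,)
```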
I captured the regression deterministically by reverting the new `or_(...)`
predicate and rerunning the regression test on `main`:
```
FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
AssertionError: assert ti.key not in MockExecutor.running
(the cleanup query silently skipped the NULL row; the TI key is still in executor.running)
```
With the fix in place, the test passes, and the companion
`..._null_last_heartbeat_fresh_start` case pins that a newly started TI inside
its first timeout window is left alone. I also updated the PR body with the full
before/after snippet and the steady-state walkthrough.
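The two test cases can be mirrored in a small SQLite sketch. The diff above is cut off after the first `or_` branch, so the NULL branch used here (`last_heartbeat_at IS NULL AND start_date < limit`) is an assumption chosen to match the described behaviour, not the PR's actual predicate, and the table is a simplified stand-in for `task_instance`. Under that assumption, a stale NULL-heartbeat row is picked up while a NULL-heartbeat row started inside the timeout window is left alone.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

now = datetime(2026, 5, 19, 12, 0, tzinfo=timezone.utc)
limit = now - timedelta(seconds=300)  # hypothetical heartbeat timeout

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(id TEXT PRIMARY KEY, state TEXT, start_date TEXT, last_heartbeat_at TEXT)"
)


def ts(delta: timedelta) -> str:
    # ISO-8601 strings with the same UTC offset sort chronologically.
    return (now - delta).isoformat()


conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("stale", "running", ts(timedelta(hours=2)), ts(timedelta(hours=1))),
        ("healthy", "running", ts(timedelta(hours=2)), ts(timedelta(seconds=10))),
        ("legacy_null", "running", ts(timedelta(hours=2)), None),
        ("fresh_null", "running", ts(timedelta(seconds=10)), None),
    ],
)

# Hypothetical predicate: original comparison OR a start_date fallback for NULLs.
matched = conn.execute(
    """
    SELECT id FROM task_instance
    WHERE state IN ('running', 'restarting')
      AND (last_heartbeat_at < :limit
           OR (last_heartbeat_at IS NULL AND start_date < :limit))
    ORDER BY id
    """,
    {"limit": limit.isoformat()},
).fetchall()
print(matched)  # → [('legacy_null',), ('stale',)]
```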
</details>
--
This is an automated message from the Apache Git Service.