1fanwang commented on code in PR #65926:
URL: https://github.com/apache/airflow/pull/65926#discussion_r3227419293


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1539,7 +1539,10 @@ def _are_premature_tis(
             flag_upstream_failed=True,
             ignore_in_retry_period=True,
             ignore_in_reschedule_period=True,
-            finished_tis=finished_tis,
+            # Re-query finished TIs instead of relying on the scheduler loop's earlier snapshot. Another
+            # process may have changed task states while this loop was running, and writing terminal
+            # states such as UPSTREAM_FAILED based on stale upstream state can permanently strand tasks.
+            finished_tis=None,
         )

Review Comment:
   The race-condition reasoning here makes sense: `DepContext.ensure_finished_tis` re-queries when `finished_tis` is `None`, so this avoids writing terminal states from a stale snapshot.
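   To make the staleness argument concrete, here is a minimal, self-contained sketch of the lazy-load pattern. `SketchDepContext`, `TaskInstanceRow`, and the dict-backed `db` are hypothetical stand-ins, not Airflow's `DepContext` or ORM models. The only behavior carried over from the comment is that `finished_tis=None` means "query when first needed". The scenario (an upstream task that was failed in the snapshot and has since been cleared and rerun by another process) is also illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class TaskInstanceRow:
    """Hypothetical stand-in for a task instance row: just an id and a state."""

    task_id: str
    state: str | None


@dataclass
class SketchDepContext:
    """Hypothetical, simplified stand-in for the lazy-load pattern.

    NOT Airflow's DepContext: it only mirrors the idea that
    finished_tis=None means "query the current state when first needed".
    """

    query_finished: Callable[[], list[TaskInstanceRow]]
    finished_tis: list[TaskInstanceRow] | None = None

    def ensure_finished_tis(self) -> list[TaskInstanceRow]:
        # Only hit the "database" when no snapshot was supplied.
        if self.finished_tis is None:
            self.finished_tis = self.query_finished()
        return self.finished_tis


FINISHED = {"success", "failed", "skipped", "upstream_failed"}

# Simulated database: the upstream task starts out failed.
db = {"upstream": TaskInstanceRow("upstream", "failed")}


def query_finished() -> list[TaskInstanceRow]:
    # Return copies so a snapshot does not change when the "database" does.
    return [TaskInstanceRow(ti.task_id, ti.state) for ti in db.values() if ti.state in FINISHED]


stale_snapshot = query_finished()   # the scheduler loop takes its snapshot
db["upstream"].state = "success"    # another process clears and reruns the task

stale = SketchDepContext(query_finished, finished_tis=stale_snapshot)
fresh = SketchDepContext(query_finished, finished_tis=None)

print([ti.state for ti in stale.ensure_finished_tis()])  # ['failed']
print([ti.state for ti in fresh.ensure_finished_tis()])  # ['success']
```

   Only the context built with `None` sees the rerun; the one built from the snapshot would still evaluate dependencies against `failed`.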
   
   One follow-on question: now that the inner call always passes `None`, the `finished_tis: list[TI]` parameter on `_are_premature_tis` itself (line 1534) is effectively dead: every caller still passes the scheduler-loop snapshot, but the function discards it. Would it be worth either dropping the parameter and updating the caller in `dagrun.py:~1147` so the API doesn't mislead, or keeping it as `list[TI] | None` and documenting that `None` triggers the re-query? The new test `test_stale_finished_tis_do_not_cause_stuck_upstream_failed` happens to pass `stale_finished_tis`, which now exercises the discard path; a reader of the test would probably expect the snapshot to matter.
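   A rough sketch of what the second option could look like, using a hypothetical helper `any_upstream_failed` (not Airflow code) with plain state strings instead of TI objects. The parameter is typed `list[str] | None`, defaults to `None`, and the docstring states that `None` triggers a re-query; an explicit snapshot is honoured rather than silently discarded, so the signature says what the function actually does. The first option would simply remove the parameter and always query.

```python
from __future__ import annotations

from typing import Callable


def any_upstream_failed(
    query_finished_states: Callable[[], list[str]],
    finished_states: list[str] | None = None,
) -> bool:
    """Return True if any finished upstream task is in a failed state.

    Hypothetical helper illustrating the documented-``None`` convention.

    :param query_finished_states: Callable that reads the *current* states of
        finished upstream tasks (stands in for a database query).
    :param finished_states: Optional pre-fetched snapshot. ``None`` (the
        default) means "re-query now"; callers that cannot guarantee the
        snapshot is fresh should pass ``None``.
    """
    if finished_states is None:
        # None is the documented signal to re-query instead of trusting a snapshot.
        finished_states = query_finished_states()
    return any(state in {"failed", "upstream_failed"} for state in finished_states)


current_states = ["success"]  # what the database says right now
stale_snapshot = ["failed"]   # what the scheduler loop saw earlier

print(any_upstream_failed(lambda: current_states))                  # False (re-queried)
print(any_upstream_failed(lambda: current_states, stale_snapshot))  # True (trusted the snapshot)
```

   With this shape, a test could pass `None` (or omit the argument) explicitly, so a reader sees that the fresh query, not the snapshot, drives the outcome.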



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
