1fanwang commented on code in PR #66820:
URL: https://github.com/apache/airflow/pull/66820#discussion_r3232841122
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1776,6 +1776,15 @@ def _do_scheduling(self, session: Session) -> int:
self._start_queued_dagruns(session)
guard.commit()
+ # Clear DagRun objects loaded by phase 1 from the identity map so
+ # phase 2 reloads them fresh. Otherwise stale rows can be
re-dirtied
+ # by flush/merge in _schedule_all_dag_runs and committed in a
row-lock
+ # order that differs from what other scheduler replicas are taking
+ # for their own work, producing A-B / B-A deadlocks on dag_run and
+ # task_instance under HA scheduler deployments. See
+ # https://github.com/apache/airflow/issues/66817.
+ session.expunge_all()
Review Comment:
Follow-up with the investigation. The captured probe output below is the
load-bearing piece; the Docker MySQL repro after it confirms the symptom shape
end-to-end.
Wrote a temp pytest probe that registers `before_flush` on the session and
`before_cursor_execute` on the engine, then runs
`SchedulerJobRunner._do_scheduling()` over a single QUEUED → RUNNING →
SCHEDULED cycle. Ran it twice: once on the fix branch, once with
`session.expunge_all()` reverted to current `main`.
On `main`, without the fix:
```
=== Identity map at the phase-1 -> phase-2 boundary ===
('DagVersion', UUID('019e208a-aadd-...'))
('SerializedDagModel', UUID('019e208a-aae4-...'))
('DagCode', UUID('019e208a-aae3-...'))
('DagModel', 'capture_phase2_sql')
('DagRun', 1)
('TaskInstance', UUID('019e208a-aaec-...'))
=== Dirty / new sets captured at each before_flush in phase 2 ===
flush[0]: dirty=[('DagRun', 1)] new=[]
=== UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE
dag_run.id = ?
UPDATE task_instance SET start_date=?, end_date=?, duration=?, state=?,
try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id
IN (?) AND ...
UPDATE dag_run SET last_scheduling_decision=?, updated_at=? WHERE
dag_run.id = ?
```
With the fix in place:
```
=== Identity map at the phase-1 -> phase-2 boundary ===
(empty)
=== Dirty / new sets captured at each before_flush in phase 2 ===
(none)
=== UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE
dag_run.id = ?
UPDATE task_instance SET start_date=?, end_date=?, duration=?, state=?,
try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id
IN (?) AND ...
UPDATE dag_run SET last_scheduling_decision=?, updated_at=? WHERE
dag_run.id = ?
```
Six leaked entries in the identity map at the phase boundary on `main`,
including `DagRun(1)` and its `TaskInstance` — exactly the rows phase 2 is
about to write to. With the fix the identity map is empty entering phase 2. The
`flush[0]` dirty set on `main` contains `DagRun(1)` — that's the phase-1-loaded
instance, not a fresh phase-2 fetch, so phase 2's flush is re-dirtying it via
the SELECT-returns-identity-map-entry path. Same UPDATE bytes either way in a
single-process run, because the deadlock symptom only fires when two replicas
hit this opposing-order condition simultaneously. The cause is observable in
one process; the deadlock requires two. The cause-effect chain: phase-1 entries
leak into the identity map, phase-2's flush walks them first, row-lock
acquisition order across replicas is driven by phase-1's per-replica load
order, two replicas grab the same rows in opposite orders, A-B / B-A deadlock
on `dag_run` / `task_instance`.
For the symptom side, spun up a local MySQL 8.0 container, created a minimal
`dag_run` table mirroring the production schema's primary-key layout, and ran
two threads taking row locks on `dag_run.id=1` and `dag_run.id=2` in opposing
orders — the exact opposing-order condition the upstream cause produces between
HA scheduler replicas. Literal SQLAlchemy 1213 traceback from the losing thread:
```
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213,
'Deadlock found when trying to get lock; try restarting transaction')
[SQL: SELECT * FROM dag_run WHERE id = 1 FOR UPDATE]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
```
In production this surfaces as the same `(1213, 'Deadlock found...')`
wrapping a scheduler `UPDATE dag_run SET last_scheduling_decision=...` /
`UPDATE task_instance SET state=...` rather than the `SELECT ... FOR UPDATE`
shown here — same error class and number, different statement, same root cause.
Literal `SHOW ENGINE INNODB STATUS` deadlock section, captured immediately
after the 1213 fired:
```
------------------------
LATEST DETECTED DEADLOCK
------------------------
2026-05-13 09:02:45 281472088993536
*** (1) TRANSACTION:
TRANSACTION 1819, ACTIVE 1 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 8, OS thread handle 281472661450496, query id 22 172.17.0.1
root statistics
SELECT * FROM dag_run WHERE id = 2 FOR UPDATE
*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table
`airflow_test`.`dag_run` trx id 1819 lock_mode X locks rec but not gap
Record lock, heap no 2 PHYSICAL RECORD: n_fields 6; compact format; info
bits 0
0: len 4; hex 80000001; asc ;;
...
3: len 16; hex 7265706c6963615f615f746172676574; asc replica_a_target;;
4: len 7; hex 72756e6e696e67; asc running;;
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table
`airflow_test`.`dag_run` trx id 1819 lock_mode X locks rec but not gap waiting
Record lock, heap no 3 PHYSICAL RECORD: n_fields 6; compact format; info
bits 0
0: len 4; hex 80000002; asc ;;
...
3: len 16; hex 7265706c6963615f625f746172676574; asc replica_b_target;;
4: len 7; hex 72756e6e696e67; asc running;;
*** (2) TRANSACTION:
TRANSACTION 1820, ACTIVE 1 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 9, OS thread handle 281472650964736, query id 23 172.17.0.1
root statistics
SELECT * FROM dag_run WHERE id = 1 FOR UPDATE
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table
`airflow_test`.`dag_run` trx id 1820 lock_mode X locks rec but not gap
Record lock, heap no 3 PHYSICAL RECORD: n_fields 6; compact format; info
bits 0
...
3: len 16; hex 7265706c6963615f625f746172676574; asc replica_b_target;;
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table
`airflow_test`.`dag_run` trx id 1820 lock_mode X locks rec but not gap waiting
Record lock, heap no 2 PHYSICAL RECORD: n_fields 6; compact format; info
bits 0
...
3: len 16; hex 7265706c6963615f615f746172676574; asc replica_a_target;;
*** WE ROLL BACK TRANSACTION (2)
```
TX 1819 holds X-lock on `dag_run.id=1` (`replica_a_target`), waiting on
`id=2`. TX 1820 holds X-lock on `dag_run.id=2` (`replica_b_target`), waiting on
`id=1`. Both acquisitions are on the PRIMARY index of `dag_run`, both
transactions are inside the same window, and InnoDB picks TX 2 as the victim.
That's the A-B / B-A pattern, byte-identical to what an HA-scheduler-induced
deadlock on this codebase produces — opposing-order row-lock acquisition on
`dag_run` PRIMARY by two transactions inside the same heartbeat window.
One caveat on the MySQL repro: it's a synthetic two-thread version of the
opposing-order condition, not two real `_do_scheduling()` cycles racing.
Getting two real schedulers to race deterministically on the same heartbeat
needs HA infrastructure I don't have on a laptop, but the symptom shape (1213 +
INNODB STATUS) is wire-identical regardless of who's producing the
opposing-order condition. The probe above is the deterministic capture that
ties the upstream cause (leaked phase-1 identity map → phase-2's flush
ordering) to this symptom; the MySQL piece is the deterministic capture of the
symptom itself. Production scheduler logs would have the same INNODB STATUS
structure with `UPDATE dag_run SET last_scheduling_decision=...` / `UPDATE
task_instance SET state=...` as the offending statements, and two
scheduler-job-id transactions racing inside the same heartbeat window — same
error class, redacted identifiers. The temp probe and the deadlock repro script
are kept locally; wil
l share on request.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]