1fanwang commented on code in PR #66820:
URL: https://github.com/apache/airflow/pull/66820#discussion_r3232841122
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1776,6 +1776,15 @@ def _do_scheduling(self, session: Session) -> int:
self._start_queued_dagruns(session)
guard.commit()
+ # Clear DagRun objects loaded by phase 1 from the identity map so
+ # phase 2 reloads them fresh. Otherwise stale rows can be re-dirtied
+ # by flush/merge in _schedule_all_dag_runs and committed in a row-lock
+ # order that differs from what other scheduler replicas are taking
+ # for their own work, producing A-B / B-A deadlocks on dag_run and
+ # task_instance under HA scheduler deployments. See
+ # https://github.com/apache/airflow/issues/66817.
+ session.expunge_all()
Review Comment:
Following up with the investigation. I'm leading with the captured probe
output, since that's the load-bearing artefact; the literal MySQL deadlock
capture below confirms the shape of the symptom.
## 1. Captured probe: identity map + dirty set + SQL emitted
I wrote a temporary pytest probe that registers a `before_flush` listener on the
session and a `before_cursor_execute` listener on the engine, then runs
`SchedulerJobRunner._do_scheduling()` end-to-end over a single QUEUED → RUNNING
→ SCHEDULED cycle. I ran it twice: once on the fix branch, and once on `main`
(i.e. without `session.expunge_all()`).
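
The last section of each capture keeps only the UPDATEs that touch `dag_run` or `task_instance`. A minimal stdlib sketch of that filtering step, assuming the `before_cursor_execute` listener has already appended each raw statement string to a list; `filter_scheduler_updates` and the sample statements are hypothetical, since the real probe is not part of the PR.

```python
import re

# Hypothetical helper: matches statements that start with UPDATE on either table.
_UPDATE_RE = re.compile(r"^\s*UPDATE\s+(dag_run|task_instance)\b", re.IGNORECASE)


def filter_scheduler_updates(statements):
    """Return UPDATEs on dag_run / task_instance in emit order, with internal
    whitespace collapsed so each statement prints on one line."""
    kept = []
    for stmt in statements:
        if _UPDATE_RE.match(stmt):
            kept.append(" ".join(stmt.split()))
    return kept


# Illustrative captured statements (not taken from the real probe run).
captured = [
    "SELECT dag_run.id FROM dag_run WHERE dag_run.state = ?",
    "UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE\n dag_run.id = ?",
    "UPDATE dag SET next_dagrun=? WHERE dag.dag_id = ?",
    "UPDATE task_instance SET state=? WHERE task_instance.id IN (?)",
]
for line in filter_scheduler_updates(captured):
    print(line)
```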
**Without the fix (`main`):**
```
=== Identity map at the phase-1 -> phase-2 boundary ===
('DagVersion', UUID('019e208a-aadd-...'))
('SerializedDagModel', UUID('019e208a-aae4-...'))
('DagCode', UUID('019e208a-aae3-...'))
('DagModel', 'capture_phase2_sql')
('DagRun', 1)
('TaskInstance', UUID('019e208a-aaec-...'))
=== Dirty / new sets captured at each before_flush in phase 2 ===
flush[0]: dirty=[('DagRun', 1)] new=[]
=== UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
UPDATE task_instance SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
UPDATE dag_run SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
```
**With the fix:**
```
=== Identity map at the phase-1 -> phase-2 boundary ===
(empty)
=== Dirty / new sets captured at each before_flush in phase 2 ===
(none)
=== UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
UPDATE task_instance SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
UPDATE dag_run SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
```
What this shows:
- **Six leaked entries** in the identity map at the phase boundary on
`main`, including `DagRun(1)` and its `TaskInstance`, which are exactly the rows
phase 2 is about to write to. With the fix, the identity map is empty entering
phase 2.
- **`flush[0]: dirty=[('DagRun', 1)]`** on `main`: phase 2's first flush finds
the *phase-1-loaded* `DagRun(1)` instance in its dirty set. Phase 2's SELECT
hands back the existing identity-map entry rather than a freshly loaded object,
so phase 2's changes land on the phase-1 instance. With the fix, phase 2 works
on freshly loaded objects and nothing from phase 1 ends up in a flush set.
- **The same UPDATE statements** are emitted either way in a single-process
run. The symptom (the deadlock) only fires when two replicas hit the
opposing-order condition at the same time: the cause is observable in one
process, but the deadlock needs two.
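
The identity-map behaviour behind the first two bullets can be modelled without a database. This is a toy, not SQLAlchemy: `ToySession`, `DagRunRow`, `run_phases`, and the outside write are all hypothetical. It only mimics two documented `Session` behaviours: a query for a primary key already in the identity map returns the existing instance without overwriting its loaded attributes (unless `populate_existing` is requested), and `expunge_all()` empties the map. The `is` check is the single-process analogue of the probe's `flush[0]: dirty=[('DagRun', 1)]` line.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class DagRunRow:
    """Toy stand-in for a mapped DagRun instance (hypothetical)."""

    id: int
    state: str
    dirty: bool = False

    def set_state(self, state):
        # Any attribute change marks the instance dirty for the next flush.
        self.state = state
        self.dirty = True


@dataclass
class ToySession:
    """Toy identity map keyed by (class name, primary key); not SQLAlchemy."""

    db: dict  # pk -> committed state, standing in for the dag_run table
    identity_map: dict = field(default_factory=dict)

    def load(self, pk):
        key = ("DagRun", pk)
        if key in self.identity_map:
            # Hand back the existing object without refreshing it from the row.
            return self.identity_map[key]
        obj = DagRunRow(id=pk, state=self.db[pk])
        self.identity_map[key] = obj
        return obj

    def flush(self):
        # Write dirty instances back; return the identity keys that were dirty.
        dirty_keys = [key for key, obj in self.identity_map.items() if obj.dirty]
        for key in dirty_keys:
            obj = self.identity_map[key]
            self.db[obj.id] = obj.state
            obj.dirty = False
        return dirty_keys

    def expunge_all(self):
        self.identity_map.clear()


def run_phases(expunge_between_phases):
    """Return (phase 2 got the phase-1 object?, state phase 2 sees)."""
    db = {1: "queued"}
    session = ToySession(db)

    phase1_run = session.load(1)  # phase 1: start the queued run
    phase1_run.set_state("running")
    session.flush()

    db[1] = "failed"  # hypothetical write from outside this session

    if expunge_between_phases:
        session.expunge_all()

    phase2_run = session.load(1)  # phase 2: load the same run again
    return phase2_run is phase1_run, phase2_run.state


print(run_phases(expunge_between_phases=False))  # (True, 'running')
print(run_phases(expunge_between_phases=True))   # (False, 'failed')
```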
Cause-effect chain pinned to this captured trace:
> phase-1 entries leak into the identity map → phase-2's flush walks them
first → row-lock acquisition order across replicas is driven by *phase-1's*
per-replica load order → two replicas grab the same rows in opposite orders →
A-B / B-A deadlock on `dag_run` / `task_instance`.
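
The last two links of that chain reduce to a simple property. When two transactions take exclusive row locks one at a time and hold them until commit, an A-B / B-A deadlock between them requires some pair of shared rows to appear in opposite orders in their acquisition sequences; if every shared pair is acquired in the same relative order, no such cycle can form. `find_opposing_pair` below is a hypothetical helper for reasoning about captured lock orders, not something the scheduler runs.

```python
from itertools import combinations


def find_opposing_pair(order_a, order_b):
    """Return the first pair of rows the two transactions lock in opposite
    relative orders, or None if every shared pair is locked consistently.

    order_a / order_b: row ids in the order each transaction locks them
    (exclusive locks, held until commit).
    """
    pos_b = {row: i for i, row in enumerate(order_b)}
    # Shared rows, kept in order_a's order, so x precedes y in order_a below.
    shared = [row for row in order_a if row in pos_b]
    for x, y in combinations(shared, 2):
        if pos_b[x] > pos_b[y]:
            return (x, y)
    return None


# Hypothetical per-replica orders: A locks dag_run 1 first, B locks 2 first.
print(find_opposing_pair([1, 2], [2, 1]))  # (1, 2)
print(find_opposing_pair([1, 2], [1, 2]))  # None
```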
## 2. Literal MySQL deadlock capture
I spun up a local MySQL 8.0 container, created a minimal `dag_run` table
mirroring the production schema's primary-key layout, and ran two threads
taking row locks on `dag_run.id=1` and `dag_run.id=2` in opposing orders. That
is the same opposing-order condition the upstream cause can produce between HA
scheduler replicas.
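
The same opposing-order condition can be shown without MySQL. This is an illustrative analogue, not the script used for the capture: two `threading.Lock` objects stand in for the row locks on `dag_run.id=1` and `dag_run.id=2`, and a `threading.Barrier` forces both threads to hold their first lock before asking for the second, as the repro's two transactions did. There is no deadlock detector here, so each thread uses a bounded `acquire(timeout=...)`, and a second barrier keeps either thread from releasing before both have tried. The function names are hypothetical.

```python
import threading


def opposing_order_demo(timeout=0.2):
    """Lock rows 1 and 2 in opposite orders from two threads.

    Returns {thread name: whether it obtained its second lock in time}.
    """
    rows = {1: threading.Lock(), 2: threading.Lock()}
    both_hold_first = threading.Barrier(2)
    both_tried_second = threading.Barrier(2)
    got_second = {}

    def txn(name, first, second):
        with rows[first]:
            both_hold_first.wait()  # replica_a holds row 1, replica_b holds row 2
            ok = rows[second].acquire(timeout=timeout)
            got_second[name] = ok
            both_tried_second.wait()  # neither releases until both have tried
            if ok:
                rows[second].release()

    threads = [
        threading.Thread(target=txn, args=("replica_a", 1, 2)),
        threading.Thread(target=txn, args=("replica_b", 2, 1)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return got_second


def same_order_demo():
    """Both threads lock rows in the same global order (1, then 2)."""
    rows = {1: threading.Lock(), 2: threading.Lock()}
    finished = []

    def txn(name):
        for row in (1, 2):
            rows[row].acquire()  # may block, but can never form a cycle
        finished.append(name)
        for row in (2, 1):
            rows[row].release()

    threads = [threading.Thread(target=txn, args=(n,)) for n in ("replica_a", "replica_b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(finished)


print(sorted(opposing_order_demo().items()))  # [('replica_a', False), ('replica_b', False)]
print(same_order_demo())  # ['replica_a', 'replica_b']
```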
**Literal SQLAlchemy 1213 traceback** (from the losing thread):
```
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: SELECT * FROM dag_run WHERE id = 1 FOR UPDATE]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
```
In production this surfaces as the same `(1213, 'Deadlock found...')`
wrapping a scheduler `UPDATE dag_run SET last_scheduling_decision=...` or
`UPDATE task_instance SET state=...` rather than the `SELECT ... FOR UPDATE`
shown here: the same error class and number and the same root cause, but a
different statement.
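
Because the statement differs between the repro and production while the error number does not, matching on the MySQL error code is sturdier than matching on SQL text when triaging these failures. A sketch, assuming SQLAlchemy's convention that a `DBAPIError` keeps the driver exception on `.orig` and PyMySQL's convention that the error number is `args[0]` (visible in the traceback above). `is_mysql_deadlock` is hypothetical, and the stub classes only exist so the sketch runs without either library.

```python
MYSQL_ER_LOCK_DEADLOCK = 1213  # "Deadlock found when trying to get lock"


def is_mysql_deadlock(exc):
    """True if exc, or the driver error it wraps on .orig, carries error 1213."""
    driver_exc = getattr(exc, "orig", exc)
    args = getattr(driver_exc, "args", ())
    return bool(args) and args[0] == MYSQL_ER_LOCK_DEADLOCK


# Stubs standing in for pymysql.err.OperationalError and
# sqlalchemy.exc.OperationalError.
class FakeDriverError(Exception):
    pass


class FakeWrappedError(Exception):
    def __init__(self, statement, orig):
        super().__init__(statement)
        self.statement = statement
        self.orig = orig


MSG = "Deadlock found when trying to get lock; try restarting transaction"
repro = FakeWrappedError("SELECT * FROM dag_run WHERE id = 1 FOR UPDATE",
                         FakeDriverError(1213, MSG))
production = FakeWrappedError("UPDATE dag_run SET last_scheduling_decision=%s ...",
                              FakeDriverError(1213, MSG))
lock_timeout = FakeWrappedError("UPDATE task_instance SET state=%s ...",
                                FakeDriverError(1205, "Lock wait timeout exceeded"))

print(is_mysql_deadlock(repro), is_mysql_deadlock(production), is_mysql_deadlock(lock_timeout))
# True True False
```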
**Literal `SHOW ENGINE INNODB STATUS` deadlock report** (`LATEST DETECTED DEADLOCK`
section, captured immediately after the 1213 fired):
```
------------------------
LATEST DETECTED DEADLOCK
------------------------
2026-05-13 09:02:45 281472088993536
*** (1) TRANSACTION:
TRANSACTION 1819, ACTIVE 1 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 8, OS thread handle 281472661450496, query id 22 172.17.0.1 root statistics
SELECT * FROM dag_run WHERE id = 2 FOR UPDATE
*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1819 lock_mode X locks rec but not gap
Record lock, heap no 2 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
0: len 4; hex 80000001; asc ;;
...
3: len 16; hex 7265706c6963615f615f746172676574; asc replica_a_target;;
4: len 7; hex 72756e6e696e67; asc running;;
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1819 lock_mode X locks rec but not gap waiting
Record lock, heap no 3 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
0: len 4; hex 80000002; asc ;;
...
3: len 16; hex 7265706c6963615f625f746172676574; asc replica_b_target;;
4: len 7; hex 72756e6e696e67; asc running;;
*** (2) TRANSACTION:
TRANSACTION 1820, ACTIVE 1 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s)
MySQL thread id 9, OS thread handle 281472650964736, query id 23 172.17.0.1 root statistics
SELECT * FROM dag_run WHERE id = 1 FOR UPDATE
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1820 lock_mode X locks rec but not gap
Record lock, heap no 3 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
...
3: len 16; hex 7265706c6963615f625f746172676574; asc replica_b_target;;
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1820 lock_mode X locks rec but not gap waiting
Record lock, heap no 2 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
...
3: len 16; hex 7265706c6963615f615f746172676574; asc replica_a_target;;
*** WE ROLL BACK TRANSACTION (2)
```
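
The `hex` fields in the record dumps can be decoded by hand to confirm which rows each transaction holds. A small sketch, assuming the test table's `id` is a signed 4-byte `INT` (InnoDB stores signed integers big-endian with the sign bit flipped, which is why `id=1` appears as `80000001`) and that the string columns are plain UTF-8; `decode_signed_int` and `decode_text` are hypothetical helper names.

```python
def decode_signed_int(hex_str):
    """Decode an InnoDB-stored signed integer (big-endian, sign bit flipped)."""
    raw = bytes.fromhex(hex_str)
    bits = len(raw) * 8
    unsigned = int.from_bytes(raw, "big") ^ (1 << (bits - 1))  # undo the flip
    # Reinterpret the result as two's complement.
    return unsigned - (1 << bits) if unsigned >= (1 << (bits - 1)) else unsigned


def decode_text(hex_str):
    """Decode a string column's bytes as UTF-8."""
    return bytes.fromhex(hex_str).decode("utf-8")


# Field 0 (id) and field 3 from the two record dumps in the report.
print(decode_signed_int("80000001"), decode_text("7265706c6963615f615f746172676574"))
# 1 replica_a_target
print(decode_signed_int("80000002"), decode_text("7265706c6963615f625f746172676574"))
# 2 replica_b_target
```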
The shape that ties this back to the upstream cause:
- TX 1819 holds X-lock on `dag_run.id=1` (`replica_a_target`), waiting on
`id=2`.
- TX 1820 holds X-lock on `dag_run.id=2` (`replica_b_target`), waiting on
`id=1`.
- Both lock acquisitions are on the PRIMARY index of `dag_run`.
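- InnoDB's deadlock detector picks transaction (2), TX 1820, as the victim and
rolls it back; its `SELECT * FROM dag_run WHERE id = 1 FOR UPDATE` is the
statement in the 1213 traceback above.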
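
These bullets describe a wait-for graph with a two-node cycle, which is the structure a deadlock detector looks for. A sketch that rebuilds it from the report under a simplified model (exclusive locks only, each transaction blocked on exactly one row, which holds for both transactions here); `find_wait_cycle` is hypothetical and does not model how InnoDB picks its victim.

```python
def find_wait_cycle(holds, waits_for):
    """Return a cycle of transaction ids in the wait-for graph, or None.

    holds:     {trx_id: set of row ids it holds X-locks on}
    waits_for: {trx_id: row id it is blocked on}
    """
    owner = {row: trx for trx, rows in holds.items() for row in rows}
    # Edge T -> U when T waits for a row that U holds.
    edges = {trx: owner.get(row) for trx, row in waits_for.items()}

    for start in edges:
        path, trx = [], start
        while trx is not None and trx not in path:
            path.append(trx)
            trx = edges.get(trx)
        if trx is not None:  # walked back into the path: a cycle
            return path[path.index(trx):]
    return None


# From the report: TX 1819 holds id=1 and waits on id=2; TX 1820 the reverse.
print(find_wait_cycle({1819: {1}, 1820: {2}}, {1819: 2, 1820: 1}))  # [1819, 1820]
```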
That's the A-B / B-A pattern, and it has the same shape as an
HA-scheduler-induced deadlock on this codebase: opposing-order row-lock
acquisition on the `dag_run` PRIMARY index by two transactions inside the same
heartbeat window. The repro forces it deterministically; the fix prevents the
opposing-order condition from arising in the first place by making each
replica's phase-2 lock acquisition order independent of phase-1's load order.
## 3. Methodology note
The literal capture above is a *synthetic* repro of the opposing-order lock
condition: two threads explicitly taking locks in opposing orders, not two
real `_do_scheduling()` cycles racing. Getting two real schedulers to race
deterministically on the same heartbeat needs HA infrastructure I don't have on
a laptop, while the symptom shape (error 1213 plus the INNODB STATUS report) is
the same regardless of what produces the opposing-order condition.
Section 1 is the deterministic capture of the *upstream cause* (leaked phase-1
identity map → phase-2 flush ordering); section 2 is the deterministic capture
of the *symptom* itself.
In production, the scheduler logs would show `UPDATE dag_run SET
last_scheduling_decision=...` and `UPDATE task_instance SET state=...` as the
offending statements (rather than the repro's `SELECT ... FOR UPDATE`), with
transactions from two scheduler jobs racing inside the same heartbeat window.
The error class and the INNODB STATUS structure would be the same, with
identifiers redacted.
The temporary probe used to produce section 1 is not part of the PR; I've kept
it locally and am happy to share it if it would help.