1fanwang commented on code in PR #66820:
URL: https://github.com/apache/airflow/pull/66820#discussion_r3232841122


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1776,6 +1776,15 @@ def _do_scheduling(self, session: Session) -> int:
             self._start_queued_dagruns(session)
             guard.commit()
 
+            # Clear DagRun objects loaded by phase 1 from the identity map so
+            # phase 2 reloads them fresh. Otherwise stale rows can be re-dirtied
+            # by flush/merge in _schedule_all_dag_runs and committed in a row-lock
+            # order that differs from what other scheduler replicas are taking
+            # for their own work, producing A-B / B-A deadlocks on dag_run and
+            # task_instance under HA scheduler deployments. See
+            # https://github.com/apache/airflow/issues/66817.
+            session.expunge_all()

Review Comment:
   Follow-up with the investigation. Leading with the captured probe output — 
that's the load-bearing artefact; the literal MySQL deadlock capture below 
confirms the symptom shape end-to-end.
   
   ## 1. Captured probe: identity map + dirty set + SQL emitted
   
   Wrote a temp pytest probe that registers `before_flush` on the session and 
`before_cursor_execute` on the engine, then runs 
`SchedulerJobRunner._do_scheduling()` end-to-end over a single QUEUED → RUNNING 
→ SCHEDULED cycle. Ran it twice: once on the fix branch, once with 
`session.expunge_all()` reverted to `main`.
   
   **Without the fix (`main`):**
   
   ```
   === Identity map at the phase-1 -> phase-2 boundary ===
     ('DagVersion',         UUID('019e208a-aadd-...'))
     ('SerializedDagModel', UUID('019e208a-aae4-...'))
     ('DagCode',            UUID('019e208a-aae3-...'))
     ('DagModel',           'capture_phase2_sql')
     ('DagRun',             1)
     ('TaskInstance',       UUID('019e208a-aaec-...'))
   
   === Dirty / new sets captured at each before_flush in phase 2 ===
     flush[0]: dirty=[('DagRun', 1)] new=[]
   
   === UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
     UPDATE dag_run        SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
     UPDATE task_instance  SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
     UPDATE dag_run        SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
   ```
   
   **With the fix:**
   
   ```
   === Identity map at the phase-1 -> phase-2 boundary ===
     (empty)
   
   === Dirty / new sets captured at each before_flush in phase 2 ===
     (none)
   
   === UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
     UPDATE dag_run        SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
     UPDATE task_instance  SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
     UPDATE dag_run        SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
   ```
   
   What this proves:
   
   - **Six leaked entries** in the identity map at the phase boundary on 
`main`, including `DagRun(1)` and its `TaskInstance` — exactly the rows phase 2 
is about to write to. With the fix, the identity map is empty entering phase 2.
   - **`flush[0]: dirty=[('DagRun', 1)]`** on `main` — phase 2's flush 
re-dirties the *phase-1-loaded* `DagRun(1)` instance via the 
SELECT-returns-identity-map-entry path, not a fresh phase-2 fetch. With the 
fix, phase 2 fetches fresh and nothing from phase 1 ends up in its flush set.
   - **Same UPDATE bytes** either way in a single-process run — the symptom 
(deadlock) only fires when two replicas hit this opposing-order condition 
simultaneously. The cause is observable in one process; the deadlock requires 
two.
   
   Cause-effect chain pinned to this captured trace:
   
   > phase-1 entries leak into the identity map → phase-2's flush walks them 
first → row-lock acquisition order across replicas is driven by *phase-1's* 
per-replica load order → two replicas grab the same rows in opposite orders → 
A-B / B-A deadlock on `dag_run` / `task_instance`.
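
   For readers who want to try the same kind of probe, here is a minimal sketch of the hook wiring. It is not the probe used for the capture above (that one is not in the PR). It assumes SQLAlchemy 1.4+ and an in-memory SQLite database; `Run` and `probe` are hypothetical names, with `Run` standing in for `DagRun`. It shows only the mechanism: without `expunge_all()`, a primary-key lookup in the second phase hands back the instance loaded in the first.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):
    """Hypothetical stand-in for DagRun; not Airflow's model."""

    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    state = Column(String)


def probe(expunge):
    """Run a toy two-phase cycle and report what the second phase sees."""
    engine = create_engine("sqlite://")  # in-memory database (assumption)
    Base.metadata.create_all(engine)
    updates, flushes = [], []

    with Session(engine) as session:
        session.add(Run(id=1, state="queued"))
        session.commit()

        # "Phase 1": load the row, change it, commit.
        phase1_run = session.get(Run, 1)
        phase1_run.state = "running"
        session.commit()

        if expunge:
            session.expunge_all()

        # Snapshot the identity map at the phase boundary.
        boundary = [(key[0].__name__, key[1]) for key in session.identity_map.keys()]

        # Register the hooks only now, so they record phase 2 alone.
        @event.listens_for(session, "before_flush")
        def record_flush(sess, flush_context, instances):
            flushes.append([(type(obj).__name__, obj.id) for obj in sess.dirty])

        @event.listens_for(engine, "before_cursor_execute")
        def record_sql(conn, cursor, statement, parameters, context, executemany):
            if statement.lstrip().upper().startswith("UPDATE"):
                updates.append(statement)

        # "Phase 2": look the row up again and write to it.
        phase2_run = session.get(Run, 1)
        reused = phase2_run is phase1_run
        phase2_run.state = "success"
        session.commit()

    return {"boundary": boundary, "reused": reused, "flushes": flushes, "updates": updates}
```

   `probe(expunge=False)` reports the leaked `Run` entry at the boundary and `reused` is true; `probe(expunge=True)` reports an empty boundary and a fresh instance. The toy model emits the same single UPDATE in phase 2 either way.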
   
   ## 2. Literal MySQL deadlock capture
   
   Spun up a local MySQL 8.0 container, created a minimal `dag_run` table 
mirroring the production schema's primary-key layout, and ran two threads 
taking row locks on `dag_run.id=1` and `dag_run.id=2` in opposing orders — the 
exact opposing-order condition the upstream cause produces between HA scheduler 
replicas.
   
   **Literal SQLAlchemy 1213 traceback** (from the losing thread):
   
   ```
   sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT * FROM dag_run WHERE id = 1 FOR UPDATE]
   (Background on this error at: https://sqlalche.me/e/20/e3q8)
   ```
   
   In production this surfaces as the same `(1213, 'Deadlock found...')` 
wrapping a scheduler `UPDATE dag_run SET last_scheduling_decision=...` / 
`UPDATE task_instance SET state=...` rather than the `SELECT ... FOR UPDATE` 
shown here — same error class and number, different statement, same root cause.
   
   **Literal `SHOW ENGINE INNODB STATUS` deadlock report** (`LATEST DETECTED 
DEADLOCK` section, captured immediately after the 1213 fired):
   
   ```
   ------------------------
   LATEST DETECTED DEADLOCK
   ------------------------
   2026-05-13 09:02:45 281472088993536
   *** (1) TRANSACTION:
   TRANSACTION 1819, ACTIVE 1 sec starting index read
   mysql tables in use 1, locked 1
   LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s)
   MySQL thread id 8, OS thread handle 281472661450496, query id 22 172.17.0.1 root statistics
   SELECT * FROM dag_run WHERE id = 2 FOR UPDATE
   
   *** (1) HOLDS THE LOCK(S):
   RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1819 lock_mode X locks rec but not gap
   Record lock, heap no 2 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
    0: len 4; hex 80000001; asc     ;;
    ...
    3: len 16; hex 7265706c6963615f615f746172676574; asc replica_a_target;;
    4: len 7; hex 72756e6e696e67; asc running;;
   
   *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1819 lock_mode X locks rec but not gap waiting
   Record lock, heap no 3 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
    0: len 4; hex 80000002; asc     ;;
    ...
    3: len 16; hex 7265706c6963615f625f746172676574; asc replica_b_target;;
    4: len 7; hex 72756e6e696e67; asc running;;
   
   *** (2) TRANSACTION:
   TRANSACTION 1820, ACTIVE 1 sec starting index read
   mysql tables in use 1, locked 1
   LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s)
   MySQL thread id 9, OS thread handle 281472650964736, query id 23 172.17.0.1 root statistics
   SELECT * FROM dag_run WHERE id = 1 FOR UPDATE
   
   *** (2) HOLDS THE LOCK(S):
   RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1820 lock_mode X locks rec but not gap
   Record lock, heap no 3 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
    ...
    3: len 16; hex 7265706c6963615f625f746172676574; asc replica_b_target;;
   
   *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 2 page no 4 n bits 72 index PRIMARY of table `airflow_test`.`dag_run` trx id 1820 lock_mode X locks rec but not gap waiting
   Record lock, heap no 2 PHYSICAL RECORD: n_fields 6; compact format; info bits 0
    ...
    3: len 16; hex 7265706c6963615f615f746172676574; asc replica_a_target;;
   
   *** WE ROLL BACK TRANSACTION (2)
   ```
   
   The shape that ties this back to the upstream cause:
   
   - TX 1819 holds X-lock on `dag_run.id=1` (`replica_a_target`), waiting on 
`id=2`.
   - TX 1820 holds X-lock on `dag_run.id=2` (`replica_b_target`), waiting on 
`id=1`.
   - Both lock acquisitions are on the PRIMARY index of `dag_run`.
   - InnoDB's deadlock detector chooses transaction (2), i.e. TX 1820, as the victim and rolls it back.
   
   That's the A-B / B-A pattern, with the same report structure an HA-scheduler-induced deadlock on this codebase produces: opposing-order row-lock acquisition on `dag_run` PRIMARY by two transactions inside the same heartbeat window. The repro forces it deterministically; the fix prevents the opposing-order condition from arising in the first place by making each replica's phase-2 lock acquisition order independent of phase-1's load order.
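
   The opposing-order condition itself can be illustrated without a database. The sketch below is an analogy only, not the repro script: two `threading.Lock` objects stand in for the row locks on `id=1` and `id=2`, and an acquire timeout stands in for InnoDB's deadlock detector (which rolls back one victim rather than timing out both). `race` and its parameters are hypothetical names.

```python
import threading


def race(order_a, order_b, timeout=0.3):
    """Two workers each take two locks in the given order.

    Returns {worker_name: True if the worker obtained both locks}.
    """
    # Stand-ins for the row locks on id=1 and id=2.
    locks = {1: threading.Lock(), 2: threading.Lock()}
    results = {}
    # Synchronise after the first lock only when the first locks differ;
    # otherwise the second worker could never reach the barrier.
    sync = threading.Barrier(2) if order_a[0] != order_b[0] else None

    def worker(name, order):
        first, second = order
        with locks[first]:
            if sync is not None:
                sync.wait()  # both workers now hold their first lock
            got_second = locks[second].acquire(timeout=timeout)
            if got_second:
                locks[second].release()
            results[name] = got_second
            if sync is not None:
                sync.wait()  # keep the first lock until both attempts finish

    threads = [
        threading.Thread(target=worker, args=("a", order_a)),
        threading.Thread(target=worker, args=("b", order_b)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

   `race((1, 2), (2, 1))` leaves both workers without their second lock, while `race((1, 2), (1, 2))` lets both finish: with a shared acquisition order the second worker simply waits its turn.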
   
   ## 3. Methodology note
   
   The literal capture above is a *synthetic* repro of the opposing-order lock condition: two threads explicitly taking locks in opposing orders, not two real `_do_scheduling()` cycles racing. The reason: getting two real schedulers to race deterministically on the same heartbeat needs HA infrastructure I don't have on a laptop, but the symptom shape (1213 + INNODB STATUS report) has the same structure regardless of who's producing the opposing-order condition; only the offending statement differs. Section 1 above is the deterministic capture that ties the *upstream cause* (leaked phase-1 identity map → phase-2's flush ordering) to this symptom; section 2 is the deterministic capture of the *symptom* itself.
   
   Production scheduler logs would have `UPDATE dag_run SET 
last_scheduling_decision=...` and `UPDATE task_instance SET state=...` as the 
offending statements (rather than the `SELECT ... FOR UPDATE` from the repro), 
and would show two scheduler-job-id transactions racing inside the same 
heartbeat window. Same error class, same INNODB STATUS structure, redacted 
identifiers.
   
   The temp probe used to produce section 1 is not part of the PR — kept 
locally, will share on request if it'd help.
   


