1fanwang commented on code in PR #66820:
URL: https://github.com/apache/airflow/pull/66820#discussion_r3232841122


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1776,6 +1776,15 @@ def _do_scheduling(self, session: Session) -> int:
             self._start_queued_dagruns(session)
             guard.commit()
 
+            # Clear DagRun objects loaded by phase 1 from the identity map so
+            # phase 2 reloads them fresh. Otherwise stale rows can be re-dirtied
+            # by flush/merge in _schedule_all_dag_runs and committed in a row-lock
+            # order that differs from what other scheduler replicas are taking
+            # for their own work, producing A-B / B-A deadlocks on dag_run and
+            # task_instance under HA scheduler deployments. See
+            # https://github.com/apache/airflow/issues/66817.
+            session.expunge_all()

Review Comment:
   Following up with the investigation. I'm leading with the captured probe output, since that's the load-bearing artefact; the rest is supporting characterisation.
   
   ## Captured probe: identity map + dirty set + SQL emitted
   
   I wrote a temporary pytest probe that registers a `before_flush` listener on the session and a `before_cursor_execute` listener on the engine, then runs `SchedulerJobRunner._do_scheduling()` end-to-end over a single QUEUED → RUNNING → SCHEDULED cycle. I ran it twice: once on the fix branch, and once with `session.expunge_all()` reverted to match `main`.
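   
   A minimal sketch of that kind of probe, for reproducing the mechanism without the scheduler. Everything here is an assumption rather than Airflow code: a hypothetical `ToyDagRun` model on in-memory SQLite, SQLAlchemy 1.4+, and a session built with `expire_on_commit=False` so that phase-1 objects survive the commit between phases without being expired. `probe()` and its return keys are illustrative names.

```python
from sqlalchemy import Column, Integer, String, create_engine, event, inspect, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyDagRun(Base):
    """Hypothetical stand-in for a dag_run row; not Airflow's DagRun model."""

    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    state = Column(String(20))
    last_scheduling_decision = Column(String(40))


def probe(expunge_between_phases: bool) -> dict:
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as seed:
        seed.add(ToyDagRun(id=1, state="queued"))
        seed.commit()

    updates = []

    @event.listens_for(engine, "before_cursor_execute")
    def capture_sql(conn, cursor, statement, parameters, context, executemany):
        # Record every UPDATE sent to the database, in emission order.
        if statement.lstrip().upper().startswith("UPDATE"):
            updates.append(statement)

    with Session(engine, expire_on_commit=False) as session:
        # Phase 1: load the run, move it to RUNNING, commit.
        phase1_run = session.get(ToyDagRun, 1)
        phase1_run.state = "running"
        session.commit()
        phase1_objects = list(session.identity_map.values())

        if expunge_between_phases:
            session.expunge_all()  # the change under review

        # Identity map snapshot at the phase-1 -> phase-2 boundary.
        boundary = sorted(
            (type(obj).__name__, inspect(obj).identity)
            for obj in session.identity_map.values()
        )

        stale_dirty = []

        @event.listens_for(session, "before_flush")
        def capture_dirty(sess, flush_context, instances):
            # Per phase-2 flush: dirty objects that are the very instances phase 1 loaded.
            stale_dirty.append(
                sorted(
                    (type(obj).__name__, inspect(obj).identity)
                    for obj in sess.dirty
                    if any(obj is p for p in phase1_objects)
                )
            )

        # Phase 2: re-select the run and record a scheduling decision.
        phase2_run = session.execute(
            select(ToyDagRun).where(ToyDagRun.id == 1)
        ).scalar_one()
        phase2_run.last_scheduling_decision = "decided"
        session.commit()
        same_instance = phase2_run is phase1_run

    engine.dispose()
    return {
        "boundary": boundary,
        "stale_dirty": stale_dirty,
        "updates": updates,
        "same_instance": same_instance,
    }
```

   Calling `probe(False)` and `probe(True)` and comparing the results shows the same split as the real trace: without the expunge, the boundary snapshot holds the phase-1 `ToyDagRun` and phase 2's flush carries that same instance; with it, both are empty, while the emitted UPDATEs are identical either way.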
   
   **Without the fix (`main`):**
   
   ```
   === Identity map at the phase-1 -> phase-2 boundary ===
     ('DagVersion',         UUID('019e208a-aadd-...'))
     ('SerializedDagModel', UUID('019e208a-aae4-...'))
     ('DagCode',            UUID('019e208a-aae3-...'))
     ('DagModel',           'capture_phase2_sql')
     ('DagRun',             1)
     ('TaskInstance',       UUID('019e208a-aaec-...'))
   
   === Dirty / new sets captured at each before_flush in phase 2 ===
     flush[0]: dirty=[('DagRun', 1)] new=[]
   
   === UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
     UPDATE dag_run        SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
     UPDATE task_instance  SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
     UPDATE dag_run        SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
   ```
   
   **With the fix:**
   
   ```
   === Identity map at the phase-1 -> phase-2 boundary ===
     (empty)
   
   === Dirty / new sets captured at each before_flush in phase 2 ===
     (none)
   
   === UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
     UPDATE dag_run        SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
     UPDATE task_instance  SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
     UPDATE dag_run        SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
   ```
   
   What this proves:
   
   - **Six leaked entries** in the identity map at the phase boundary on `main`, including `DagRun(1)` and its `TaskInstance`, which are exactly the rows phase 2 is about to write to. With the fix, the identity map is empty entering phase 2.
   - **`flush[0]: dirty=[('DagRun', 1)]`** on `main`: phase 2's SELECT returns the existing identity-map entry instead of a fresh instance, so phase 2's flush re-dirties the *phase-1-loaded* `DagRun(1)`. With the fix, phase 2 fetches fresh instances and nothing loaded in phase 1 ends up in its flush set.
   - **Identical UPDATE statements** either way in a single-process run, because the *symptom* (the deadlock) only fires when two replicas hit the opposing-order condition at the same time. The cause is observable in one process; the deadlock requires two.
   
   So the cause-and-effect chain, pinned to this captured trace, is:
   
   > phase-1 entries leak into the identity map → phase 2's flush walks them first → row-lock acquisition order across replicas is driven by *phase 1's* per-replica load order → two replicas grab the same rows in opposite orders → A-B / B-A deadlock on `dag_run` / `task_instance`.
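   
   The last link in that chain can be stated as a check on lock order. A sketch, assuming each replica's transaction keeps every row lock until commit: a two-transaction A-B / B-A cycle needs at least one pair of shared rows that the replicas lock in opposite orders, and it cannot form if no such pair exists. `opposing_pairs` and the `(table, id)` row labels are illustrative, not Airflow code.

```python
from itertools import combinations


def opposing_pairs(order_a, order_b):
    """Return pairs of rows that replica A and replica B lock in opposite orders.

    Each argument is the sequence of rows (e.g. ("dag_run", 1)) a replica locks,
    in acquisition order. Every returned (x, y) is locked x-then-y by A and
    y-then-x by B, the precondition for an A-B / B-A deadlock.
    """
    position_in_b = {row: i for i, row in enumerate(order_b)}
    # Rows both replicas touch, kept in A's acquisition order.
    shared = [row for row in order_a if row in position_in_b]
    return [
        (x, y)
        for x, y in combinations(shared, 2)  # x precedes y in A
        if position_in_b[x] > position_in_b[y]  # ...but follows y in B
    ]


replica_a = [("dag_run", 1), ("dag_run", 2)]
replica_b = [("dag_run", 2), ("dag_run", 1)]
print(opposing_pairs(replica_a, replica_b))          # [(('dag_run', 1), ('dag_run', 2))]
print(opposing_pairs(replica_a, sorted(replica_b)))  # []
```

   Sorting appears here only to show what a consistent order looks like; the change under review does not sort, it stops phase-1 instances from feeding phase 2's flush.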
   
   ## Production-shape scheduler log (characteristic, with caveat)
   
   Company policy prevents me from copying production scheduler logs, but the deadlocks we hit take this form on MySQL/InnoDB:
   
   ```
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE dag_run.id = %s]
   ```
   
   and on Postgres:
   
   ```
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process N waits for ShareLock on transaction M; blocked by process P.
           Process P waits for ShareLock on transaction Q; blocked by process N.
   CONTEXT:  while updating tuple (B, T) in relation "dag_run"
   [SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE dag_run.id = %s]
   ```
   
   Two markers distinguish this bug from an unrelated deadlock:
   
   - The offending statement is `UPDATE dag_run` (or `UPDATE task_instance`), both written by `_schedule_dag_run`'s flush.
   - The two conflicting transactions belong to different scheduler jobs (distinct job ids) within the same heartbeat window.
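   
   A hypothetical helper for applying the first marker to captured log text. The regexes are written against the two sample messages above, not against every driver's error format. The second marker (distinct scheduler job ids in one heartbeat window) needs job metadata alongside the log, so it is not checked here.

```python
import re

# Patterns match the two sample messages above; other drivers or versions
# may format these errors differently.
MYSQL_DEADLOCK = re.compile(r"\(1213,\s*'Deadlock found when trying to get lock")
POSTGRES_DEADLOCK = re.compile(r"DeadlockDetected\)\s+deadlock\s+detected")
SCHEDULER_UPDATE = re.compile(r"\[SQL:\s*UPDATE\s+(dag_run|task_instance)\s+SET\b")


def matches_first_marker(log_text: str) -> bool:
    """True if the text is a MySQL or Postgres deadlock on a dag_run/task_instance UPDATE."""
    is_deadlock = bool(MYSQL_DEADLOCK.search(log_text) or POSTGRES_DEADLOCK.search(log_text))
    return is_deadlock and bool(SCHEDULER_UPDATE.search(log_text))
```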
   
   ## What it would take to capture a literal 1213 and `SHOW ENGINE INNODB STATUS` output
   
   A two-thread repro against MySQL in Docker that forces opposing-order row locks on two `dag_run` rows. I'm happy to spin one up if the captured mechanism above isn't enough; it's a few minutes of setup. I'm only calling it out as a separate ask because the mechanism is already fully evidenced by section 1.
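   
   As a stepping stone toward that repro, a pure-Python analogue pins down the interleaving it has to force. `threading.Lock` stands in for a row lock and an acquire timeout stands in for the database breaking the cycle (InnoDB would instead roll back one victim with error 1213). The barriers make the A-B / B-A interleaving deterministic. `opposing_order_repro` is a hypothetical name; a real repro would issue the row-locking `UPDATE dag_run` statements on two separate MySQL connections instead.

```python
import threading


def opposing_order_repro(timeout: float = 0.2) -> dict:
    """Two 'replicas' lock rows 1 and 2 in opposite orders; report who got its second row."""
    row_locks = {1: threading.Lock(), 2: threading.Lock()}
    first_locks_held = threading.Barrier(2)  # both hold one row before either asks for the other
    attempts_done = threading.Barrier(2)     # neither releases until both have tried
    got_second_row = {}

    def replica(name: str, first: int, second: int) -> None:
        with row_locks[first]:
            first_locks_held.wait()
            # The timeout stands in for the database breaking the cycle.
            acquired = row_locks[second].acquire(timeout=timeout)
            got_second_row[name] = acquired
            attempts_done.wait()
            if acquired:
                row_locks[second].release()

    threads = [
        threading.Thread(target=replica, args=("A", 1, 2)),
        threading.Thread(target=replica, args=("B", 2, 1)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return got_second_row


print(sorted(opposing_order_repro().items()))  # [('A', False), ('B', False)]
```

   Neither replica can take its second row, because each holds the row the other is waiting on; that is the same cycle the Postgres `DETAIL:` lines above describe.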
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
