This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 315d1591644 Fix deadlock in ti_update_state caused by FOR UPDATE
locking dag_run (#67246)
315d1591644 is described below
commit 315d1591644629cca400e723769ba01408b343f6
Author: Arthur <[email protected]>
AuthorDate: Thu May 21 03:13:13 2026 +0200
Fix deadlock in ti_update_state caused by FOR UPDATE locking dag_run
(#67246)
session.get(TI, id, with_for_update=True) emits a SELECT that joins
dag_run (via the lazy="joined" relationship) and applies FOR UPDATE to
both tables. Under concurrent task completions this serialises all
workers on the same dag_run row, producing deadlock cycles with the
scheduler's trigger-rule dependency checks.
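Three other call sites in this file already use with_for_update={"of": TI}
for exactly this reason. It restricts the row lock to the task_instance
table (FOR UPDATE OF task_instance), so the joined dag_run row is not
locked. Apply the same fix to the two remaining call sites: one in
_create_ti_state_update_query_and_update_state and one in the
error-recovery path of ti_update_state.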
---
.../src/airflow/api_fastapi/execution_api/routes/task_instances.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 153b5a7c09c..577b2200e4c 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -430,7 +430,7 @@ def ti_update_state(
"Error updating Task Instance state. Setting the task to failed.",
payload=ti_patch_payload,
)
- ti = session.get(TI, task_instance_id, with_for_update=True)
+ ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(), query,
session.bind)
query = query.values(state=(updated_state := TaskInstanceState.FAILED))
@@ -558,7 +558,7 @@ def _create_ti_state_update_query_and_update_state(
dag_id: str,
) -> tuple[Update, TaskInstanceState]:
if isinstance(ti_patch_payload, (TITerminalStatePayload,
TIRetryStatePayload, TISuccessStatePayload)):
- ti = session.get(TI, task_instance_id, with_for_update=True)
+ ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
updated_state = TaskInstanceState(ti_patch_payload.state.value)
if session.bind is not None:
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)