This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new ad0b99b2cb1 [v3-2-test] Fix deadlock in ti_update_state caused by FOR UPDATE locking dag_run (#67246) (#67264)
ad0b99b2cb1 is described below
commit ad0b99b2cb16ecbcd421e4031da8b27d9901c2bf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 21 08:14:00 2026 +0530
[v3-2-test] Fix deadlock in ti_update_state caused by FOR UPDATE locking dag_run (#67246) (#67264)
session.get(TI, id, with_for_update=True) emits a SELECT that joins
dag_run (via the lazy="joined" relationship) and applies FOR UPDATE to
both tables. Under concurrent task completions this serialises all
workers on the same dag_run row, producing deadlock cycles with the
scheduler's trigger-rule dependency checks.
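The lock-scope problem described above can be illustrated with a toy model in plain Python. This is a sketch under loud assumptions: rows are just labelled strings ("ti:a", "dag_run:1"), and the scheduler interleaving (the scheduler already holds the dag_run row and then waits for the worker's TI row) is hypothetical, chosen only to show how a wait-for cycle can form. It is not derived from the scheduler code. The helpers locked_rows, wait_for_graph, has_cycle, deadlocks and shared_rows are illustrative names, not Airflow functions.

```python
# Toy model of row-lock scope. Row names and the scheduler interleaving are
# hypothetical; they only illustrate the lock pattern described in the commit.


def locked_rows(ti_row: str, dag_run_row: str, of_ti_only: bool) -> set[str]:
    """Rows a worker's SELECT ... FOR UPDATE locks for one task instance."""
    # Plain FOR UPDATE locks the row of every joined table; FOR UPDATE OF TI
    # locks only the task_instance row.
    return {ti_row} if of_ti_only else {ti_row, dag_run_row}


def wait_for_graph(
    holds: dict[str, set[str]], wants: dict[str, set[str]]
) -> dict[str, set[str]]:
    """Edge t -> u when transaction t wants a row that u currently holds."""
    graph: dict[str, set[str]] = {t: set() for t in holds}
    for t, rows in wants.items():
        for u, held in holds.items():
            if u != t and rows & held:
                graph[t].add(u)
    return graph


def has_cycle(graph: dict[str, set[str]]) -> bool:
    """Depth-first search for a cycle; a cycle in a wait-for graph is a deadlock."""
    visiting: set[str] = set()
    done: set[str] = set()

    def visit(node: str) -> bool:
        if node in visiting:
            return True
        if node in done:
            return False
        visiting.add(node)
        if any(visit(nxt) for nxt in graph.get(node, set())):
            return True
        visiting.discard(node)
        done.add(node)
        return False

    return any(visit(node) for node in graph)


def deadlocks(of_ti_only: bool) -> bool:
    """Hypothetical interleaving: the worker holds ti:a, the scheduler holds
    dag_run:1 and now waits for ti:a."""
    worker_needs = locked_rows("ti:a", "dag_run:1", of_ti_only)
    holds = {"worker": {"ti:a"}, "scheduler": {"dag_run:1"}}
    wants = {"worker": worker_needs - holds["worker"], "scheduler": {"ti:a"}}
    return has_cycle(wait_for_graph(holds, wants))


def shared_rows(of_ti_only: bool) -> set[str]:
    """Rows that two workers finishing sibling TIs of one dag_run both lock."""
    a = locked_rows("ti:a", "dag_run:1", of_ti_only)
    b = locked_rows("ti:b", "dag_run:1", of_ti_only)
    return a & b


print(deadlocks(of_ti_only=False), deadlocks(of_ti_only=True))  # True False
print(shared_rows(of_ti_only=False), shared_rows(of_ti_only=True))  # {'dag_run:1'} set()
```

In this model, locking both tables gives every worker finishing a TI of the same run a shared dag_run row to queue on, and lets a cycle close with any transaction that holds that row first. Narrowing the lock to the TI row removes the worker's edge toward dag_run, so the scheduler may still wait on the worker, but no cycle forms.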
Three other callsites in this file already use with_for_update={"of": TI}
for exactly this reason. Apply the same fix to the two remaining callsites
in _create_ti_state_update_query_and_update_state and its error-recovery
path.
(cherry picked from commit 315d1591644629cca400e723769ba01408b343f6)
Co-authored-by: Arthur <[email protected]>
---
.../src/airflow/api_fastapi/execution_api/routes/task_instances.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 15feb13ac3d..44a2674efb8 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -418,7 +418,7 @@ def ti_update_state(
"Error updating Task Instance state. Setting the task to failed.",
payload=ti_patch_payload,
)
- ti = session.get(TI, task_instance_id, with_for_update=True)
+ ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(), query, session.bind)
query = query.values(state=(updated_state := TaskInstanceState.FAILED))
@@ -521,7 +521,7 @@ def _create_ti_state_update_query_and_update_state(
dag_id: str,
) -> tuple[Update, TaskInstanceState]:
if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)):
- ti = session.get(TI, task_instance_id, with_for_update=True)
+ ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
updated_state = TaskInstanceState(ti_patch_payload.state.value)
if session.bind is not None:
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)