capytan opened a new issue, #66734:
URL: https://github.com/apache/airflow/issues/66734

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.2.1
   
   ### What happened and how to reproduce it?
   
   After upgrading 3.1.8 → 3.2.1 we started seeing PostgreSQL deadlocks on 
`task_instance`: none in the 30 days before the upgrade, two in the first 7 
hours after it. The pattern is the same cross-index row-lock contention 
reported in #65818, but between a different pair of writers, and #65920 / 
#65836 do not touch it. It occurs on a setup with a single scheduler, single 
triggerer, and single api-server colocated in one ECS task, so an HA topology 
is not required to trigger it.
   
   Setup is Airflow 3.1.8 (no issue) → 3.2.1 (issue starts), Aurora PostgreSQL 
15.10, CeleryExecutor. The workload includes one DAG scheduled every minute (`* 
* * * *`); both observed deadlocks involved that DAG, where new dag_runs are 
constantly being created while task instances of older runs are still 
completing.
   
   The regression is in `_verify_integrity_if_dag_changed` 
(`airflow/jobs/scheduler_job_runner.py`). In 3.1.8 it iterated 
`dag_run.task_instances` in Python and assigned `ti.dag_version = 
latest_dag_version` for unfinished TIs, so SQLAlchemy emitted per-row UPDATEs 
that locked by PK. In 3.2.1 the same logic became a single bulk UPDATE that 
filters by `state IN (State.unfinished)` and reaches rows by (dag_id, run_id).
   
   ```python
   session.execute(
       update(TI)
       .where(
           TI.dag_id == dag_run.dag_id,
           TI.run_id == dag_run.run_id,
           TI.state.in_(State.unfinished),
       )
       .values(dag_version_id=latest_dag_version.id),
       execution_options={"synchronize_session": False},
   )
   ```
   
   The api-server side is `ti_update_state` in 
`airflow/api_fastapi/execution_api/routes/task_instances.py`. It takes SELECT 
FOR UPDATE on the target row by primary key twice: once at the top of the 
function (a joined select with `.with_for_update(of=TI)`), and again inside 
`_create_ti_state_update_query_and_update_state` via `session.get(TI, 
task_instance_id, with_for_update=True)`. So the api-server locks its target 
row by the PK index while the scheduler bulk UPDATE reaches its rows via the 
`ti_dag_run` (dag_id, run_id) index rather than the PK. When their row sets 
overlap, the two paths acquire row locks in different orders and PG detects an 
A→B / B→A deadlock.
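
As a toy illustration of the A→B / B→A pattern (not Airflow code), the sketch below checks whether two lock-acquisition orders take some pair of rows in opposite order, which is the precondition for a deadlock cycle between two transactions. The row ids and visit orders are hypothetical, and it assumes both writers end up holding locks on at least two of the same rows.

```python
from typing import Hashable, Sequence


def has_lock_order_inversion(
    order_a: Sequence[Hashable], order_b: Sequence[Hashable]
) -> bool:
    """Return True if some pair of rows is locked in opposite order by the two writers.

    Each sequence lists row ids in the order one transaction acquires row locks,
    holding every lock until commit. Each id is assumed to appear at most once.
    """
    position_in_b = {row: i for i, row in enumerate(order_b)}
    # Positions in B of the shared rows, listed in A's acquisition order.
    positions = [position_in_b[row] for row in order_a if row in position_in_b]
    # Any descent means A takes x before y while B takes y before x.
    return any(x > y for x, y in zip(positions, positions[1:]))


# Hypothetical visit orders: an index scan on (dag_id, run_id) versus primary-key order.
index_order = ["ti-3", "ti-1", "ti-2"]
pk_order = ["ti-1", "ti-2", "ti-3"]

print(has_lock_order_inversion(index_order, pk_order))  # True
print(has_lock_order_inversion(sorted(index_order), pk_order))  # False
```

When both writers visit shared rows in the same order (here, sorted by id), no inversion exists, which is the property the PK-ordered locking proposal relies on.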
   
   Here is the api-server traceback. The deadlock surfaces on the second SELECT 
FOR UPDATE. We were not capturing `pg_locks` at the time, so we cannot say with 
certainty which statement PG picked as the victim.
   
   ```
   Traceback (most recent call last):
     File ".../airflow/api_fastapi/execution_api/routes/task_instances.py", line 390, in ti_update_state
     File ".../airflow/api_fastapi/execution_api/routes/task_instances.py", line 507, in _create_ti_state_update_query_and_update_state
     File ".../sqlalchemy/orm/session.py", line 3680, in get
     ...
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   [parameters: {'pk_1': UUID('...')}]
   ```
   
   PG monitoring shows `Lock:transactionid` waits during these events, and the 
top SQL in those windows matches the 3.2.1 bulk UPDATE: `UPDATE task_instance 
SET updated_at=?, dag_version_id=? WHERE dag_id=? AND run_id=? AND state IN (?, 
?, ?, ?, ?, ?, ?, ?)` (8 placeholders = `State.unfinished`; `updated_at` is added 
by SQLAlchemy's `onupdate` for the column; the source only sets 
`dag_version_id`).
   
   The trigger condition for the bulk UPDATE is `not dag_run.bundle_version and 
not check_version_id_exists_in_dr(latest_dag_version.id)`, so this path runs 
for any dag_run that lacks a bundle_version and has not yet been tagged with 
the latest dag_version_id. It covers both the upgrade-time wave and ongoing 
operation.
   
   There is no user-visible impact today. The Task SDK retries PATCH /state in 
a fresh session about one second later and the task is recorded as success. But 
the api-server logs `ERROR - Error updating Task Instance state. Setting the 
task to failed.` along with a 500 traceback, which is misleading. The error 
handler in `ti_update_state` looks like this.
   
   ```python
   except Exception:
       log.exception("Error updating Task Instance state. Setting the task to failed.", ...)
       # SELECT FOR UPDATE on the aborted session
       ti = session.get(TI, task_instance_id, with_for_update=True)
       ...
       query = query.values(state=(updated_state := TaskInstanceState.FAILED))
   # outer try:
   try:
       result = session.execute(query)  # also fails on the aborted session
   
   By the time this runs the session is already in `InFailedSqlTransaction`, so 
the recovery `session.get(...)` fails first, the follow-up 
`session.execute(query)` fails too, and the `state=failed` write never reaches 
the database. Operators see "failed" in the logs and a 500 response for a task 
that did not in fact fail. The Task SDK retry path is what currently keeps this 
from becoming real data corruption, but if that path ever stops compensating, 
the same bug starts recording successful tasks as failed.
   
   ### What you think should happen instead?
   
   `_verify_integrity_if_dag_changed` should adopt the same locking discipline 
as #65920 / #65836: SELECT id ORDER BY id LIMIT N FOR UPDATE SKIP LOCKED, then 
UPDATE WHERE id IN (...), in bounded batches, so both writers lock in PK order.
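
A minimal sketch of what that batching loop could look like, assuming PostgreSQL and a psycopg2-style `%(name)s` parameter format. This is not the code from #65920 / #65836. The function name, the `execute` / `commit` callables, and the extra `dag_version_id IS DISTINCT FROM` filter (added so that already-tagged rows are not selected again) are all hypothetical.

```python
from typing import Any, Callable, Mapping, Sequence

# Hypothetical statements; NULL-state handling is deliberately omitted here.
SELECT_BATCH = """
SELECT id FROM task_instance
WHERE dag_id = %(dag_id)s
  AND run_id = %(run_id)s
  AND state = ANY(%(states)s)
  AND dag_version_id IS DISTINCT FROM %(version_id)s
ORDER BY id
LIMIT %(batch_size)s
FOR UPDATE SKIP LOCKED
"""

UPDATE_BATCH = """
UPDATE task_instance
SET dag_version_id = %(version_id)s
WHERE id = ANY(%(ids)s)
"""

Execute = Callable[[str, Mapping[str, Any]], Sequence[Sequence[Any]]]


def retag_in_batches(
    execute: Execute,
    commit: Callable[[], None],
    *,
    dag_id: str,
    run_id: str,
    states: Sequence[str],
    version_id: Any,
    batch_size: int = 100,
) -> int:
    """Lock and update task instances in primary-key order, one bounded batch at a time.

    Returns the number of rows updated. Rows locked by another transaction are
    skipped by SKIP LOCKED and left for a later pass.
    """
    total = 0
    while True:
        rows = execute(
            SELECT_BATCH,
            {
                "dag_id": dag_id,
                "run_id": run_id,
                "states": list(states),
                "version_id": version_id,
                "batch_size": batch_size,
            },
        )
        ids = [row[0] for row in rows]
        if not ids:
            return total
        execute(UPDATE_BATCH, {"version_id": version_id, "ids": ids})
        commit()  # release this batch's row locks before taking the next batch
        total += len(ids)
```

Committing after each batch keeps the number of row locks held at any moment bounded by `batch_size`. The trade-off is that the retag is no longer atomic across the whole dag_run, and rows skipped because another writer holds them stay untagged until the next scheduler pass.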
   
   The error handler in `ti_update_state` is harder to defend. Treating 
OperationalError with PG SQLSTATE 40P01 / 40001 (or MySQL 1213) as retryable 
and returning an explicit retryable response with Retry-After (e.g., 503 
Service Unavailable) would make the Task SDK retry a real contract instead of 
relying on the 500 response being retried. If a non-retryable failure write is 
still wanted as a fallback, it needs to run in a fresh session, since the one 
the handler inherits is already aborted.
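
The classification step could be sketched as below. This is a hedged illustration, not the handler's actual code: the helper names are hypothetical, and it assumes the SQLAlchemy exception exposes the driver error as `.orig`, that psycopg2 reports the SQLSTATE as `pgcode` (psycopg 3 as `sqlstate`), and that MySQL drivers put the numeric error code in `args[0]`.

```python
from typing import Dict, Optional, Tuple

RETRYABLE_SQLSTATES = frozenset({"40P01", "40001"})  # deadlock, serialization failure
RETRYABLE_MYSQL_ERRNOS = frozenset({1213})  # MySQL lock deadlock


def is_retryable_db_error(exc: BaseException) -> bool:
    """Return True if the driver-level error looks like a deadlock or serialization failure."""
    # SQLAlchemy wraps the DBAPI exception; fall back to exc itself if unwrapped.
    orig = getattr(exc, "orig", None) or exc
    sqlstate = getattr(orig, "pgcode", None) or getattr(orig, "sqlstate", None)
    if sqlstate in RETRYABLE_SQLSTATES:
        return True
    args = getattr(orig, "args", ())
    return bool(args) and isinstance(args[0], int) and args[0] in RETRYABLE_MYSQL_ERRNOS


def retry_response(
    exc: BaseException, retry_after_seconds: int = 1
) -> Optional[Tuple[int, Dict[str, str]]]:
    """Return (status, headers) for a retryable error, or None to use the normal error path."""
    if not is_retryable_db_error(exc):
        return None
    return 503, {"Retry-After": str(retry_after_seconds)}
```

A handler using this would roll back the aborted session before doing anything else, return the 503 for retryable errors, and only attempt the `state=failed` write (in a fresh session) when `retry_response` returns `None`.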
   
   ### Operating System
   
   Debian 12 (apache/airflow:slim-3.2.1-python3.12 image)
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   Related: #65818, #65920, #65836.
   
   The traceback and SQL evidence above are from a running Airflow 3.2.1 
deployment on Amazon ECS. Let us know if more diagnostic data would help with 
investigation.
   
   Drafted with AI assistance (Claude Opus 4.7 with 1M context); content 
verified against Airflow 3.1.8 and 3.2.1 sources before submission.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

