capytan opened a new issue, #66734:
URL: https://github.com/apache/airflow/issues/66734
### Under which category would you file this issue?
Airflow Core
### Apache Airflow version
3.2.1
### What happened and how to reproduce it?
After upgrading from 3.1.8 to 3.2.1 we started seeing PostgreSQL deadlocks on
`task_instance`: 0 in the 30 days before the upgrade, 2 in the first 7 hours
after it. The pattern is the same cross-index row-lock contention reported in
#65818, but between a different pair of writers, and #65920 / #65836 do not
touch this path. It occurs on a setup with a single scheduler, a single
triggerer, and a single api-server colocated in one ECS task, so an HA topology
is not required.
Setup: Airflow 3.1.8 (no issue) → 3.2.1 (issue starts), Aurora PostgreSQL
15.10, CeleryExecutor. The workload includes one DAG scheduled every minute
(`* * * * *`); both observed deadlocks involved that DAG, where new dag_runs
are constantly being created while task instances of older runs are still
completing.
The regression is in `_verify_integrity_if_dag_changed`
(`airflow/jobs/scheduler_job_runner.py`). In 3.1.8 it iterated
`dag_run.task_instances` in Python and assigned `ti.dag_version =
latest_dag_version` for unfinished TIs, so SQLAlchemy emitted per-row UPDATEs
that locked by PK. In 3.2.1 the same logic became a single bulk UPDATE that
filters by `state IN (State.unfinished)` and reaches rows by (dag_id, run_id).
```python
session.execute(
    update(TI)
    .where(
        TI.dag_id == dag_run.dag_id,
        TI.run_id == dag_run.run_id,
        TI.state.in_(State.unfinished),
    )
    .values(dag_version_id=latest_dag_version.id),
    execution_options={"synchronize_session": False},
)
```
The api-server side is `ti_update_state` in
`airflow/api_fastapi/execution_api/routes/task_instances.py`. It takes SELECT
FOR UPDATE on the target row by primary key twice: once at the top of the
function (a joined select with `.with_for_update(of=TI)`), and again inside
`_create_ti_state_update_query_and_update_state` via `session.get(TI,
task_instance_id, with_for_update=True)`. So the api-server locks its target
row by the PK index while the scheduler bulk UPDATE reaches its rows via the
`ti_dag_run` (dag_id, run_id) index rather than the PK. When their row sets
overlap, the two paths acquire row locks in different orders and PG detects an
A→B / B→A deadlock.
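To make the lock-ordering argument concrete, here is a small, self-contained model (not Airflow code). Each transaction is a list of row ids it locks in order and holds until it finishes, as PostgreSQL holds row locks until COMMIT/ROLLBACK; the hypothetical `can_deadlock` function explores every interleaving of two such transactions and reports whether one ends with each side waiting on a row the other holds. The row ids used are hypothetical.

```python
def can_deadlock(t1: list[int], t2: list[int]) -> bool:
    """Return True if some interleaving of two lock sequences ends in deadlock.

    Each transaction takes exclusive row locks in list order and keeps them
    until it finishes, like row locks held until COMMIT/ROLLBACK.
    """
    seqs = (t1, t2)
    seen = set()
    stack = [(0, 0)]  # state = how many locks each transaction has taken so far
    while stack:
        state = stack.pop()
        if state in seen:
            continue
        seen.add(state)
        done = [state[k] == len(seqs[k]) for k in (0, 1)]
        # A finished transaction has released everything it held.
        held = [set() if done[k] else set(seqs[k][: state[k]]) for k in (0, 1)]
        moves = []
        for k in (0, 1):
            if done[k]:
                continue
            wanted = seqs[k][state[k]]
            if wanted not in held[1 - k]:  # row is free, or already ours
                nxt = list(state)
                nxt[k] += 1
                moves.append((nxt[0], nxt[1]))
        if not moves and not all(done):
            return True  # both blocked on each other: a wait-for cycle
        stack.extend(moves)
    return False


# Opposite acquisition order on overlapping rows can deadlock...
print(can_deadlock([7, 3, 5], [5, 3]))
# ...while acquiring the same rows in PK order cannot.
print(can_deadlock(sorted([7, 3, 5]), sorted([5, 3])))
```

Sorting both sequences by id is the "lock in PK order" property that the proposed fix relies on: with a single global order, neither transaction can hold a row the other needs earlier in its sequence.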
Here is the api-server traceback. The deadlock surfaces on the second SELECT
FOR UPDATE (the `session.get` call). We were not capturing `pg_locks` at the
time, so we cannot say with certainty which statement PG picked as the victim.
```
Traceback (most recent call last):
  File ".../airflow/api_fastapi/execution_api/routes/task_instances.py", line 390, in ti_update_state
  File ".../airflow/api_fastapi/execution_api/routes/task_instances.py", line 507, in _create_ti_state_update_query_and_update_state
  File ".../sqlalchemy/orm/session.py", line 3680, in get
  ...
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
[parameters: {'pk_1': UUID('...')}]
```
PG monitoring shows `Lock:transactionid` waits during these events, and the
top SQL in those windows matches the 3.2.1 bulk UPDATE: `UPDATE task_instance
SET updated_at=?, dag_version_id=? WHERE dag_id=? AND run_id=? AND state IN (?,
?, ?, ?, ?, ?, ?, ?)`. The 8 placeholders are the members of
`State.unfinished`; `updated_at` is added by SQLAlchemy's `onupdate` for that
column, since the source only sets `dag_version_id`.
The trigger condition for the bulk UPDATE is `not dag_run.bundle_version and
not check_version_id_exists_in_dr(latest_dag_version.id)`, so this path runs
for any dag_run that lacks a bundle_version and has not yet been tagged with
the latest dag_version_id. That covers both the wave of existing dag_runs at
upgrade time and ongoing operation.
There is no user-visible impact today. The Task SDK retries PATCH /state in
a fresh session about one second later and the task is recorded as success. But
the api-server logs `ERROR - Error updating Task Instance state. Setting the
task to failed.` along with a 500 traceback, which is misleading. The error
handler in `ti_update_state` looks like this.
```python
except Exception:
    log.exception("Error updating Task Instance state. Setting the task to failed.", ...)
    ti = session.get(TI, task_instance_id, with_for_update=True)  # SELECT FOR UPDATE on the aborted session
    ...
    query = query.values(state=(updated_state := TaskInstanceState.FAILED))
# outer try:
try:
    result = session.execute(query)  # also fails on the aborted session
```
By the time this handler runs, the PostgreSQL transaction has already been
aborted by the deadlock, so the recovery `session.get(...)` fails first with
`InFailedSqlTransaction`, the follow-up `session.execute(query)` also fails,
and the `state=failed` write never reaches the database. Operators see
"failed" in the logs and a 500 response for a task that did not in fact fail.
The Task SDK retry path is what currently keeps this from becoming real data
corruption, but if that path ever stops compensating, the same bug starts
recording successful tasks as failed.
### What you think should happen instead?
`_verify_integrity_if_dag_changed` should adopt the same locking discipline
as #65920 / #65836: SELECT id ORDER BY id LIMIT N FOR UPDATE SKIP LOCKED, then
UPDATE WHERE id IN (...), in bounded batches, so both writers lock in PK order.
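A minimal sketch of that batching pattern, assuming a simplified `task_instance` table with integer ids standing in for the UUID primary key and a shortened, hypothetical `UNFINISHED` tuple in place of `State.unfinished`. It uses stdlib `sqlite3` so it runs anywhere; SQLite has no row locks, so the hypothetical `LOCK_CLAUSE` is empty here, and on PostgreSQL it would be ` FOR UPDATE SKIP LOCKED` (with the driver's placeholder style instead of `?`). Committing after each batch is an assumption about keeping lock hold times short, not something taken from #65920 / #65836.

```python
import sqlite3

# Hypothetical, shortened stand-in for Airflow's State.unfinished.
UNFINISHED = ("scheduled", "queued", "running")

# PostgreSQL would use " FOR UPDATE SKIP LOCKED" here. SQLite has no row
# locks, so the demo leaves it empty.
LOCK_CLAUSE = ""


def retag_unfinished_in_pk_order(
    conn: sqlite3.Connection,
    dag_id: str,
    run_id: str,
    version_id: str,
    batch_size: int = 100,
) -> int:
    """Set dag_version_id on unfinished TIs of one run, batch by batch in id order."""
    states = ", ".join("?" for _ in UNFINISHED)
    select_sql = (
        "SELECT id FROM task_instance "
        f"WHERE dag_id = ? AND run_id = ? AND state IN ({states}) AND id > ? "
        f"ORDER BY id LIMIT ?{LOCK_CLAUSE}"
    )
    updated = 0
    last_id = 0  # keyset cursor; demo ids start at 1
    while True:
        params = (dag_id, run_id, *UNFINISHED, last_id, batch_size)
        ids = [row[0] for row in conn.execute(select_sql, params)]
        if not ids:
            break
        # Update exactly the rows selected (and, on PostgreSQL, locked) above.
        marks = ", ".join("?" for _ in ids)
        conn.execute(
            f"UPDATE task_instance SET dag_version_id = ? WHERE id IN ({marks})",
            (version_id, *ids),
        )
        conn.commit()  # assumption: one short transaction per batch
        updated += len(ids)
        last_id = ids[-1]
    return updated
```

In this sketch, rows skipped because another transaction holds their lock are not revisited by the keyset cursor; they would need a later pass.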
The error handler in `ti_update_state` is harder to defend. Treating
OperationalError with PG SQLSTATE 40P01 / 40001 (or MySQL 1213) as retryable
and returning an explicit retryable response with Retry-After (e.g., 503
Service Unavailable) would make the Task SDK retry a real contract instead of
relying on the 500 response being retried. If a non-retryable failure write is
still wanted as a fallback, it needs to run in a fresh session, since the one
the handler inherits is already aborted.
### Operating System
Debian 12 (apache/airflow:slim-3.2.1-python3.12 image)
### Deployment
Other Docker-based deployment
### Apache Airflow Provider(s)
_No response_
### Versions of Apache Airflow Providers
_No response_
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
_No response_
### Docker Image customizations
_No response_
### Anything else?
Related: #65818, #65920, #65836.
The traceback and SQL evidence above are from a running Airflow 3.2.1
deployment on Amazon ECS. Let us know if more diagnostic data would help with
investigation.
Drafted with AI assistance (Claude Opus 4.7 with 1M context); content
verified against Airflow 3.1.8 and 3.2.1 sources before submission.
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)