hkc-8010 opened a new issue, #66853:
URL: https://github.com/apache/airflow/issues/66853
## Problem
Under high concurrency (80+ simultaneous task completions emitting asset
events), the Airflow 3 API server dies with OOMKill. The root cause is a DB
lock contention chain:
1. `ti_update_state()` issues `SELECT ... FROM task_instance ... FOR UPDATE`,
taking a PostgreSQL row lock that is held until the transaction ends.
2. While holding that lock, `register_asset_changes_in_db()` calls
`AssetManager.register_asset_change()`, which runs several slow queries. One
of them is triggered by `asset_alias_model.asset_events.append(asset_event)`:
the ORM `.append()` lazy-loads the **entire** `asset_events` collection for the
alias -- potentially thousands of rows on long-running deployments.
3. Each slow query leaves the connection `idle in transaction` while Python
processes results. New workers needing `SELECT task_instance FOR UPDATE` on the
same row queue up, each holding a FastAPI threadpool thread via Cadwyn's
`run_in_threadpool`.
4. With 80+ concurrent completions, thread count grows unbounded until
OOMKill.
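The pile-up in steps 3 and 4 can be illustrated with a toy model. This is only a sketch, not Airflow code: a `threading.Lock` stands in for the `task_instance` row lock, `time.sleep()` stands in for the slow asset registration, and the function name and parameters are hypothetical.

```python
import threading
import time


def simulate_row_lock_contention(workers: int, hold_seconds: float) -> int:
    """Return the peak number of threads that had requested the lock at once."""
    row_lock = threading.Lock()      # stand-in for the task_instance row lock
    counter_lock = threading.Lock()  # protects the bookkeeping below
    waiting = 0
    peak = 0

    def complete_task() -> None:
        nonlocal waiting, peak
        with counter_lock:
            waiting += 1
            peak = max(peak, waiting)
        with row_lock:
            with counter_lock:
                waiting -= 1
            # Slow work done while the lock is still held, as in step 2.
            time.sleep(hold_seconds)

    threads = [threading.Thread(target=complete_task) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return peak


peak = simulate_row_lock_contention(workers=8, hold_seconds=0.05)
print(f"peak threads queued on the lock: {peak}")
```

Every queued thread in this model corresponds to a threadpool thread that stays occupied for as long as the lock holder takes, so the longer the work under the lock, the more threads accumulate.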
### DB evidence (confirmed from production, May 2026)
```
pid 31104: idle in transaction, xact_age=3:35
last query: SELECT asset_alias WHERE name IN ('fivetran-synced-table')
pid 31303: idle in transaction, xact_age=3:31, same pattern
-- both blocking: SELECT task_instance FOR UPDATE WHERE id = UUID(...)
```
Disabling the trigger DAGs immediately dropped apiserver memory from 5Gi+ to
the megabyte range. Re-enabling just one DAG reproduced the problem: 76
`QueryCanceled: canceling statement due to statement timeout` errors in 1.5
hours.
Blocking query chain confirmed from `pg_stat_activity`:
```
pid 31303: idle in transaction
INSERT INTO asset_alias_asset_event VALUES (2, 1151718)
pid 31104: idle in transaction
SELECT asset_alias ... WHERE name IN ('fivetran-synced-table')
-- both blocking 8 workers doing:
SELECT task_instance ... FOR UPDATE WHERE id = UUID(...)
```
## Fix
Two changes:
**1. `AssetManager.register_asset_change()` (`assets/manager.py`)**: Replace
`asset_alias_model.asset_events.append(asset_event)` +
`session.add(asset_alias_model)` with a direct `INSERT INTO
asset_alias_asset_event (alias_id, event_id)`. This eliminates the lazy-load of
the existing events collection (which can be thousands of rows) while the
task_instance row lock is held.
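A minimal sketch of the direct-insert idea, assuming SQLite from the standard library as a stand-in for PostgreSQL and the ORM. The table and column names come from the description above; the helper name `link_event_to_alias` and the row counts are hypothetical.

```python
import sqlite3

# Stand-in schema: only the association table named in the fix.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE asset_alias_asset_event ("
    " alias_id INTEGER NOT NULL,"
    " event_id INTEGER NOT NULL,"
    " PRIMARY KEY (alias_id, event_id))"
)
# Simulate a long-running deployment: the alias already has many events.
conn.executemany(
    "INSERT INTO asset_alias_asset_event (alias_id, event_id) VALUES (?, ?)",
    [(2, event_id) for event_id in range(1, 5001)],
)
conn.commit()

# Record every statement SQLite runs from here on.
executed = []
conn.set_trace_callback(executed.append)


def link_event_to_alias(conn: sqlite3.Connection, alias_id: int, event_id: int) -> None:
    """Attach one event to an alias without reading the existing links."""
    conn.execute(
        "INSERT INTO asset_alias_asset_event (alias_id, event_id) VALUES (?, ?)",
        (alias_id, event_id),
    )


link_event_to_alias(conn, alias_id=2, event_id=1151718)
conn.commit()
conn.set_trace_callback(None)

row_count = conn.execute("SELECT COUNT(*) FROM asset_alias_asset_event").fetchone()[0]
select_count = sum(1 for sql in executed if sql.lstrip().upper().startswith("SELECT"))
print(f"rows: {row_count}, SELECTs during registration: {select_count}")
```

The cost of the insert does not depend on how many links the alias already has, which is the property the fix relies on while the row lock is held.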
**2. `ti_update_state()` (`execution_api/routes/task_instances.py`)**: Add
`session.commit()` after the TI state UPDATE and Log writes to release the
`task_instance` row lock before running asset registration. Asset registration
then runs in a fresh implicit transaction. Registration failures are logged and
swallowed -- the task state is already durable at that point.
Note on `session.commit()` in a session-parameter function: this
intentionally deviates from the project convention that a function receiving
a `session` leaves the commit to its caller. No code after the commit
relies on rollback; the subsequent `session.get()` re-loads fresh state.
Alternative approaches (second session, background task) were considered but
add operational complexity without improving correctness.
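The commit-then-register ordering might look roughly like the sketch below. It is not the actual `ti_update_state()` code: SQLite stands in for PostgreSQL, the function and logger names are hypothetical, and the `rollback()` in the error path is an assumption about how the failed registration transaction would be discarded.

```python
import logging
import sqlite3

log = logging.getLogger("ti_update_state_sketch")  # hypothetical logger name


def failing_registration(conn: sqlite3.Connection) -> None:
    """Hypothetical stand-in for asset registration that always fails."""
    raise RuntimeError("simulated asset registration failure")


def update_state_then_register(conn: sqlite3.Connection, ti_id: str, register) -> int:
    # 1. Persist the task state and commit, which ends the transaction that
    #    would hold the row lock on PostgreSQL.
    conn.execute("UPDATE task_instance SET state = 'success' WHERE id = ?", (ti_id,))
    conn.commit()

    # 2. Run asset registration in a fresh transaction. Failures are logged
    #    and swallowed because the task state is already durable.
    try:
        register(conn)
        conn.commit()
    except Exception:
        conn.rollback()  # assumption: discard any partial registration work
        log.exception("Asset registration failed after task state was committed")
    return 204


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('ti-1', 'running')")
conn.commit()

status = update_state_then_register(conn, "ti-1", failing_registration)
state = conn.execute("SELECT state FROM task_instance WHERE id = 'ti-1'").fetchone()[0]
print(status, state)
```

The trade-off is that a task can be marked successful while its asset events are missing, so the logged failure is the only signal that registration needs attention.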
## Testing
- New: `test_register_asset_change_with_alias_no_lazy_load` -- confirms no
SELECT on `asset_alias_asset_event` collection during registration when
pre-existing rows exist
- New:
`test_ti_update_state_to_success_asset_registration_failure_returns_204` --
confirms 204 + TI SUCCESS when asset registration raises after commit
- All 22 existing `test_manager.py` tests pass
- All 38 existing `TestTIUpdateState` tests pass
- Breeze `--python 3.10 --db-reset` clean on both test files
## Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!