Dev-iL opened a new issue, #67799:
URL: https://github.com/apache/airflow/issues/67799

   ### Description
   
   Incrementally convert Airflow's FastAPI routes from the synchronous
`SessionDep` to the async `AsyncSessionDep`, so that DB I/O no longer occupies an
AnyIO threadpool worker and instead yields to the event loop. The async
infrastructure already ships in Airflow 3.x; what's missing is adoption.
   
   This is an umbrella/tracking epic. Individual conversions land as their own 
small, parity-focused PRs.
   
   ### Use case/motivation
   
   - The async SQLAlchemy plumbing already exists: `create_async_engine`, 
`async_sessionmaker`, `AsyncSessionDep`, `create_session_async`, 
`paginated_select_async`, the `sql_alchemy_conn_async` config key, and 
automatic async-driver derivation. None of this needs to be built — only 
adopted. Adoption inside the Execution API started at ~0 of ~49 routes.
   - The relevant bottleneck on Airflow 3.x is the **Starlette/AnyIO threadpool 
inside the API server** (default 40 workers), not the Triggerer event loop. In 
3.x the Triggerer and Workers reach the metadata DB through the Execution API 
over HTTP, not directly — so the original framing of PR #36504 ("unblock the 
Triggerer loop") no longer applies; the win is at the API server. See 
`dev/async_session_poc/PR36504_NOTES.md` and `RESULTS.md`.
   - Measured A/B (single uvicorn worker): async wins cleanly at moderate
concurrency (e.g. `/sleep` 500 ms: +23% RPS, −38% p99 at c=50), but the result
inverts, with sync ahead, above the threadpool ceiling (c≥100) on a single worker. A follow-up
multi-worker run (workers ∈ {1, 4}) with a multi-process client was
inconclusive on a single 8-core box: the clean-regime async win holds (+2% to
+24% RPS), but the high-concurrency cells saturated and the 4-worker cells
exhausted Postgres `max_connections` (see
[`RESULTS_MULTIWORKER.md`](RESULTS_MULTIWORKER.md)). Notably, switching to a
multi-process client (removing the client-side GIL bottleneck) did *not* remove
the single-worker inversion, which places it server-side (one event loop
serialising CPU-bound request/response work) rather than in the client harness.
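
A back-of-the-envelope way to see why the threadpool is the ceiling: if every sync request holds one worker thread for its whole duration, Little's law caps throughput at `workers / service_time`, no matter how many clients are waiting, while awaiting routes are not capped by the threadpool at all. The sketch below uses a hypothetical helper name and is an idealised model only: it ignores CPU cost, the DB connection pool, and the single-event-loop serialisation described above.

```python
from typing import Optional


def closed_loop_rps_bound(
    concurrency: int, service_time_s: float, slots: Optional[int] = None
) -> float:
    """Upper bound on requests/second for `concurrency` clients that each send
    requests back to back, where every request occupies one slot for
    `service_time_s` seconds (Little's law: throughput = in_service / time).

    `slots` models a bounded threadpool (sync routes); `None` models awaiting
    routes, where only the clients themselves limit requests in flight.
    """
    if concurrency < 1 or service_time_s <= 0:
        raise ValueError("need concurrency >= 1 and service_time_s > 0")
    in_service = concurrency if slots is None else min(concurrency, slots)
    return in_service / service_time_s


# The /sleep case from the issue: 500 ms per request, c=50, 40 threadpool workers.
sync_bound = closed_loop_rps_bound(50, 0.5, slots=40)  # 80.0 req/s
async_bound = closed_loop_rps_bound(50, 0.5)  # 100.0 req/s
```

The idealised 25% gap at c=50 is of the same order as the measured +23%, but since the model leaves out exactly the server-side CPU work behind the c≥100 inversion, treat it as intuition rather than a prediction.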
   
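The "automatic async-driver derivation" listed above can be pictured as swapping the driver part of the sync SQLAlchemy URL for an async one. The sketch below is a hypothetical illustration, not Airflow's implementation: the function name and mapping are assumptions based on the three async drivers this issue names for its supported backends (aiosqlite, asyncpg, aiomysql), and it does not translate query parameters or driver-specific options the way a real derivation might need to.

```python
# Hypothetical sketch: derive an async SQLAlchemy URL from a sync one.
# Not Airflow's implementation; the mapping is an assumption based on the
# backends/drivers listed in this issue.
ASYNC_DRIVERS = {
    "sqlite": "aiosqlite",
    "postgresql": "asyncpg",
    "mysql": "aiomysql",
}


def derive_async_conn(sync_url: str) -> str:
    """Swap the `+driver` part of a `dialect[+driver]://...` URL for an async driver."""
    scheme, sep, rest = sync_url.partition("://")
    if not sep:
        raise ValueError(f"not a SQLAlchemy URL: {sync_url!r}")
    dialect = scheme.split("+", 1)[0]  # drop any sync driver, e.g. "+psycopg2"
    try:
        driver = ASYNC_DRIVERS[dialect]
    except KeyError:
        raise ValueError(f"no async driver known for dialect {dialect!r}") from None
    # Everything after "://" (credentials, host, database, query) is kept verbatim.
    return f"{dialect}+{driver}://{rest}"
```

For example, `derive_async_conn("postgresql+psycopg2://u:p@h/db")` gives `postgresql+asyncpg://u:p@h/db`. Presumably the explicit `sql_alchemy_conn_async` key exists for cases that such a simple mapping cannot cover.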
   ## Definition of done per endpoint
   
   Each conversion PR must:
   
   1. Convert `def` → `async def` and `SessionDep` → `AsyncSessionDep`, and
`await` every `session.execute`/`scalar`/`scalars` call. Do not add `commit()`
calls in routes: the async dependency commits or rolls back itself, mirroring
the sync one.
   2. Preserve the contract exactly — same response model, query/path 
signature, and status codes. No Execution API version bump for a pure 
sync→async swap.
   3. Pass the endpoint's existing tests with the test code left
**byte-identical** wherever they don't mock the session; update only the
session-mocking tests, patching `sqlalchemy.ext.asyncio.AsyncSession` with
async interceptors.
   4. Add the `reconfigure_async_db_engine` autouse fixture (calls 
`_configure_async_session()`) to any test class that exercises a converted 
route — the async engine binds its pool to the creating event loop, and the 
test harness uses a fresh loop per test. (Pattern: `TestWaitDagRun`, 
`TestTIHealthEndpoint`.)
   5. Verify on **all three supported backends** — SQLite (aiosqlite), Postgres 
(asyncpg), MySQL (aiomysql) — not SQLite alone (`SELECT … FOR UPDATE` is a 
no-op there, and `rowcount` semantics differ per driver).
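
Point 1 means only the route body changes shape (it becomes `async def` and awaits every session call) while transaction handling stays in the dependency. The stdlib-only sketch below illustrates that split. `FakeAsyncSession`, `session_scope`, `handle_request`, and the two routes are hypothetical stand-ins, not Airflow's `AsyncSessionDep` or SQLAlchemy's `AsyncSession`, and the commit-on-success / rollback-on-error behaviour is an assumption about what "the async dependency commits / rolls back, mirroring sync" looks like.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable, NoReturn


class FakeAsyncSession:
    """Hypothetical stand-in for an async session; records every call."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def execute(self, stmt: str) -> str:
        self.calls.append(f"execute:{stmt}")
        await asyncio.sleep(0)  # yield to the event loop, as real driver I/O would
        return stmt

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session: FakeAsyncSession) -> AsyncIterator[FakeAsyncSession]:
    """Dependency-style scope: commit on success, roll back on error, always close."""
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


# Hypothetical shape of a converted Airflow route (not executed here):
#     @router.get("/things/{thing_id}")
#     async def get_thing(thing_id: int, session: AsyncSessionDep) -> ThingResponse:
#         return await session.scalar(select(Thing).where(Thing.id == thing_id))
async def get_thing(session: FakeAsyncSession) -> str:
    # `async def`, every session call awaited, and no commit() in the route.
    return await session.execute("SELECT 1")


async def broken_update(session: FakeAsyncSession) -> NoReturn:
    # A route that fails mid-request: the scope must roll back, not commit.
    await session.execute("UPDATE thing SET x = 1")
    raise ValueError("simulated failure")


async def handle_request(
    session: FakeAsyncSession, route: Callable[[FakeAsyncSession], Awaitable[str]]
) -> str:
    """Hypothetical request lifecycle: open the scope, run the route, let the scope finish."""
    async with session_scope(session) as s:
        return await route(s)
```

Running `handle_request` with `get_thing` records `execute`, `commit`, `close`; with `broken_update` it records `execute`, `rollback`, `close` and re-raises the error.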
   
   ## Cross-cutting work (shared dependencies / risks)
   
   - **asyncpg + PgBouncer-safe engine defaults** — tracked in 
#\<tracking-issue\>. The more routes go async, the more this matters.
   - **Async secrets backends** — prerequisite for any route that resolves 
Variable/Connection *values*.
   - **Multi-worker benchmark** — a first multi-worker (`AIRFLOW__API__WORKERS` 
∈ {1, 4}) + multi-process-client run is done 
([`RESULTS_MULTIWORKER.md`](RESULTS_MULTIWORKER.md)) but was inconclusive on a 
single shared-core box. A definitive crossover test still needs separate 
server/client hosts, per-worker pools sized so `pool_size × workers ≤ 
max_connections`, and a raised `max_connections`. Until a clean 
high-concurrency win is shown, conversions remain justified by parity, not by a 
measured throughput claim.
   - **Async-engine disposal at teardown** — `dispose_orm` closes the async 
engine synchronously, logging a cosmetic `Event loop is closed` at session 
teardown on all async drivers (exit code unaffected). Worth tidying once, 
centrally.
   - **Connection budget** — each converted route shifts load onto the async 
pool, and a busy API server may hold connections in both the sync and the async pool at once. With multiple
API-server workers, each worker holds its *own* async pool, so demand is 
roughly `pool_size × workers` — the multi-worker benchmark exhausted Postgres 
`max_connections=100` precisely because the per-worker pool was not sized for 
that multiplication. Size per-worker pools so `pool_size × workers` fits 
`max_connections`, and watch headroom as adoption grows.
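
The sizing rule above can be turned into a small calculator. This is a hypothetical helper that extends `pool_size × workers` under stated assumptions: SQLAlchemy's queue pools can open up to `pool_size + max_overflow` connections each, a worker may hold both a sync and an async pool (`pools_per_worker=2`), and `reserved` stands for connections kept free for other clients (for example Postgres's `superuser_reserved_connections`, the scheduler, or an admin `psql` session).

```python
def max_pool_size_per_worker(
    max_connections: int,
    workers: int,
    *,
    pools_per_worker: int = 1,
    max_overflow: int = 0,
    reserved: int = 0,
) -> int:
    """Largest pool_size such that
    workers * pools_per_worker * (pool_size + max_overflow) + reserved <= max_connections.
    """
    if workers < 1 or pools_per_worker < 1:
        raise ValueError("workers and pools_per_worker must be >= 1")
    # Connections each individual pool may use at its peak.
    per_pool_cap = (max_connections - reserved) // (workers * pools_per_worker)
    pool_size = per_pool_cap - max_overflow
    if pool_size < 1:
        raise ValueError(
            "budget too small: lower max_overflow or workers, or raise max_connections"
        )
    return pool_size
```

With the doc's simple rule, `max_connections=100` and 4 workers allow `pool_size=25`. Counting both pools per worker, `max_overflow=10`, and 3 reserved connections, the same budget allows only `pool_size=2`, which shows how quickly overflow and a second pool eat the headroom.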
   
   ## Guiding principles
   
   - One endpoint (or a small, cohesive batch) per PR; parity over cleverness.
   - Prefer read endpoints with comprehensive existing tests first; defer 
anything needing async secrets backends.
   - Don't convert sibling routes opportunistically — keep diffs small and 
reviewable.
   
   ## References
   
   - Benchmark code / experiments at: 
https://github.com/apache/airflow/compare/main...Dev-iL:airflow:2605/async_sqla_poc
 <br>(see `dev/async_session_poc/PR36504_NOTES.md`, `RESULTS.md`, 
`results.csv`, `diagnostics/pg_stat_activity.log`)
   
   
   ### Related issues
   
   - #36504
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

