Dev-iL commented on PR #36504:
URL: https://github.com/apache/airflow/pull/36504#issuecomment-4450027580

   I asked Opus to consider this proposal in light of the changes introduced in 
Airflow 3 (internals as well as dependencies). Below is the result of 
implementing the principles raised herein to Airflow 3.2/3.3 and suggested next 
steps.
   
   All code that was generated as part of this effort can be found in: 
https://github.com/Dev-iL/airflow/commit/90070e1920bd7d5e38a999387ee63c2d1f7e5dd3
   
   <details><summary>Testing spec</summary>
   <p>
   
   ~~~md
   # Definition: Async-SQLA Execution-API POC + Benchmark
   
   ## 1. Intent & Context
   
   - **Goal:** Demonstrate, with reproducible numbers on Airflow 3.2 + 
Postgres, that converting an Execution-API read endpoint from sync `SessionDep` 
to async `AsyncSessionDep` removes the Starlette threadpool ceiling under 
concurrent DB-bound load. This is the modern reframing of PR #36504; the 
original Triggerer-direct-DB motivation is architecturally obsolete in 3.x, but 
FastAPI route conversion is the underlying win that remains.
   - **Mental Model:**
     - Async SQLA infrastructure already exists in `airflow-core` 
(`create_async_engine`, `async_sessionmaker`, `AsyncSessionDep`, 
`create_session_async`, `paginated_select_async`, `sql_alchemy_conn_async` 
config with automatic `AIO_LIBS_MAPPING` derivation in `settings.py:240`). 
Adoption inside FastAPI routes is near-zero (0 routes use `AsyncSessionDep`, 49 
use `SessionDep` in core_api alone). The work is *adoption*, not infrastructure.
     - Conversion target must use `SessionDep` directly with plain SQLA 
queries. `get_variable` and `get_connection` are NOT eligible — they call 
`Variable.get(...)` / `Connection.get_connection_from_secrets(...)`, which 
traverse the SecretsBackend chain. Converting those would require building 
async secrets backends (PR #36504-scale work), out of POC scope. Eligible 
target: `get_variable_keys` in 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py` — 
already uses `SessionDep` + `session.scalar(select(...))` patterns, 
mechanically convertible to `AsyncSessionDep` + `paginated_select_async`.
     - The benchmark must isolate the variable of interest. Use a **debug-only 
synthetic route pair** as the controlled A/B (identical SQL, only session type 
differs), plus the real `get_variable_keys` conversion as a 
realistic-conversion proof. The pedagogical exhibit is a `SELECT pg_sleep(...)` 
route: sync mode hits the FastAPI/Starlette threadpool ceiling 
(`AnyIO.to_thread`, default 40) and starves; async mode does not.
     - Default async PG driver is **asyncpg** (per `AIO_LIBS_MAPPING`), not 
psycopg v3 async. The benchmark therefore compares "async SQLA via asyncpg vs 
sync SQLA via psycopg2/3" — this framing must appear verbatim in the report so 
the win is not misread as a psycopg-v3-async claim.
     - This is a POC under `dev/`. Not destined to ship in this manifest's 
scope: no newsfragment, no PR, no provider changes, no production rollout. A 
follow-up PR migrating more routes is downstream of the benchmark numbers.
   - **Mode:** balanced
   - **Interview:** autonomous
   - **Medium:** local
   - **Cache:** manifest
   
   ## 2. Approach
   
   - **Architecture:**
     - **Synthetic bench router** (POC instrumentation, gated by env var): a 
new FastAPI router file at 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py`, 
mounted into the Execution API only when 
`AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES=true`. Provides three matched 
pairs of routes under `/execution/__bench/`:
       - `/select1/sync` vs `/select1/async` — `SELECT 1`, isolates per-request 
session overhead.
       - `/sleep/sync?ms=N` vs `/sleep/async?ms=N` — `SELECT 
pg_sleep(N/1000.0)`, demonstrates threadpool starvation under slow queries.
       - `/var_count/sync` vs `/var_count/async` — `SELECT COUNT(*) FROM 
variable`, realistic single-row aggregate.
       - All six routes bypass `CurrentTIToken`/`has_*_access` deps 
(debug-only, gated) so request auth is not in the hot path.
     - **Real-route conversion**: convert `get_variable_keys` in `variables.py` 
from sync `def` + `SessionDep` + `session.scalar/scalars` to `async def` + 
`AsyncSessionDep` + `await session.scalar(...)` / `await session.scalars(...)` 
/ `paginated_select_async` for the count. No other endpoint converted in this 
manifest's scope.
     - **Benchmark harness** at `dev/async_session_poc/run_bench.py`: 
pure-Python (`httpx.AsyncClient`), no external tools 
(`hey`/`wrk`/`oha`/`locust` are not installed locally — verified). Spawns N 
concurrent worker coroutines for a fixed duration against a target URL, records 
per-request wall-clock latency, emits RPS + p50/p95/p99 + error rate. Sweeps 
concurrency `c={50, 100, 200, 400}`, duration 30s per sweep.
     - **Seed script** at `dev/async_session_poc/seed.py`: inserts 1000 
Variables (idempotent — `INSERT ... ON CONFLICT DO NOTHING`) so the 
realistic-query path returns non-trivial work.
     - **Driver script** at `dev/async_session_poc/run_all.sh`: orchestrates 
`breeze start-airflow --backend postgres --integration ...`, seeds the DB, runs 
the harness against the synthetic bench routes at all four concurrency levels, 
writes results CSV and a results markdown. **Calibration**: the script exports 
`AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE=20`, 
`AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW=480` (so combined pool ≥ 500 > max 
concurrency 400) before launching the API server, leaves AnyIO threadpool at 
its Starlette default of 40 (read and recorded at startup), and targets 
`http://localhost:8080/execution/__bench/*` (breeze API-server default port; 
override via `BENCH_TARGET_BASE_URL` env var).
     - **Results report** at `dev/async_session_poc/RESULTS.md`: tables (RPS, 
p50/p95/p99, error rate) per route × concurrency, plus framing paragraph 
stating exactly what driver pair was compared and what the result does/doesn't 
prove (does prove: async-route DB-bound throughput under contention; does NOT 
prove: Triggerer/Worker wins, since they don't hit DB directly in 3.x).
   
   - **Execution Order:**
     - D1 (seed script) → D2 (synthetic bench router) → D3 (harness) → D4 
(real-route conversion) → D5 (run benchmarks, write report)
     - Rationale: seed + bench router + harness are independent and can be 
validated against `SELECT 1` before D4 lands; D4 is the lowest-blast-radius 
production-code touch and only needs to land before the final benchmark run; D5 
depends on all four.
   
   - **Risk Areas:**
     - [R-1] Async ORM lazy-loading trap on the realistic conversion | Detect: 
existing `test_variables.py` tests must pass after conversion; any 
`MissingGreenlet` exception or `await` on a non-async attribute will surface 
there.
     - [R-2] Benchmark client itself is the bottleneck (Python GIL, 
single-process `httpx.AsyncClient`) instead of the server | Detect: at low 
concurrency (c=50), both sync and async should be CPU-light on the client; 
client CPU >80% during a run means the harness is saturated. Mitigate by 
running the harness against a no-op static endpoint baseline and confirming RPS 
≫ what we see in the DB-bound tests.
     - [R-3] Threadpool ceiling not actually hit at c=400 in sync mode | 
Detect: if sync `/sleep` at c=400 doesn't show p99 cliff, the threadpool may be 
larger than expected (e.g., uvicorn overrode the default). Mitigation: 
explicitly read `anyio.to_thread.current_default_thread_limiter().total_tokens` 
at server startup and record it in the report header; INV-G16 fails the run if 
it isn't 40.
     - [R-4] Behavior change leaks into the `get_variable_keys` conversion | 
Detect: behavior-contract AC requires existing test suite passes byte-identical 
responses; add a characterization test if coverage is thin on 
prefix/limit/offset/team_name combos.
     - [R-5] Bench routes accidentally registered in non-debug mode | Detect: 
registration is gated on 
`os.environ.get("AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES") == "true"`; 
integration test asserts the routes return 404 when the env var is unset.
     - [R-6] Connection-pool size masks the real effect | Detect: report header 
records both sync `SQL_ALCHEMY_POOL_SIZE` + `SQL_ALCHEMY_MAX_OVERFLOW` and the 
async equivalents (per `settings.py:418` both engines read the same config 
keys). Mitigation: calibration sets combined pool ≥ 500 (PG-10); INV-G17 fails 
the run if either pool is configured below this.
     - [R-7] `asyncpg` not installed in the breeze image | Detect: `python -c 
'import asyncpg'` inside breeze returns non-zero. Mitigation: precheck step in 
`run_all.sh` runs the import and aborts with an actionable install hint before 
launching the bench.
   
   - **Trade-offs:**
     - [T-1] External benchmark tool (`hey`/`wrk`) vs in-repo pure-Python 
harness → Prefer pure-Python because no external tooling is pre-installed in 
the breeze container and reproducibility is higher when the client is checked 
into the repo. Cost: GIL ceiling on the client side — mitigated by R-2's 
detection.
     - [T-2] Convert a real production endpoint vs synthetic-only → Prefer 
both. Synthetic isolates the mechanism; real conversion proves the migration is 
mechanical, not theoretical. Skipping real means the POC is a microbenchmark 
with no transferable claim.
     - [T-3] `get_variable_keys` (clean conversion) vs `ti_heartbeat` (hot 
path) → Prefer `get_variable_keys`. Heartbeat is the production hot path but 
does state-machine mutation + RTIF logic + tracing emission. Conversion risk is 
high and would dilute the benchmark signal with unrelated complexity. Heartbeat 
conversion is the obvious follow-up if these numbers warrant it.
     - [T-4] Seed scale 1000 vs 10000 vs 100000 → Prefer 1000. The route 
returns up to `limit=1000` keys; larger seed exercises pagination but doesn't 
change the threadpool-ceiling shape. Bigger seeds inflate setup time without 
sharpening the signal.
     - [T-5] Postgres-only first pass vs include SQLite/MySQL → Prefer 
Postgres-only. SQLite over aiosqlite serializes through a single-writer lock 
and would muddy the result; MySQL adds async-driver maintenance load (aiomysql 
vs asyncmy) without changing the qualitative answer.
   
   ## 3. Global Invariants
   
   - [INV-G1] All Python files touched in this manifest pass `uv run ruff 
format` (no diff) and `uv run ruff check --fix` (no remaining errors).
     ```yaml
     verify:
       method: bash
       command: "uv run ruff format --check 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py 
airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py 
dev/async_session_poc/ && uv run ruff check 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py 
airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py 
dev/async_session_poc/"
     ```
   
   - [INV-G2] `airflow-core` mypy hook passes after the conversion (no new type 
errors introduced in the touched modules).
     ```yaml
     verify:
       method: bash
       command: "prek run mypy-airflow-core --files 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py 
airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py"
     ```
   
   - [INV-G3] Existing execution_api variable tests pass against the converted 
route — behavior preserved.
     ```yaml
     verify:
       method: bash
       command: "uv run --project airflow-core pytest 
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py
 
airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_variables.py
 -xvs"
     ```
   
   - [INV-G4] Benchmark routes are gated and inert by default. With 
`AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES` unset/false, all six 
`/execution/__bench/*` paths return 404.
     ```yaml
     verify:
       method: bash
       command: "uv run --project airflow-core pytest 
airflow-core/tests/unit/api_fastapi/execution_api/test_benchmark_routes.py::test_routes_gated_off
 -xvs"
     ```
   
   - [INV-G5] No newsfragment, no `provider.yaml` edit, no `chart/` edit. Scope 
ring-fence: only 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/{variables,_benchmark}.py`,
 `airflow-core/src/airflow/api_fastapi/execution_api/app.py` (if mount needs 
gating), 
`airflow-core/tests/unit/api_fastapi/execution_api/test_benchmark_routes.py`, 
and `dev/async_session_poc/**` may be added or modified.
     ```yaml
     verify:
       method: bash
       # Self-amended: the original `main...HEAD` form picked up the branch's
       # pre-existing committed work (40+ files modified before this manifest's
       # changeset began). Working-tree scope (unstaged + untracked) cleanly
       # isolates manifest-introduced changes — which is what the ring-fence is
       # actually trying to gate.
       command: "{ git diff --name-only HEAD; git ls-files --others 
--exclude-standard; } | grep -vE 
'^(airflow-core/src/airflow/api_fastapi/execution_api/(routes/(variables|_benchmark)\\.py|app\\.py)|airflow-core/tests/unit/api_fastapi/execution_api/test_benchmark_routes\\.py|dev/async_session_poc/)'
 && echo 'OUT-OF-SCOPE FILES' && exit 1 || exit 0"
     ```
   
   - [INV-G6] The results report contains the literal framing string `async 
SQLA via asyncpg vs sync SQLA via psycopg` and the literal disclaimer string 
`Triggerer and Workers do not hit the metadata DB directly in 3.x`, so the win 
is not misread.
     ```yaml
     verify:
       method: bash
       command: "grep -q 'async SQLA via asyncpg vs sync SQLA via psycopg' 
dev/async_session_poc/RESULTS.md && grep -q 'Triggerer and Workers do not hit 
the metadata DB directly in 3.x' dev/async_session_poc/RESULTS.md"
     ```
   
   - [INV-G7] Defect-finding review (code-bugs) finds no LOW+ findings on 
touched production code.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:code-bugs-reviewer
       prompt: "Audit code changes in 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py, 
airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py, and 
any modifications to airflow-core/src/airflow/api_fastapi/execution_api/app.py 
for mechanical defects: race conditions, resource leaks (especially around 
async session/connection lifecycle), edge cases, missing-await, dangerous 
defaults, env-var gate logic bypasses. Threshold: no LOW+ findings."
     ```
   
   - [INV-G8] Change-intent review finds no LOW+ findings — the conversion 
actually delivers async behavior, not accidental sync-blocking inside an async 
wrapper.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:change-intent-reviewer
       prompt: "Audit the get_variable_keys conversion in 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py, the 
synthetic bench routes in _benchmark.py, and the mount-gate logic in 
airflow-core/src/airflow/api_fastapi/execution_api/app.py. Stated intent: an 
async route that does not block the event loop on DB I/O, and a gate that only 
mounts bench routes when the env var is set. Look for behavioral divergences: 
sync calls inside async functions, await of a sync method, threadpool offload 
still happening invisibly, blocking attribute access on lazy-loaded ORM 
relationships, gate logic that admits unintended truthy values (e.g., '1', 
'yes') if scope says only 'true'. Threshold: no LOW+ findings."
     ```
   
   - [INV-G9] Contract review finds no LOW+ findings — the public response 
shape and status codes of `GET /execution/variables/keys` are unchanged.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:contracts-reviewer
       prompt: "Compare the converted GET /execution/variables/keys endpoint in 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py against 
the pre-conversion contract: response model VariableKeysResponse with fields 
{keys: list[str], total_entries: int}, query params prefix, limit (1-10000), 
offset (>=0), responses {200, 401}. Confirm no breaking change for clients. 
Threshold: no LOW+ findings."
     ```
   
   - [INV-G10] Type-safety review finds no LOW+ findings on touched modules.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:type-safety-reviewer
       prompt: "Audit type safety on 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py, 
_benchmark.py, app.py (gate-mount edits), and dev/async_session_poc/. Verify 
return types, awaitable vs coroutine vs result confusion, AsyncSession vs 
Session annotations, missing async generics. Threshold: no LOW+ findings."
     ```
   
   - [INV-G11] CLAUDE.md adherence review finds no MEDIUM+ findings (no 
newsfragment for non-applicable distributions, ruff invoked after each Python 
edit, no direct pytest/airflow CLI calls outside of breeze in instructions, no 
commit-author-as-agent footers added to commits, etc.).
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:context-file-adherence-reviewer
       prompt: "Audit this changeset against 
/home/iliya/repositories/airflow/CLAUDE.md. Specifically: was ruff format+check 
run on every Python file edit? Any newsfragment added for providers/airflow-ctl 
(forbidden)? Any newsfragment added for airflow-core (not wanted — this is a 
POC, not a user-visible change)? Any direct pytest/airflow CLI calls in scripts 
that should go through breeze? Any Co-Authored-By: <agent> in commits? 
Threshold: no MEDIUM+ findings."
     ```
   
   - [INV-G12] Maintainability review on the harness finds no MEDIUM+ findings 
— the harness is reproducible by another engineer.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:code-maintainability-reviewer
       prompt: "Audit dev/async_session_poc/ for maintainability: are sweep 
parameters configurable from CLI rather than hardcoded? Are seeds and target 
URLs not buried? Is the README clear about how to re-run? Threshold: no MEDIUM+ 
findings."
     ```
   
   - [INV-G13] Simplicity review on the harness finds no MEDIUM+ findings.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:code-simplicity-reviewer
       prompt: "Audit dev/async_session_poc/ for unnecessary complexity. A 
benchmark harness should be small (~100-200 LOC). Reject if there's 
abstraction-for-its-own-sake (frameworks, plugins, registries, classes wrapping 
single functions). Threshold: no MEDIUM+ findings."
     ```
   
   - [INV-G14] Test-quality review on any new tests added for D2 (gate test) 
and D4 (characterization tests if added).
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:test-quality-reviewer
       prompt: "Audit new tests at 
airflow-core/tests/unit/api_fastapi/execution_api/test_benchmark_routes.py and 
any characterization tests added near test_variables.py. Verify tests are not 
tautological, don't mock the SUT, assert meaningful properties (response 
status, body, gate behavior). Threshold: no MEDIUM+ findings."
     ```
   
   - [INV-G15] Requirements traceability — every Deliverable AC below maps to a 
concrete artifact in the diff.
     ```yaml
     verify:
       method: subagent
       agent: general-purpose
       prompt: "For each Deliverable D1..D5 in 
/tmp/manifest-20260514-073808.md, locate the artifact in the working tree that 
satisfies it. Report any AC whose artifact is missing or whose claim cannot be 
verified by inspecting the repo. Threshold: no MEDIUM+ findings — every 
specified AC must have an identifiable on-disk implementation."
     ```
   
   - [INV-G16] AnyIO/Starlette threadpool size at bench-server startup equals 
exactly 40 (Starlette default). The bench's recorded `threadpool_total_tokens` 
must match. The report header must include a fenced metadata block (```` ```env 
```` … ```` ``` ````) containing `threadpool_total_tokens=40` on its own line, 
so the check is format-stable across prose vs table layouts.
     ```yaml
     verify:
       method: bash
       phase: 2
       command: "python3 -c \"import re,sys; 
m=open('dev/async_session_poc/RESULTS.md').read(); 
v=re.search(r'(?m)^threadpool_total_tokens\\s*=\\s*(\\d+)\\s*$', m); sys.exit(0 
if v and v.group(1)=='40' else 1)\""
     ```
   
   - [INV-G17] DB pool calibration. The bench environment must export 
`AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE` and 
`AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW` whose sum is ≥ 500, and the 
RESULTS.md header must record both **sync** and **async** pool sizes (per R-6 
either pool can saturate). The fenced metadata block must contain 
`sync_pool_size`, `sync_max_overflow`, `async_pool_size`, `async_max_overflow` 
on their own lines.
     ```yaml
     verify:
       method: bash
       phase: 2
       command: "python3 -c \"import re,sys; 
m=open('dev/async_session_poc/RESULTS.md').read(); g=lambda k: 
int(re.search(r'(?m)^'+k+r'\\s*=\\s*(\\d+)\\s*$', m).group(1)); 
sps,smo,aps,amo=g('sync_pool_size'),g('sync_max_overflow'),g('async_pool_size'),g('async_max_overflow');
 sys.exit(0 if (sps+smo>=500 and aps+amo>=500) else 1)\""
     ```
   
   - [INV-G18] `asyncpg` is importable inside breeze (precheck for the bench 
run).
     ```yaml
     verify:
       method: bash
       phase: 2
       command: "breeze run python -c 'import asyncpg; 
print(asyncpg.__version__)'"
     ```
   
   ## 4. Process Guidance
   
   - [PG-1] Run existing variable-endpoint tests on `main` BEFORE starting D4. 
If they fail on `main` already (pre-existing breakage in the working 
environment), record the failures and treat the conversion's pass criterion as 
"no new failures vs baseline", not "all green".
   - [PG-2] Read each touched file immediately before editing, even if just 
read minutes ago — per global CLAUDE.md "Edit Integrity" rule.
   - [PG-3] After every Python file edit, run `uv run ruff format <file>` and 
`uv run ruff check --fix <file>` before moving on (CLAUDE.md project rule).
   - [PG-4] Benchmark runs go through `breeze start-airflow --backend postgres 
...`, never against host Python directly. The pure-Python harness client may 
run from host (it only needs `httpx`) — but the API server under test must run 
inside breeze so the env is reproducible.
   - [PG-5] The synthetic bench router file is named with a leading underscore 
(`_benchmark.py`) and the gate env var lives under an `AIRFLOW__BENCHMARK__` 
namespace (not `AIRFLOW__CORE__`) — both signals that this is debug/POC 
instrumentation, not production-facing config.
   - [PG-6] The results report must record: Airflow version (`airflow 
version`), Postgres version (`SELECT version()`), sync DB URL scheme, async DB 
URL scheme, `SQL_ALCHEMY_POOL_SIZE`, `SQL_ALCHEMY_MAX_OVERFLOW`, 
Starlette/AnyIO threadpool size, host CPU/memory. Otherwise the numbers are not 
reproducible.
   - [PG-7] If the harness shows a confusing or weak result (e.g., async is not 
faster), do NOT massage parameters until it looks favorable. Record the actual 
numbers and propose investigation — the POC's value is in honest measurement, 
not advocacy.
   - [PG-8] Commits go through `git commit` normally; no `Co-Authored-By: 
Claude` (CLAUDE.md forbids agent self-as-co-author).
   - [PG-9] Treat `dev/async_session_poc/` as scratch-quality code by 
convention — but the touched `variables.py` and any test files are production 
code and must meet production standards.
   - [PG-10] Benchmark target base URL: `http://localhost:8080` by default 
(breeze API-server default). Override via `BENCH_TARGET_BASE_URL` env var. 
Encode as a constant in `run_all.sh` (`: 
"${BENCH_TARGET_BASE_URL:=http://localhost:8080}"`); the Python harness accepts 
it via `--url`.
   - [PG-11] Calibration is a precondition for D5 (not a fix-up after the 
fact). `run_all.sh` MUST: (a) verify `asyncpg` import, (b) export 
`AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE=40` and 
`AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW=560` BEFORE launching the API 
server (sum = 600, providing ~50% headroom over max concurrency 400 — 
minimum-viable would be 500; the extra margin absorbs housekeeping/background 
slots so transient saturation doesn't masquerade as the threadpool effect), (c) 
on first request, log the live 
`anyio.to_thread.current_default_thread_limiter().total_tokens` so it appears 
in API server logs and can be copied into `RESULTS.md`. If any precondition 
fails, abort the bench with a non-zero exit and a clear message.
   - [PG-12] Warmup default = 5s (not 2s). Async pool + asyncpg + DNS+TLS ramp 
may not stabilize in 2s at high concurrency. The harness CLI keeps `--warmup` 
adjustable; the canonical run uses 5s.
   
   ## 5. Known Assumptions
   
   - [ASM-1] (auto) Conversion target = `get_variable_keys`. Default: chosen 
over `get_variable`/`get_connection`. Impact if wrong: if the user actually 
wanted the user-Connection lookup endpoint converted, they'd be unhappy that we 
converted a list endpoint instead — but `get_variable`/`get_connection` require 
building async secrets backends (PR #36504-scope work, beyond a single-endpoint 
POC). Recovery: amend to add the async-secrets-backend work as additional 
deliverables.
   - [ASM-2] (auto) Benchmark client = pure-Python `httpx.AsyncClient`. 
Default: chosen because `hey`/`wrk`/`oha`/`locust` are NOT installed on host 
(verified). Impact if wrong: if the GIL ceiling on the client distorts results, 
we'd need to switch to `hey`/`wrk`. R-2 detects this.
   - [ASM-3] (auto) Concurrency sweep `c={50, 100, 200, 400}`, duration 30s per 
cell. Default: covers below and above the default Starlette threadpool size 
(40) and gives stable percentiles in 30s for these endpoints. Impact if wrong: 
if the threadpool-ceiling effect doesn't show by c=400, we'd extend to c=800; 
cheap to redo.
   - [ASM-4] (auto) Seed = 1000 Variables, idempotent insert. Default: matches 
the `limit<=10000` ceiling on the keys endpoint without trivializing the count 
query. Impact if wrong: trivial — re-seed with different count.
   - [ASM-5] (auto) Default async PG driver = asyncpg via 
`_get_async_conn_uri_from_sync` derivation. Default: matches Airflow's built-in 
behavior — does NOT explicitly set `sql_alchemy_conn_async` to 
`postgresql+psycopg://...`. Impact if wrong: report would need to claim 
"psycopg-v3 async vs sync", which is not what we're measuring. INV-G6 enforces 
honest framing.
   - [ASM-6] (auto) Synthetic bench router gated by env var 
`AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES=true`. Default: env-var gate 
is the simplest and lowest-blast-radius mechanism; doesn't require a 
config-file change. Impact if wrong: if reviewer prefers a config-key in 
`airflow.cfg` under a `[benchmark]` section, that's a trivial swap.
   - [ASM-7] (auto) Postgres-only scope, no SQLite/MySQL coverage in this 
manifest. Default: keeps the comparison clean — SQLite's single-writer lock and 
MySQL's async-driver picks (aiomysql vs asyncmy) are separate concerns. Impact 
if wrong: adds D6/D7 in a follow-up manifest.
   - [ASM-8] (auto) No PR opened, no upstream push, no fork-branch push at the 
end of this manifest. Default: framing is "POC measurements", and the user's 
next decision (after seeing numbers) determines whether to migrate further 
routes in a real PR. Impact if wrong: trivial — open the PR after manifest 
completes if results warrant.
   - [ASM-9] (auto) Sample size per cell = whatever 30s yields after a 5s 
warmup discard (per PG-12). Default: time-bounded is more honest than 
count-bounded under saturation. Impact if wrong: if percentiles are noisy, 
extend duration to 60s.
   - [ASM-10] (auto) `asyncpg` is available inside breeze's standard 
postgres-backend image. Default: it's a transitive dep via the async-SQLA path 
already wired in `settings.py`, and a precheck (PG-11/INV-G18) confirms before 
the bench runs. Impact if wrong: bench aborts cleanly; install step (`uv add 
asyncpg --project airflow-core` or breeze image rebuild) is then required.
   - [ASM-11] (auto) The threadpool-starvation pedagogical effect requires 
**server-side** sleep — `pg_sleep(N)` in the database, not `asyncio.sleep(N)` 
or `time.sleep(N)` in Python. Default: implemented as `await 
session.execute(text('SELECT pg_sleep(:s)'), {'s': ms/1000.0})` (async) / 
`session.execute(text('SELECT pg_sleep(:s)'), {'s': ms/1000.0})` (sync). Impact 
if wrong: a Python-side sleep would not exercise DB-pool/threadpool contention 
and would invalidate the demonstration.
   - [ASM-12] (auto) The benchmark sweep covers only the six synthetic bench 
routes (24 cells: 6 routes × 4 concurrencies). The real-route conversion D4 is 
verified by tests (AC-4.2), not by harness throughput, because driving 
`get_variable_keys` requires minting per-request `CurrentTIToken` JWTs — auth 
complexity that would dilute the A/B signal. Default: drop `real_var_keys` from 
CSV/report; D4 stands on test pass alone. Impact if wrong: if a real-route 
throughput number is required, follow-up adds a JWT-minting helper and re-runs.
   
   ## 6. Deliverables
   
   ### Deliverable 1: Seed script
   
   A script that idempotently seeds the metadata DB with 1000 Variables 
(key=`bench_var_NNNN`, value=`v_NNNN`) for the realistic-query benchmark path. 
Runs against the Postgres backend in breeze.
   
   **Acceptance Criteria:**
   
   - [AC-1.1] `dev/async_session_poc/seed.py` exists and is executable as 
`breeze run python dev/async_session_poc/seed.py`.
     ```yaml
     verify:
       method: bash
       command: "test -f dev/async_session_poc/seed.py"
     ```
   - [AC-1.2] After running the seed script twice against a fresh DB, exactly 
1000 `bench_var_*` keys exist (idempotent).
     ```yaml
     verify:
       method: bash
       phase: 2
       command: "breeze run python dev/async_session_poc/seed.py && breeze run 
python dev/async_session_poc/seed.py && breeze run python -c \"from 
airflow.utils.session import create_session; from airflow.models.variable 
import Variable; from sqlalchemy import select, func; s = create_session(); 
print(s.scalar(select(func.count()).select_from(select(Variable.key).where(Variable.key.like('bench_var_%')).subquery())))\"
 | tail -1 | grep -x 1000"
     ```
   
   ### Deliverable 2: Synthetic bench router
   
   A gated debug router exposing six routes under `/execution/__bench/`: three 
pairs of (sync, async) routes that issue identical SQL — `SELECT 1`, `SELECT 
pg_sleep(:s)`, `SELECT COUNT(*) FROM variable` — using only the session-type 
difference between each pair.
   
   **Acceptance Criteria:**
   
   - [AC-2.1] File 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py` 
exists; defines a router with exactly six routes: `/select1/sync`, 
`/select1/async`, `/sleep/sync`, `/sleep/async`, `/var_count/sync`, 
`/var_count/async`.
     ```yaml
     verify:
       method: codebase
       prompt: "Confirm 
airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py defines 
exactly the six routes listed; confirm sync routes use SessionDep and async 
routes use AsyncSessionDep; confirm /sleep routes accept a `ms` query param 
(int, >=0, <=10000) and the SQL is parameterized (no string interpolation)."
     ```
   - [AC-2.2] Router is mounted into the Execution API only when env var 
`AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES` is set to the **exact 
case-sensitive string `"true"`**. Any other value (unset, `"True"`, `"1"`, 
`"yes"`, empty string, anything else) leaves the routes unmounted. Strictness 
is intentional: this is debug instrumentation and ambiguity around what counts 
as "enabled" is a security/scope risk.
     ```yaml
     verify:
       method: codebase
       prompt: "Confirm the mount logic in 
airflow-core/src/airflow/api_fastapi/execution_api/app.py (or equivalent 
router-registration site) is gated by 
`os.environ.get('AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES') == 'true'` — 
exact case-sensitive string equality, no .lower(), no boolean-conf parser, no 
membership check against a truthy set."
     ```
   - [AC-2.3] Gate test passes: 
`airflow-core/tests/unit/api_fastapi/execution_api/test_benchmark_routes.py` 
contains at least three tests — `test_routes_gated_off_unset` (asserts 404 for 
all 6 routes when env var is unset), `test_routes_gated_off_truthy_variants` 
(asserts 404 for all 6 routes when env var is set to each of `"True"`, `"1"`, 
`"yes"`, `""`, `"false"`), and `test_routes_gated_on` (asserts 
200/expected-body for all 6 routes when env var is exactly `"true"`).
     ```yaml
     verify:
       method: bash
       command: "uv run --project airflow-core pytest 
airflow-core/tests/unit/api_fastapi/execution_api/test_benchmark_routes.py -xvs"
     ```
   - [AC-2.4] The sync routes use plain `Session` with `session.execute(...)`; 
the async routes use `AsyncSession` with `await session.execute(...)`. No code 
path uses `asyncio.to_thread`, no sync route is `async def`, no async route is 
plain `def`.
     ```yaml
     verify:
       method: subagent
       agent: manifest-dev:code-bugs-reviewer
       prompt: "Audit 
airflow-core/src/airflow/api_fastapi/execution_api/routes/_benchmark.py: 
confirm sync routes are `def` + SessionDep + sync session calls, and async 
routes are `async def` + AsyncSessionDep + awaited calls. Flag any threadpool 
offload or mixed mode. Threshold: no LOW+."
     ```
   
   ### Deliverable 3: Benchmark harness
   
   A pure-Python concurrency-sweep harness driven by `httpx.AsyncClient`. Takes 
URL, concurrency, duration; emits per-request latencies and aggregated 
RPS/p50/p95/p99/error-rate.
   
   **Acceptance Criteria:**
   
   - [AC-3.1] `dev/async_session_poc/run_bench.py` exists. CLI accepts `--url 
URL --concurrency N --duration SECONDS --warmup SECONDS --output PATH.csv`. 
Defaults: `--warmup 5 --output -` (stdout) — aligned with PG-12 canonical 
warmup.
     ```yaml
     verify:
       method: bash
       command: "test -f dev/async_session_poc/run_bench.py && uv run --project 
airflow-core python dev/async_session_poc/run_bench.py --help | grep -qE 
'(--url|--concurrency|--duration|--warmup|--output)'"
     ```
   - [AC-3.2] Harness records: per-request wall-clock latency (monotonic 
clock), HTTP status, completion time. Aggregates RPS (excluding warmup window), 
p50/p95/p99 (HDR or numpy.percentile is acceptable), error rate (non-200 
responses / total).
     ```yaml
     verify:
       method: codebase
       prompt: "Confirm dev/async_session_poc/run_bench.py uses 
time.monotonic() for latencies (CLAUDE.md project rule), excludes the warmup 
window from aggregates, and emits the listed metrics."
     ```
   - [AC-3.3] Harness self-test: running against `httpbin.org/anything` or a 
local placeholder endpoint at c=10 d=5s produces a non-empty CSV with 
reasonable values (RPS > 0, p50 > 0).
     ```yaml
     verify:
       method: bash
       phase: 2
       command: "cd dev/async_session_poc && uv run --project airflow-core 
python run_bench.py --url https://httpbin.org/anything --concurrency 10 
--duration 5 --warmup 1 --output /tmp/bench_selftest.csv && test -s 
/tmp/bench_selftest.csv && head -2 /tmp/bench_selftest.csv | grep -qE 
'^(metric|p50|p95|p99|rps),'"
     ```
   - [AC-3.4] No `assert` in production code (the harness counts as POC 
tooling, but ruff/style rules still apply); no `time.time()` for durations.
     ```yaml
     verify:
       method: bash
       command: "! grep -nE '\\btime\\.time\\(\\)' 
dev/async_session_poc/run_bench.py && ! grep -nE '^\\s*assert ' 
dev/async_session_poc/run_bench.py"
     ```
   
   ### Deliverable 4: Convert `get_variable_keys` to async
   
   Mechanical conversion of the existing sync endpoint to `async def` using 
`AsyncSessionDep` and `paginated_select_async`. No behavior change: same 
response model, same query params, same status codes, same response shape.
   
   **Acceptance Criteria:**
   
   - [AC-4.1] `get_variable_keys` in 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py` is 
`async def` and uses `AsyncSessionDep` (not `SessionDep`). All session calls 
are awaited.
     ```yaml
     verify:
       method: codebase
       prompt: "Read 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py and 
confirm get_variable_keys is declared `async def`, its session parameter is 
annotated AsyncSessionDep, and every session.* call is awaited. The count query 
should use paginated_select_async or an explicit `await session.scalar(...)`. 
The response model VariableKeysResponse and parameter signatures (prefix, limit 
ge=1 le=10000, offset ge=0, team_name dep) are unchanged."
     ```
   - [AC-4.2] Existing test suite for the Variables execution-api route passes.
     ```yaml
     verify:
       method: bash
       command: "uv run --project airflow-core pytest 
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py
 
airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_variables.py
 -xvs"
     ```
   - [AC-4.3] No other endpoint in `variables.py` is converted. `get_variable`, 
`put_variable`, `delete_variable` retain their pre-existing sync signatures 
(out-of-scope for this POC).
     ```yaml
     verify:
       method: codebase
       prompt: "Confirm get_variable, put_variable, delete_variable in 
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py are 
still plain `def` (not async def) and have not been touched beyond imports."
     ```
   
   ### Deliverable 5: Run benchmarks + write results report
   
   Execute the harness across the six synthetic bench routes at the concurrency 
sweep (per ASM-12, the real `get_variable_keys` is verified by tests not 
throughput). Write CSV + markdown report with the framing required by INV-G6.
   
   **Acceptance Criteria:**
   
   - [AC-5.1] `dev/async_session_poc/results.csv` exists, contains rows for at 
least: `{select1, sleep_500ms, var_count} × {sync, async} × {50, 100, 200, 
400}` = 24 cells (the synthetic-route sweep; per ASM-12, the real-route 
`get_variable_keys` is verified by tests not harness). Columns at minimum: 
`route, mode, concurrency, rps, p50_ms, p95_ms, p99_ms, error_rate, 
sample_count`.
     ```yaml
     verify:
       method: bash
       phase: 2
       command: "test -s dev/async_session_poc/results.csv && head -1 
dev/async_session_poc/results.csv | grep -qE 
'route.*mode.*concurrency.*rps.*p50_ms.*p95_ms.*p99_ms.*error_rate.*sample_count'
 && wc -l dev/async_session_poc/results.csv | awk '{ if ($1 < 25) { print \"too 
few rows: \"$1; exit 1 } }'"
     ```
   - [AC-5.2] `dev/async_session_poc/RESULTS.md` exists; contains a markdown 
table per route, environment header (Airflow version, Postgres version, sync DB 
URL scheme, async DB URL scheme, sync pool size, async pool size, threadpool 
size, host CPU/RAM per PG-6), and analysis paragraphs.
     ```yaml
     verify:
       method: codebase
       prompt: "Read dev/async_session_poc/RESULTS.md. Confirm presence of: (1) 
an environment-header section listing Airflow version, Postgres version, both 
sync+async DB URL schemes, both sync+async pool sizes, the Starlette/AnyIO 
threadpool size, and host CPU/RAM; (2) a results table per route (or one 
combined table) covering the 24 cells from AC-5.1; (3) analysis paragraphs 
distinguishing what the data does and doesn't prove."
     ```
   - [AC-5.3] Report's framing strings present (cross-checked by INV-G6 but 
listed here as the deliverable's own AC).
     ```yaml
     verify:
       method: bash
       command: "grep -q 'async SQLA via asyncpg vs sync SQLA via psycopg' 
dev/async_session_poc/RESULTS.md && grep -q 'Triggerer and Workers do not hit 
the metadata DB directly in 3.x' dev/async_session_poc/RESULTS.md"
     ```
   - [AC-5.4] For the `/sleep` pair at concurrency 200, async RPS is strictly 
higher than sync RPS (the pedagogical exhibit must demonstrate the effect). If 
it does not, the run is treated as **diagnostic-required** and PASSes only when 
`RESULTS.md` contains an `## Anomalies` section that names the cause AND 
includes concrete diagnostic evidence — not just a written explanation. 
Required diagnostic artifacts when claiming pool saturation: Postgres 
`pg_stat_activity` count snapshots showing active-connection peak; when 
claiming threadpool exhaustion: live 
`anyio.to_thread.current_default_thread_limiter()` token state during the run; 
when claiming client-side GIL ceiling: harness CPU% during the sync vs async 
runs side-by-side.
     ```yaml
     verify:
       method: subagent
       phase: 2
       agent: manifest-dev:criteria-checker
       prompt: "Read dev/async_session_poc/results.csv. Locate the rows for 
route=sleep_500ms (or equivalent slow-query route name), concurrency=200, modes 
sync and async. PASS-PATH-1: async rps > sync rps. PASS-PATH-2: an '## 
Anomalies' section in dev/async_session_poc/RESULTS.md (a) names the specific 
cause from the manifest's R-2/R-3/R-6 risks and (b) cites concrete diagnostic 
evidence for that cause — pg_stat_activity counts (pool), recorded threadpool 
token state (threadpool), or harness CPU% (client). A bare textual explanation 
without diagnostic artifacts FAILS."
     ```
   ~~~
   
   </p>
   </details> 
   
   ---
   
   ## TL;DR
   
   The **motivation** behind PR #36504 — *the Triggerer talks to the metadata 
DB directly, so we should make those DB calls async to free the event loop* — 
is **architecturally obsolete on Airflow 3.x**. Triggerer (and Workers) no 
longer hit the metadata DB directly; they go through the Execution API over 
HTTP.
   
   What **remains valid and is now the actual win** is the lower-level idea 
underneath that PR: **converting Airflow's FastAPI routes from sync 
`SessionDep` to async `AsyncSessionDep` removes a real bottleneck — but the 
bottleneck is the Starlette/AnyIO threadpool inside the API server, not the 
Triggerer's loop.**
   
   This POC builds a minimal, reproducible A/B benchmark of that conversion 
against a single Airflow 3.3.0 + PostgreSQL 14 deployment and produces honest 
measured numbers — both where async helps and where it surprisingly doesn't.
   
   ## What was kept from PR #36504
   
   1. **The core conversion pattern.** Sync `SessionDep` → async 
`AsyncSessionDep`, `def` → `async def`, `session.scalar(...)` → `await 
session.scalar(...)`, `session.scalars(...).all()` → `(await 
session.scalars(...)).all()`. PR #36504 already demonstrated this mechanically 
for several call sites; the pattern is unchanged.
   2. **The async-SQLAlchemy plumbing it presupposed.** Airflow 3.x already 
ships `create_async_engine`, `async_sessionmaker`, `AsyncSessionDep`, 
`create_session_async`, `paginated_select_async`, the `sql_alchemy_conn_async` 
config key, and automatic driver derivation via `AIO_LIBS_MAPPING` 
(`airflow-core/src/airflow/settings.py:240, 387, 415-421`). *None of this had 
to be re-built* — adoption is what's missing, not infrastructure. The current 
adoption rate inside `core_api` is ~0/49 routes using `AsyncSessionDep`.
   3. **The route-conversion shape.** The POC converts exactly one real route — 
`GET /execution/variables/keys` in 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py` — as a 
low-blast-radius proof that the migration is mechanical: response model 
unchanged, query-param signatures unchanged, status codes unchanged, all 30 
existing variable-execution-API tests pass byte-identical.
   
   ## What was deliberately set aside
   
   1. **The Triggerer-direct-DB framing.** In 3.x, the Triggerer evaluates 
deferred tasks but communicates via the Execution API, not direct DB access. PR 
#36504's headline benefit ("Triggerer event loop unblocked") no longer applies 
— there is no Triggerer code path in the hot zone the original change targeted. 
The README and `RESULTS.md` include an explicit disclaimer paragraph: 
*"Triggerer and Workers do not hit the metadata DB directly in 3.x."*
   2. **`get_variable` / `get_connection` conversions.** Both call into the 
SecretsBackend chain (`Variable.get(...)`, 
`Connection.get_connection_from_secrets(...)`). Converting them requires 
building **async secrets backends**, which is PR-#36504-scale work and well 
beyond a single-endpoint POC. They are explicitly out of scope here.
   3. **`ti_heartbeat` and other production hot paths.** Heartbeat is what 
matters most for throughput, but it does state-machine mutation + RTIF logic + 
tracing emission, all of which would dilute the A/B signal with unrelated 
complexity. The POC picks the cleanest-conversion candidate 
(`get_variable_keys`) instead and reserves heartbeat for a follow-up if the 
numbers warrant it.
   4. **SQLite and MySQL.** Single-writer locks on SQLite (via aiosqlite) and 
the aiomysql-vs-asyncmy driver picking on MySQL would muddy the measurement. 
Postgres-only (asyncpg) for this pass.
   5. **Newsfragment, PR, provider changes, prod rollout.** This is a POC under 
`dev/`. No newsfragment, no `provider.yaml` edit, no `chart/` edit. A follow-up 
migration PR is downstream of these numbers.
   
   ## What this POC actually does
   
   Three pieces of instrumentation, all under 
`airflow-core/src/airflow/api_fastapi/execution_api/` and 
`dev/async_session_poc/`:
   
   - **A real-route conversion** — `get_variable_keys` → `async def` + 
`AsyncSessionDep` + awaited `session.scalar/scalars`.
   - **A synthetic A/B router** at `/__bench/...`, mounted only when 
`AIRFLOW__BENCHMARK__ENABLE_ASYNC_SESSION_ROUTES=true` (exact-string gate; no 
`.lower()`, no truthy-set membership). Six routes — three pairs of identical 
SQL where the only difference is sync `SessionDep` vs async `AsyncSessionDep`:
     - `/select1/{sync,async}` — `SELECT 1` (per-request session overhead)
     - `/sleep/{sync,async}?ms=N` — `SELECT pg_sleep(N/1000.0)` (the 
threadpool-ceiling pedagogy)
     - `/var_count/{sync,async}` — `SELECT COUNT(*) FROM variable` (realistic 
aggregate; seeded with 1000 `bench_var_*` rows)
   - **A pure-Python `httpx.AsyncClient`-based harness** with a CSV/markdown 
reporting pipeline (`dev/async_session_poc/`). 24-cell sweep: 6 routes × {50, 
100, 200, 400} concurrency, 30s measured + 5s warmup per cell.
   
   ## Key outcomes
   
   Full numbers and per-route tables live in 
[`dev/async_session_poc/RESULTS.md`](RESULTS.md). The headline:
   
   ### Where async wins cleanly — c=50
   
   At concurrency 50 (below the 40-token AnyIO threadpool ceiling), async is 
consistently faster than sync on every route, with **zero error rate** on both 
sides:
   
   | route | sync RPS | async RPS | sync p99 | async p99 |
   |---|---:|---:|---:|---:|
   | `/select1` | 121.3 | 131.7 | 1749 ms | 1747 ms |
   | `/sleep` 500ms | 78.5 | **96.4** | 893 ms | **558 ms** |
   | `/var_count` | 156.6 | 175.2 | 1403 ms | 1378 ms |
   
   The `/sleep` row is the cleanest signal: identical 500 ms server-side work, 
~23% higher async throughput, ~38% better tail latency. **This is the "PR 
#36504 effect"** — at modest concurrency the async path avoids the per-request 
threadpool-dispatch overhead that the sync path pays.
   
   ### Where it inverts surprisingly — c≥100
   
   Above the threadpool ceiling, async becomes *slower* than sync on this 
single-uvicorn-worker setup, and both modes accumulate large queueing latency 
and httpx client-side timeouts:
   
   | route | concurrency | sync RPS | async RPS |
   |---|---:|---:|---:|
   | `/sleep` 500ms | 100 | 46.5 | 39.5 |
   | `/sleep` 500ms | **200** | **43.9** | **37.3** ⚠ |
   | `/sleep` 500ms | 400 | 44.5 | 29.2 |
   
   This is the **AC-5.4 inversion** the POC's acceptance criterion specifically 
calls out as diagnostic-required. We ran the inversion to ground and reached 
the following evidence-backed conclusion (full diagnostics in `RESULTS.md` § 
Anomalies, raw log at `dev/async_session_poc/diagnostics/pg_stat_activity.log`):
   
   - **R-6 (DB-pool saturation) — ruled out.** 1 Hz `pg_stat_activity` sampling 
during the full 24-cell sweep recorded a **peak active connection count of 51** 
— out of a calibrated pool of **600** (`sync_pool_size=40 + 
sync_max_overflow=560`; async engine reads the same cfg keys). The active count 
plateaus at ~40-41 (the sync threadpool's 40 workers each holding one 
connection) plus a small ~10-connection async overlay. The DB pool was never 
the gate.
   - **R-3 (sync threadpool exhaustion) — firing for sync as designed.** The 
active-conn mode at 40-41 is exactly the Starlette threadpool's 40-worker 
ceiling holding 40 simultaneous sync queries. This is the *intended* 
pedagogical effect for the sync side.
   - **The async path is bottlenecked above the DB pool.** With c=200 async 
awaiting `pg_sleep(0.5)`, ~200 simultaneous active connections were expected; 
we saw ~10 incremental async connections above the sync baseline. The 
bottleneck is upstream of the DB.
   - **Operative causes — R-2 (client GIL) and R-3-async (single uvicorn worker 
event loop), in combination.** The harness is a single Python process driving 
200+ concurrent `httpx.AsyncClient` requests; the API server is a single 
uvicorn worker whose event loop handles all async-route work + response framing 
on one Python interpreter. Sync routes get *effective* parallelism via the 
threadpool's 40 workers; async routes get only the serialization of one event 
loop.
   
   In short: **the async-route conversion does what it advertises — the route 
itself no longer blocks the event loop on DB I/O — but a single-worker uvicorn 
ceiling and a single-process client harness combine to hide that gain at high 
concurrency.**
   
   ## Why a single uvicorn worker matters
   
   This is the subtle part and the most important thing for the original PR 
thread's audience to internalize:
   
   - A sync route inside an async framework is dispatched to the AnyIO 
threadpool (default size **40**). 40 sync queries can run concurrently.
   - An async route stays on the event loop. With one uvicorn worker, **N** 
async coroutines can each `await` simultaneously — but every CPU-bound step 
(request parsing, response serialization, JSON encoding, middleware) is 
serialized on one Python interpreter.
   
   The threadpool gives sync routes a fixed-but-real form of parallelism even 
without process workers. Async routes need *either* (a) more uvicorn workers, 
*or* (b) lower-CPU-overhead handling per request, to fully exploit the 
unblocked event loop. We didn't run with multiple workers in this POC 
(deliberately, to isolate the route-level mechanism); that's the single-biggest 
follow-up.
   
   ## Next steps
   
   These are out of scope for this manifest and tracked here for the PR thread:
   
   1. **Re-run with `--workers 4` / `--workers 8`.** Repeat the same 24-cell 
sweep with a multi-worker uvicorn. Predicted: sync's ceiling tracks `workers × 
40` threadpool; async's ceiling rises with `workers`, eventually crossing sync 
at high concurrency. If that prediction holds, the original PR's win is real 
but contingent on running with reasonable worker counts — which is already 
standard production guidance.
   2. **Multi-process harness.** Run one harness process per logical core, sum 
the CSVs. Removes R-2 (client GIL) as a confound. Required to make c=200+ 
measurements trustworthy regardless of server-worker count.
   3. **Profile the async path with `py-spy`** during a c=200 `/sleep` run. 
Confirm where the time actually goes: connection acquisition, query dispatch, 
or response serialization.
   4. **Migrate `ti_heartbeat`.** Heartbeat is the production hot path (one 
call per task per heartbeat interval, per worker). Conversion touches 
state-machine + RTIF + tracing logic; needs a careful PR.
   5. **Async secrets backends.** Prerequisite to converting `get_variable` / 
`get_connection`. PR-#36504-scale work; tracked separately.
   6. **Pick a small batch of low-risk Execution-API read endpoints to 
migrate** in a real PR — list endpoints with simple `select` patterns, no 
SecretsBackend dependency, comprehensive existing test coverage. Candidates: 
variable-keys (already migrated here), connection-list (without value lookup), 
task-instance read endpoints.
   
   ## Reproducing
   
   Everything needed to re-run is in [`README.md`](README.md). One liner:
   
   ```bash
   source dev/async_session_poc/env_for_server.sh   # emitted by run_all.sh
   breeze start-airflow --backend postgres            # in one terminal
   bash dev/async_session_poc/run_all.sh              # in another
   ```
   
   Override sweep parameters via `BENCH_*` env vars (`BENCH_TARGET_BASE_URL`, 
`BENCH_DURATION`, `BENCH_WARMUP`, `BENCH_SLEEP_MS`, `BENCH_CONCURRENCIES`). 
Pool calibration knobs (`AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE`, 
`AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW`) must reach the api-server 
process; `run_all.sh` writes `env_for_server.sh` for that purpose.
   
   Raw 24-cell results: [`results.csv`](results.csv). Analysis and anomaly 
diagnostics: [`RESULTS.md`](RESULTS.md). Captured `pg_stat_activity` log: 
[`diagnostics/pg_stat_activity.log`](diagnostics/pg_stat_activity.log).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to