Dev-iL opened a new pull request, #67800:
URL: https://github.com/apache/airflow/pull/67800

   Related:
   - #36504 
   - #67799 
   
   ## What & why
   
   Converts the Execution API `ti_heartbeat` route from the synchronous 
`SessionDep` to the async `AsyncSessionDep`, adopting the async metadata engine 
that already ships in Airflow 3.x (`create_async_engine`, `async_sessionmaker`, 
`AsyncSessionDep`, `create_session_async`).
   
   Heartbeat is the highest-QPS write path in the system (one call per running 
task per heartbeat interval, per worker), so it is a meaningful first 
production route to run on the async engine. The goal of this PR is 
**behavioral parity with zero regression** — not a throughput change. It 
follows the same low-blast-radius conversion pattern already merged for `GET 
/execution/variables/keys`.
   
   ## What changed
   
   - **Route** (`execution_api/routes/task_instances.py`): `ti_heartbeat` is 
now `async def` and takes `AsyncSessionDep`; the fast-path `UPDATE`, the 
slow-path `SELECT … FOR UPDATE` (`.one()`), the Task-Instance-History existence 
`scalar`, and the final `UPDATE` are awaited. No explicit `commit()` is added — 
the async dependency commits on success and rolls back (releasing the `FOR 
UPDATE` row lock) on exception, mirroring the sync dependency exactly. 
`synchronize_session=False` and `with_for_update()` are unchanged.
   - **Tests** (`versions/head/test_task_instances.py`):
     - The two tests that intercept the fast-path `UPDATE` now patch 
`sqlalchemy.ext.asyncio.AsyncSession` with an async interceptor (the route now 
`await`s `AsyncSession.execute`); assertions are unchanged.
     - Added a `reconfigure_async_db_engine` autouse fixture to 
`TestTIHealthEndpoint`. The async engine binds its connection pool to the event 
loop that created it, while the test harness builds a fresh app and event loop 
per test; without this, a pooled connection from a prior test's closed loop is 
reused and fails (`attached to a different loop`). This is the same workaround 
already used by `TestWaitDagRun`.
   - **Docs**: documented the asyncpg + transaction-mode PgBouncer 
prepared-statement caveat on the `sql_alchemy_connect_args_async` config 
reference and in the PgBouncer section of the database setup guide.
   - **Newsfragment** (improvement): notes that the heartbeat endpoint now uses 
the async engine and that the API server may hold both the sync and async 
connection pools concurrently, so operators should budget DB `max_connections` 
for both.
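
The commit-on-success, rollback-on-exception behavior described for the async dependency can be illustrated with a stdlib-only sketch. Everything in it is hypothetical: `FakeAsyncSession`, `session_scope`, `heartbeat`, `call_route`, and `NotRunning` are illustrative stand-ins rather than Airflow or SQLAlchemy APIs, and the slow path is simplified so that it always ends in an error.

```python
import asyncio
from contextlib import asynccontextmanager


class NotRunning(Exception):
    """Hypothetical error standing in for the route's 409 response."""


class FakeAsyncSession:
    """Hypothetical stand-in for an async session; it only records calls."""

    def __init__(self, fast_path_rowcount):
        self.fast_path_rowcount = fast_path_rowcount
        self.calls = []

    async def execute(self, statement):
        self.calls.append(statement)
        # Only the fast-path UPDATE reports a row count in this sketch.
        return self.fast_path_rowcount if statement == "fast UPDATE" else None

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")


@asynccontextmanager
async def session_scope(session):
    """Commit if the body succeeds; roll back if it raises.

    In a real database the rollback is what releases a FOR UPDATE row lock.
    """
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise


async def heartbeat(session):
    # Fast path: a single UPDATE that matched the running task instance.
    if await session.execute("fast UPDATE") == 1:
        return
    # Slow path (simplified): lock the row, then report why the UPDATE missed.
    await session.execute("SELECT ... FOR UPDATE")
    raise NotRunning("task instance is not running")


async def call_route(fast_path_rowcount):
    session = FakeAsyncSession(fast_path_rowcount)
    try:
        async with session_scope(session):
            await heartbeat(session)
    except NotRunning:
        pass  # a real route would translate this into an HTTP error
    return session.calls


success_calls = asyncio.run(call_route(fast_path_rowcount=1))
conflict_calls = asyncio.run(call_route(fast_path_rowcount=0))
```

In this sketch the route body never calls `commit()` itself: the successful call records a commit after the `UPDATE`, while the failing call records a rollback after the locking `SELECT`.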
   
   ## Behavioral parity
   
   No change to the endpoint contract: same `204` success, same `404` / `409` 
(running-elsewhere / not-running) / `410` (cleared-and-archived) responses with 
identical detail payloads, same request signature. No Execution API version 
bump. The pre-existing heartbeat tests pass with their assertions unchanged 
(only the interception mechanism in the two fast-path tests was adapted), which 
is the evidence for parity.
   
   ## Testing
   
   The heartbeat suite (`-k heartbeat`, 12 tests covering success, fast path, 
slow-path fallback, `404`, `409`, `410`, and the `rowcount == 0` / 
unknown-`rowcount` fall-through branches) passes on **all three supported 
backends**:
   
   | Backend | Async driver | Result |
   |---------|--------------|--------|
   | SQLite   | aiosqlite | 12/12 ✅ |
   | Postgres | asyncpg   | 12/12 ✅ |
   | MySQL    | aiomysql  | 12/12 ✅ |
   
   `ruff` and `mypy` (airflow-core) are clean.
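
For reviewers unfamiliar with the `attached to a different loop` failure that the `reconfigure_async_db_engine` fixture works around, it can be reproduced with plain `asyncio`. This is only a sketch: `LoopBoundConnection`, `open_connection`, and `run_two_tests` are hypothetical names, and no real driver or pool is involved. The loops are kept open until the end so that the failure surfaces at the `await`.

```python
import asyncio


class LoopBoundConnection:
    """Hypothetical pooled connection that remembers the loop that created it."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()

    async def execute(self, value):
        # The reply future belongs to the creating loop, much as a driver's
        # I/O objects would.
        reply = self._loop.create_future()
        self._loop.call_soon(reply.set_result, value)
        return await reply


async def open_connection():
    return LoopBoundConnection()


def run_two_tests(reuse_connection):
    """Run two 'tests', each on its own event loop, and collect the outcomes."""
    outcomes = []
    loops = []
    connection = None
    for _ in range(2):
        loop = asyncio.new_event_loop()
        loops.append(loop)
        if connection is None or not reuse_connection:
            # Equivalent of rebuilding the engine and pool for this loop.
            connection = loop.run_until_complete(open_connection())
        try:
            outcomes.append(loop.run_until_complete(connection.execute("ok")))
        except RuntimeError as exc:
            if "attached to a different loop" not in str(exc):
                raise
            outcomes.append("attached to a different loop")
    for loop in loops:
        loop.close()
    return outcomes
```

Calling `run_two_tests(reuse_connection=True)` fails on the second loop, while `run_two_tests(reuse_connection=False)` succeeds both times, which is the effect the fixture aims for by reconfiguring the async engine per test.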
   
   ## Operational notes
   
   - **Connection budget**: heartbeat moving to the async engine means a busy 
API server can hold both the sync pool (for the remaining sync routes) and the 
async pool concurrently. Size DB `max_connections` accordingly (called out in 
the newsfragment).
   - **PgBouncer (transaction pooling)**: asyncpg uses server-side prepared 
statements, which break under transaction-mode PgBouncer. This PR documents 
that as a deployment-configuration requirement; shipping PgBouncer-safe 
async-engine defaults is deferred and tracked in #\<tracking-issue\>.
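
As a rough aid for the connection-budget note, the worst case can be estimated with simple arithmetic. The sketch below assumes that each API server process holds its own sync pool and its own async pool, and that each pool can grow to `pool_size + max_overflow` connections. The function name and all numbers are hypothetical; substitute the values from your own deployment.

```python
def max_db_connections(api_server_processes, sync_pool, async_pool, other_clients=0):
    """Upper bound on DB connections if every pool fills at the same time.

    sync_pool and async_pool are (pool_size, max_overflow) pairs.
    other_clients covers everything else that connects to the database.
    """
    per_process = sum(sync_pool) + sum(async_pool)
    return api_server_processes * per_process + other_clients


# Hypothetical deployment: 4 API server processes, both pools sized 5 + 10,
# and 50 connections reserved for other components.
budget = max_db_connections(
    api_server_processes=4,
    sync_pool=(5, 10),
    async_pool=(5, 10),
    other_clients=50,
)
print(budget)  # 170
```

Comparing that figure with the database's `max_connections` gives a quick check on whether the second pool fits.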
   
   ## Note to reviewers
   
   A cosmetic `Event loop is closed` message is logged during session teardown 
when the async engine is disposed (`dispose_orm` closes the async engine 
synchronously). It appears with all three async drivers, is pre-existing 
engine-disposal behavior unrelated to this change, and every run still exits 
with status `0`. Tidying async-engine disposal is out of scope for this PR.
   
   Engine-default hardening for asyncpg/PgBouncer will be tracked in a 
dedicated issue to be opened soon.
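
Until such defaults exist, one commonly cited mitigation is to pass connect arguments that stop asyncpg from caching prepared statements. The sketch below is an assumption-laden illustration, not what this PR ships: it assumes `sql_alchemy_connect_args_async` accepts an import path to a dict like its sync counterpart, and the names `unique_statement_name` and `PGBOUNCER_SAFE_CONNECT_ARGS` are hypothetical. `statement_cache_size` is an asyncpg connection argument, and `prepared_statement_name_func` is an option of SQLAlchemy's asyncpg dialect.

```python
from uuid import uuid4


def unique_statement_name():
    """Return a fresh prepared-statement name for each statement.

    Unique names avoid collisions when PgBouncer hands the same server
    connection to different clients.
    """
    return f"__asyncpg_{uuid4()}__"


# Hypothetical dict that the async connect-args setting could point to.
PGBOUNCER_SAFE_CONNECT_ARGS = {
    # asyncpg: disable its automatic prepared-statement cache.
    "statement_cache_size": 0,
    # SQLAlchemy asyncpg dialect: name each prepared statement uniquely.
    "prepared_statement_name_func": unique_statement_name,
}
```

Whether these settings are sufficient depends on the PgBouncer version and pooling mode in use, so treat them as a starting point to verify against the documentation added in this PR.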
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes (please specify the tool below)
   
   Generated-by: Claude Opus 4.8 following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   

