Dev-iL opened a new pull request, #67800:
URL: https://github.com/apache/airflow/pull/67800
Related:
- #36504
- #67799
## What & why
Converts the Execution API `ti_heartbeat` route from the synchronous
`SessionDep` to the async `AsyncSessionDep`, adopting the async metadata engine
that already ships in Airflow 3.x (`create_async_engine`, `async_sessionmaker`,
`AsyncSessionDep`, `create_session_async`).
Heartbeat is the highest-QPS write path in the system (one call per running
task per heartbeat interval, per worker), so it is a meaningful first
production route to run on the async engine. The goal of this PR is
**behavioral parity with zero regression** — not a throughput change. It
follows the same low-blast-radius conversion pattern already merged for `GET
/execution/variables/keys`.
## What changed
- **Route** (`execution_api/routes/task_instances.py`): `ti_heartbeat` is
now `async def` and takes `AsyncSessionDep`; the fast-path `UPDATE`, the
slow-path `SELECT … FOR UPDATE` (`.one()`), the Task-Instance-History existence
`scalar`, and the final `UPDATE` are awaited. No explicit `commit()` is added —
the async dependency commits on success and rolls back (releasing the `FOR
UPDATE` row lock) on exception, mirroring the sync dependency exactly.
`synchronize_session=False` and `with_for_update()` are unchanged.
- **Tests** (`versions/head/test_task_instances.py`):
- The two tests that intercept the fast-path `UPDATE` now patch
`sqlalchemy.ext.asyncio.AsyncSession` with an async interceptor (the route now
`await`s `AsyncSession.execute`); assertions are unchanged.
- Added a `reconfigure_async_db_engine` autouse fixture to
`TestTIHealthEndpoint`. The async engine binds its connection pool to the event
loop that created it, while the test harness builds a fresh app and event loop
per test; without this, a pooled connection from a prior test's closed loop is
reused and fails (`attached to a different loop`). This is the same workaround
already used by `TestWaitDagRun`.
- **Docs**: documented the asyncpg + transaction-mode PgBouncer
prepared-statement caveat on the `sql_alchemy_connect_args_async` config
reference and in the PgBouncer section of the database setup guide.
- **Newsfragment** (improvement): notes that the heartbeat endpoint now uses
the async engine and that the API server may hold both the sync and async
connection pools concurrently, so operators should budget DB `max_connections`
for both.
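The commit-on-success, rollback-on-exception behaviour described in the Route bullet can be sketched without a database. This is a minimal illustration of the pattern only, not Airflow's `AsyncSessionDep`: `FakeAsyncSession`, `session_scope`, and `heartbeat` are hypothetical names, and the SQL string is a placeholder.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncSession:
    """Hypothetical stand-in for an async DB session; it only records calls."""

    def __init__(self):
        self.calls = []

    async def execute(self, statement):
        self.calls.append(("execute", statement))

    async def commit(self):
        self.calls.append(("commit",))

    async def rollback(self):
        self.calls.append(("rollback",))


@asynccontextmanager
async def session_scope(session):
    """Commit if the body succeeds; roll back and re-raise if it fails."""
    try:
        yield session
    except Exception:
        # A real rollback would also release any row lock taken by the body.
        await session.rollback()
        raise
    else:
        await session.commit()


async def heartbeat(session, fail=False):
    """Hypothetical handler: awaits its statement and never commits itself."""
    await session.execute("UPDATE <placeholder>")
    if fail:
        raise RuntimeError("simulated error response")


async def run(fail):
    session = FakeAsyncSession()
    try:
        async with session_scope(session) as scoped:
            await heartbeat(scoped, fail=fail)
    except RuntimeError:
        pass  # the caller would translate this into an HTTP error
    return [call[0] for call in session.calls]


ok_calls = asyncio.run(run(fail=False))
failed_calls = asyncio.run(run(fail=True))
```

Because the handler never calls `commit()`, transaction control stays in one place, which is the property that lets the sync and async dependencies behave the same way.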
## Behavioral parity
No change to the endpoint contract: same `204` success, same `404` / `409`
(running-elsewhere / not-running) / `410` (cleared-and-archived) responses with
identical detail payloads, same request signature. No Execution API version
bump. The pre-existing heartbeat tests pass with their assertions unchanged,
which is the evidence for parity.
## Testing
Heartbeat suite (`-k heartbeat`, 12 tests covering success, fast path,
slow-path fallback, `404`, `409`, `410`, and the `rowcount == 0` /
unknown-`rowcount` fall-through branches) passes on **all three supported
backends**:
| Backend | Async driver | Result |
|---------|--------------|--------|
| SQLite | aiosqlite | 12/12 ✅ |
| Postgres | asyncpg | 12/12 ✅ |
| MySQL | aiomysql | 12/12 ✅ |
`ruff` and `mypy` (airflow-core) are clean.
## Operational notes
- **Connection budget**: heartbeat moving to the async engine means a busy
API server can hold both the sync pool (for the remaining sync routes) and the
async pool concurrently. Size DB `max_connections` accordingly (called out in
the newsfragment).
- **PgBouncer (transaction pooling)**: asyncpg uses server-side prepared
statements, which break under transaction-mode PgBouncer. This PR documents
that as a deployment-configuration requirement; shipping PgBouncer-safe
async-engine defaults is deferred and tracked in #\<tracking-issue\>.
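As a rough illustration of the PgBouncer note, the sketch below builds connection arguments of the kind commonly used with asyncpg behind transaction-mode PgBouncer. It is an assumption about what a deployment might configure, not the wording of the docs added in this PR: `pgbouncer_safe_connect_args` is a hypothetical helper, and how the resulting dict is wired into `sql_alchemy_connect_args_async` is not shown here. `statement_cache_size` is an asyncpg connection option, and `prepared_statement_name_func` is an option of SQLAlchemy's asyncpg dialect.

```python
from uuid import uuid4


def pgbouncer_safe_connect_args():
    """Hypothetical helper returning asyncpg connect arguments (a sketch)."""
    return {
        # asyncpg option: disable its per-connection prepared-statement cache.
        "statement_cache_size": 0,
        # SQLAlchemy asyncpg-dialect option: name each prepared statement
        # uniquely so names cannot collide across pooled server connections.
        "prepared_statement_name_func": lambda: f"__asyncpg_{uuid4()}__",
    }


connect_args = pgbouncer_safe_connect_args()
name_a = connect_args["prepared_statement_name_func"]()
name_b = connect_args["prepared_statement_name_func"]()
```

Each call to the name function yields a fresh UUID-based name, so two statements prepared through different PgBouncer-assigned server connections do not share a name.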
## Note to reviewers
A cosmetic `Event loop is closed` message is logged during session teardown
when the async engine is disposed (`dispose_orm` closing the async engine
synchronously). It appears on all three async drivers, is pre-existing engine
disposal behavior unrelated to this change, and every run exits `0`. Tidying
async-engine disposal is out of scope for this PR.
Engine-default hardening for asyncpg/PgBouncer will be tracked in a
dedicated issue to be opened soon.
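For context on the `Event loop is closed` message mentioned above, the standard-library snippet below shows where asyncio raises it: scheduling work on a loop that has already been closed. It is only a sketch of the general mechanism and does not reproduce Airflow's `dispose_orm` path or any driver-specific teardown.

```python
import asyncio

# Create a loop, use it once, then close it, as a finished test or a
# shutting-down process would.
loop = asyncio.new_event_loop()
loop.run_until_complete(asyncio.sleep(0))
loop.close()

try:
    # Any later attempt to schedule a callback on the closed loop is rejected.
    loop.call_soon(lambda: None)
except RuntimeError as exc:
    message = str(exc)
else:
    message = None
```

Resources that keep a reference to the loop they were created on, such as pooled async connections, can hit this check when they are cleaned up after that loop has gone away.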
---
##### Was generative AI tooling used to co-author this PR?
- [x] Yes (please specify the tool below)
Generated-by: Claude Opus 4.8 following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)