This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ab4c0978a76 Fix in-process Execution API loop stopped while transport
still in use (#68865)
ab4c0978a76 is described below
commit ab4c0978a76e4f44148fccd3a4f4fa28404ca317
Author: Sean Ghaeli <[email protected]>
AuthorDate: Mon Jun 22 18:33:22 2026 -0700
Fix in-process Execution API loop stopped while transport still in use
(#68865)
* Fix in-process Execution API loop stopped while transport still in use
#68840 moved the InProcessExecutionAPI background event-loop + thread
cleanup into a weakref.finalize keyed on the InProcessExecutionAPI
instance. But callers build a sync Client from
InProcessExecutionAPI().transport and discard the factory object, so the
instance is garbage-collected while the transport is still in use. The
finalizer then stops the loop, and every subsequent request hangs on the
dead loop -- surfacing as timeout failures across the Dag-processor and
triggerer in-process API tests on main ("Error while closing in-process
execution API lifespan" -> TimeoutError).
Key the finalizer on the returned WSGI transport instead of the factory
instance, so loop/thread/lifespan teardown happens when the transport
(which the Client holds) is collected, not when the throwaway factory is.
Verified locally: test_processor.py::test_top_level_variable_set and
test_top_level_variable_access_not_found now pass (previously they hung
until the timeout).
* Test transport-tied lifecycle for in-process Execution API
Rewrite test_in_process_execution_api_teardown as
test_in_process_execution_api_transport_lifecycle: assert that dropping
the factory instance leaves the loop running while the transport is held,
and only dropping the transport stops the loop + joins the daemon thread.
This test fails when the finalizer is keyed on the instance (the regression)
and passes when it is keyed on the transport.
Verified locally: 1 passed in 11.58s.
---------
Co-authored-by: Sean Ghaeli <[email protected]>
---
.../src/airflow/api_fastapi/execution_api/app.py | 18 ++++++++++++------
.../tests/unit/api_fastapi/execution_api/test_app.py | 19 +++++++++++++------
2 files changed, 25 insertions(+), 12 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index 12bbb0921a2..449019db799 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -350,9 +350,9 @@ def get_extra_schemas() -> dict[str, dict]:
}
-# Note: _shutdown_loop is used as a finalizer for
:class:`InProcessExecutionAPI`. As such, its arguments must
-# not directly or indirectly reference the instance itself, as this will
prevent the instance from being
-# garbage collected.
+# Note: _shutdown_loop is used as a finalizer for the WSGI transport returned
by
+# ``InProcessExecutionAPI.transport``. As such, its arguments must not
directly or indirectly reference that
+# transport, as this would prevent the transport from being garbage collected.
def _shutdown_loop(
loop: asyncio.AbstractEventLoop,
thread: threading.Thread,
@@ -432,10 +432,16 @@ class InProcessExecutionAPI:
# safely aclose() a context whose __aenter__ has actually run.
asyncio.run_coroutine_threadsafe(start_lifespan(cm, self.app),
loop).result()
- # Stop the loop + thread and unwind the lifespan when this instance is
garbage collected.
- weakref.finalize(self, _shutdown_loop, loop, thread, cm)
+ transport = httpx.WSGITransport(app=middleware) # type:
ignore[arg-type]
- return httpx.WSGITransport(app=middleware) # type: ignore[arg-type]
+ # Stop the loop + thread and unwind the lifespan when the *transport*
is garbage collected, not
+ # this InProcessExecutionAPI instance. Callers commonly build a Client
from ``.transport`` and drop
+ # the factory object (e.g.
``Client(transport=InProcessExecutionAPI().transport)``); finalizing on
+ # ``self`` would stop the loop while the transport is still in use, so
every later request would
+ # hang on the now-dead loop.
+ weakref.finalize(transport, _shutdown_loop, loop, thread, cm)
+
+ return transport
@cached_property
def atransport(self) -> httpx.ASGITransport:
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
index 75e25bf0eba..32880773ade 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
@@ -137,25 +137,32 @@ def
test_routes_with_task_instance_id_param_enforce_ti_self(client):
)
-def test_in_process_execution_api_teardown():
- """Accessing .transport spins up a daemon thread; dropping the instance
must stop it via finalize.
+def test_in_process_execution_api_transport_lifecycle():
+ """The background loop + thread lifecycle is tied to the ``.transport``,
not the factory instance.
- Regression coverage for the a2wsgi background-thread leak.
+ Callers build a sync ``Client`` from ``InProcessExecutionAPI().transport``
and drop the factory
+ object. Dropping the instance must NOT stop the loop while the transport
is still held -- doing so
+ left every later request hanging on a stopped loop. Dropping the transport
must stop the loop and
+ join the daemon thread (the a2wsgi background-thread leak guard).
"""
before = {t for t in threading.enumerate() if t.name ==
"InProcessExecutionAPI-loop"}
api = InProcessExecutionAPI()
- _ = api.transport # trigger loop + thread creation
+ transport = api.transport # triggers loop + thread creation; the
transport is what callers keep
new_threads = {t for t in threading.enumerate() if t.name ==
"InProcessExecutionAPI-loop"} - before
assert len(new_threads) == 1
thread = new_threads.pop()
assert thread.is_alive()
- # Drop the only strong reference; the weakref.finalize registered in
.transport must stop
- # the loop and join the daemon thread once the instance is collected.
+ # Dropping only the factory instance must leave the loop running for the
still-live transport.
del api
gc.collect()
+ assert thread.is_alive()
+
+ # Dropping the transport runs the weakref.finalize: loop stopped, daemon
thread joined (no leak).
+ del transport
+ gc.collect()
thread.join(timeout=5)
assert not thread.is_alive()