This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ab4c0978a76 Fix in-process Execution API loop stopped while transport still in use (#68865)
ab4c0978a76 is described below

commit ab4c0978a76e4f44148fccd3a4f4fa28404ca317
Author: Sean Ghaeli <[email protected]>
AuthorDate: Mon Jun 22 18:33:22 2026 -0700

    Fix in-process Execution API loop stopped while transport still in use (#68865)
    
    * Fix in-process Execution API loop stopped while transport still in use
    
    #68840 moved the cleanup of the InProcessExecutionAPI background event loop
    and thread into a weakref.finalize keyed on the InProcessExecutionAPI
    instance. However, callers build a sync Client from
    InProcessExecutionAPI().transport and then discard the factory object, so
    the instance is garbage-collected while its transport is still in use. The
    finalizer then stops the loop, and every subsequent request hangs on the
    dead loop. This surfaced as timeout failures across the Dag-processor and
    triggerer in-process API tests on main ("Error while closing in-process
    execution API lifespan" -> TimeoutError).
    
    Key the finalizer on the returned WSGI transport instead of the factory
    instance, so loop/thread/lifespan teardown happens when the transport
    (which the Client holds) is collected, not when the throwaway factory is.
    
    Verified locally: test_processor.py::test_top_level_variable_set and
    test_top_level_variable_access_not_found now pass (previously they hung
    until timeout).
    
    * Test transport-tied lifecycle for in-process Execution API
    
    Rewrite test_in_process_execution_api_teardown as
    test_in_process_execution_api_transport_lifecycle. The test asserts that
    dropping the factory instance leaves the loop running while the transport
    is still held, and that only dropping the transport stops the loop and
    joins the daemon thread. It fails when the finalizer is keyed on the
    instance (the regression) and passes when it is keyed on the transport.
    
    Verified locally: 1 passed in 11.58s.
    
    ---------
    
    Co-authored-by: Sean Ghaeli <[email protected]>
---
 .../src/airflow/api_fastapi/execution_api/app.py      | 18 ++++++++++++------
 .../tests/unit/api_fastapi/execution_api/test_app.py  | 19 +++++++++++++------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index 12bbb0921a2..449019db799 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -350,9 +350,9 @@ def get_extra_schemas() -> dict[str, dict]:
     }
 
 
-# Note: _shutdown_loop is used as a finalizer for 
:class:`InProcessExecutionAPI`. As such, its arguments must
-# not directly or indirectly reference the instance itself, as this will 
prevent the instance from being
-# garbage collected.
+# Note: _shutdown_loop is used as a finalizer for the WSGI transport returned 
by
+# ``InProcessExecutionAPI.transport``. As such, its arguments must not 
directly or indirectly reference that
+# transport, as this would prevent the transport from being garbage collected.
 def _shutdown_loop(
     loop: asyncio.AbstractEventLoop,
     thread: threading.Thread,
@@ -432,10 +432,16 @@ class InProcessExecutionAPI:
         # safely aclose() a context whose __aenter__ has actually run.
         asyncio.run_coroutine_threadsafe(start_lifespan(cm, self.app), 
loop).result()
 
-        # Stop the loop + thread and unwind the lifespan when this instance is 
garbage collected.
-        weakref.finalize(self, _shutdown_loop, loop, thread, cm)
+        transport = httpx.WSGITransport(app=middleware)  # type: 
ignore[arg-type]
 
-        return httpx.WSGITransport(app=middleware)  # type: ignore[arg-type]
+        # Stop the loop + thread and unwind the lifespan when the *transport* 
is garbage collected, not
+        # this InProcessExecutionAPI instance. Callers commonly build a Client 
from ``.transport`` and drop
+        # the factory object (e.g. 
``Client(transport=InProcessExecutionAPI().transport)``); finalizing on
+        # ``self`` would stop the loop while the transport is still in use, so 
every later request would
+        # hang on the now-dead loop.
+        weakref.finalize(transport, _shutdown_loop, loop, thread, cm)
+
+        return transport
 
     @cached_property
     def atransport(self) -> httpx.ASGITransport:
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py 
b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
index 75e25bf0eba..32880773ade 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
@@ -137,25 +137,32 @@ def 
test_routes_with_task_instance_id_param_enforce_ti_self(client):
     )
 
 
-def test_in_process_execution_api_teardown():
-    """Accessing .transport spins up a daemon thread; dropping the instance 
must stop it via finalize.
+def test_in_process_execution_api_transport_lifecycle():
+    """The background loop + thread lifecycle is tied to the ``.transport``, 
not the factory instance.
 
-    Regression coverage for the a2wsgi background-thread leak.
+    Callers build a sync ``Client`` from ``InProcessExecutionAPI().transport`` 
and drop the factory
+    object. Dropping the instance must NOT stop the loop while the transport 
is still held -- doing so
+    left every later request hanging on a stopped loop. Dropping the transport 
must stop the loop and
+    join the daemon thread (the a2wsgi background-thread leak guard).
     """
     before = {t for t in threading.enumerate() if t.name == 
"InProcessExecutionAPI-loop"}
 
     api = InProcessExecutionAPI()
-    _ = api.transport  # trigger loop + thread creation
+    transport = api.transport  # triggers loop + thread creation; the 
transport is what callers keep
 
     new_threads = {t for t in threading.enumerate() if t.name == 
"InProcessExecutionAPI-loop"} - before
     assert len(new_threads) == 1
     thread = new_threads.pop()
     assert thread.is_alive()
 
-    # Drop the only strong reference; the weakref.finalize registered in 
.transport must stop
-    # the loop and join the daemon thread once the instance is collected.
+    # Dropping only the factory instance must leave the loop running for the 
still-live transport.
     del api
     gc.collect()
+    assert thread.is_alive()
+
+    # Dropping the transport runs the weakref.finalize: loop stopped, daemon 
thread joined (no leak).
+    del transport
+    gc.collect()
     thread.join(timeout=5)
     assert not thread.is_alive()
 

Reply via email to