This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 5f83aed0071 Fix Python 3.12+ fork warning in async connection tests
(#56019)
5f83aed0071 is described below
commit 5f83aed0071587e048353bd05075ac94f3a064ee
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Sep 24 08:22:47 2025 +0100
Fix Python 3.12+ fork warning in async connection tests (#56019)
(cherry picked from commit 8e6f03eb377dbdf96978c32ffb88e334dbcfb1cf)
---
task-sdk/tests/conftest.py | 42 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/task-sdk/tests/conftest.py b/task-sdk/tests/conftest.py
index 0fa1cf6d505..b924bc4384c 100644
--- a/task-sdk/tests/conftest.py
+++ b/task-sdk/tests/conftest.py
@@ -175,6 +175,48 @@ def _disable_ol_plugin():
airflow.plugins_manager.plugins = None
[email protected](autouse=True)
+def _cleanup_async_resources(request):
+ """
+ Clean up async resources that can cause Python 3.12 fork warnings.
+
+ Problem: asgiref.sync.sync_to_async (used in _async_get_connection) creates
+ ThreadPoolExecutors that persist between tests. When supervisor.py calls
+ os.fork() in subsequent tests, Python 3.12+ warns about forking a
+ multi-threaded process.
+
+ Solution: Clean up asgiref's ThreadPoolExecutors after async tests to
ensure
+ subsequent tests start with a clean thread environment.
+ """
+ yield
+
+ # Only clean up after async tests to avoid unnecessary overhead
+ if "asyncio" in request.keywords:
+ # Clean up asgiref ThreadPoolExecutors that persist between tests
+ # These are created by sync_to_async() calls in async connection
retrieval
+ try:
+ from asgiref.sync import SyncToAsync
+
+ # SyncToAsync maintains a class-level executor for performance
+ # We need to shut it down to prevent multi-threading warnings on
fork()
+ if hasattr(SyncToAsync, "single_thread_executor") and
SyncToAsync.single_thread_executor:
+ if not SyncToAsync.single_thread_executor._shutdown:
+ SyncToAsync.single_thread_executor.shutdown(wait=True)
+ SyncToAsync.single_thread_executor = None
+
+ # SyncToAsync also maintains a WeakKeyDictionary of
context-specific executors
+ # Clean these up too to ensure complete thread cleanup
+ if hasattr(SyncToAsync, "context_to_thread_executor"):
+ for executor in
list(SyncToAsync.context_to_thread_executor.values()):
+ if hasattr(executor, "shutdown") and not getattr(executor,
"_shutdown", True):
+ executor.shutdown(wait=True)
+ SyncToAsync.context_to_thread_executor.clear()
+
+ except (ImportError, AttributeError):
+ # If asgiref structure changes, fail gracefully
+ pass
+
+
class MakeTIContextCallable(Protocol):
def __call__(
self,