This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d67d47d24fca26fc2df1bd2feace0b965b49225b
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Sep 23 03:16:48 2025 +0100

    Fix async connection retrieval in triggerer context (#55812)

    Add async support for secrets backends in _async_get_connection using
    sync_to_async to prevent event loop blocking. This resolves the
    'async_to_sync forbidden in event loop' error when triggers call
    get_connection() and secrets backends need to be checked.

    Also fix Databricks provider to use async connection methods in async
    contexts instead of cached properties that cause sync calls.

    Fixes triggerer failures when providers access connections during
    async trigger execution.

    (cherry picked from commit 2f76b41778c100d1f28e6e2fd2974043cb253f6a)
---
 task-sdk/pyproject.toml                            |  1 +
 task-sdk/src/airflow/sdk/execution_time/context.py | 24 +++++++++++++--
 .../tests/task_sdk/execution_time/test_context.py  | 35 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index cd0bccb6600..e097179876a 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -50,6 +50,7 @@ classifiers = [
 ]
 dependencies = [
     "apache-airflow-core<3.2.0,>=3.1.0",
+    "asgiref>=2.3.0",
     "attrs>=24.2.0, !=25.2.0",
     "fsspec>=2023.10.0",
     "httpx>=0.27.0",
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 570cd25d9a3..2107cd853ab 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -172,13 +172,33 @@ def _get_connection(conn_id: str) -> Connection:
 
 
 async def _async_get_connection(conn_id: str) -> Connection:
-    # TODO: add async support for secrets backends
+    from asgiref.sync import sync_to_async
 
     from airflow.sdk.execution_time.comms import GetConnection
+    from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
-    msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
+    # TODO: check cache first
+    # enabled only if SecretCache.init() has been called first
 
+    # Try secrets backends first using async wrapper
+    backends = ensure_secrets_backend_loaded()
+    for secrets_backend in backends:
+        try:
+            conn = await sync_to_async(secrets_backend.get_connection)(conn_id)
+            if conn:
+                # TODO: this should probably be in get conn
+                if conn.password:
+                    mask_secret(conn.password)
+                if conn.extra:
+                    mask_secret(conn.extra)
+                return conn
+        except Exception:
+            # If one backend fails, try the next one
+            continue
+
+    # If no secrets backend has the connection, fall back to API server
+    msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
     return _process_connection_result_conn(msg)
 
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py
index 54e2c66bee8..39aecb9d955 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -58,6 +58,7 @@ from airflow.sdk.execution_time.context import (
     TriggeringAssetEventsAccessor,
     VariableAccessor,
     _AssetRefResolutionMixin,
+    _async_get_connection,
     _convert_variable_result_to_variable,
     _process_connection_result_conn,
     context_to_airflow_vars,
@@ -730,3 +731,37 @@ class TestInletEventAccessor:
                     map_index=0,
                 ),
             )
+
+
+class TestAsyncGetConnection:
+    """Test async connection retrieval with secrets backends."""
+
+    @pytest.mark.asyncio
+    async def test_async_get_connection_from_secrets_backend(self, mock_supervisor_comms):
+        """Test that _async_get_connection successfully retrieves from secrets backend using sync_to_async."""
+        sample_connection = Connection(
+            conn_id="test_conn", conn_type="postgres", host="localhost", port=5432, login="user"
+        )
+
+        class MockSecretsBackend:
+            """Simple mock secrets backend for testing."""
+
+            def __init__(self, connections: dict[str, Connection | None] | None = None):
+                self.connections = connections or {}
+
+            def get_connection(self, conn_id: str) -> Connection | None:
+                return self.connections.get(conn_id)
+
+        backend = MockSecretsBackend({"test_conn": sample_connection})
+
+        with patch(
+            "airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded", autospec=True
+        ) as mock_load:
+            mock_load.return_value = [backend]
+
+            result = await _async_get_connection("test_conn")
+
+            assert result == sample_connection
+            # Should not have tried SUPERVISOR_COMMS since secrets backend had the connection
+            mock_supervisor_comms.send.assert_not_called()
+            mock_supervisor_comms.asend.assert_not_called()
