This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f5b1eb437f1 Handle trigger calls to get_connection (#55799)
f5b1eb437f1 is described below

commit f5b1eb437f11bb2dedc2273d83894f8c868982c3
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Sep 18 12:00:51 2025 -0700

    Handle trigger calls to get_connection (#55799)
    
    Some [incorrectly-written] triggers may call BaseHook.get_connection 
without wrapping with sync_to_async.
    
    This is naughty behavior because it will block the event loop.  But in 2.x 
it would not cause an error.
    
    In 3.0, however, this results in an error.  It fails in 3.0 because the 
code to hit the API wraps the get_connection call with async_to_sync, which is 
forbidden in the asyncio event loop.
    
    What we do here is try to detect when this happens and when it does, we run 
it through greenback portal which makes it not fail.
---
 airflow-core/pyproject.toml                           |  1 +
 airflow-core/src/airflow/jobs/triggerer_job_runner.py |  5 +++++
 airflow-core/tests/unit/jobs/test_triggerer_job.py    | 11 +++++------
 task-sdk/src/airflow/sdk/definitions/connection.py    | 19 +++++++++++++++++++
 4 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index 3a4ef49e686..01b22d22cf9 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -158,6 +158,7 @@ dependencies = [
     "eventlet>=0.37.0",
     "gevent>=25.4.1",
     "greenlet>=3.1.0",
+    "greenback>=1.2.1",
 ]
 "graphviz" = [
     # The graphviz package creates friction when installing on MacOS as it 
needs graphviz system package to
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 06f12ce3191..f5bc2d94d06 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -1093,6 +1093,11 @@ class TriggerRunner:
 
     async def run_trigger(self, trigger_id, trigger):
         """Run a trigger (they are async generators) and push their events 
into our outbound event deque."""
+        if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() 
== "true":
+            import greenback
+
+            await greenback.ensure_portal()
+
         bind_log_contextvars(trigger_id=trigger_id)
 
         name = self.triggers[trigger_id]["name"]
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index bd390c82cbb..a12969ce79e 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -302,8 +302,7 @@ def test_trigger_log(mock_monotonic, trigger, 
watcher_count, trigger_count, sess
 
 
 class TestTriggerRunner:
-    @pytest.mark.asyncio
-    async def test_run_inline_trigger_canceled(self, session) -> None:
+    def test_run_inline_trigger_canceled(self, session) -> None:
         trigger_runner = TriggerRunner()
         trigger_runner.triggers = {
             1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", 
"events": 0}
@@ -313,10 +312,10 @@ class TestTriggerRunner:
         mock_trigger.run.side_effect = asyncio.CancelledError()
 
         with pytest.raises(asyncio.CancelledError):
-            await trigger_runner.run_trigger(1, mock_trigger)
+            asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
 
-    @pytest.mark.asyncio
-    async def test_run_inline_trigger_timeout(self, session, cap_structlog) -> 
None:
+    # @pytest.mark.asyncio
+    def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None:
         trigger_runner = TriggerRunner()
         trigger_runner.triggers = {
             1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", 
"events": 0}
@@ -326,7 +325,7 @@ class TestTriggerRunner:
         mock_trigger.run.side_effect = asyncio.CancelledError()
 
         with pytest.raises(asyncio.CancelledError):
-            await trigger_runner.run_trigger(1, mock_trigger)
+            asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
         assert {"event": "Trigger cancelled due to timeout", "log_level": 
"error"} in cap_structlog
 
     @patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")
diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py 
b/task-sdk/src/airflow/sdk/definitions/connection.py
index cc2e92a41aa..b0e1372f52f 100644
--- a/task-sdk/src/airflow/sdk/definitions/connection.py
+++ b/task-sdk/src/airflow/sdk/definitions/connection.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import asyncio
 import json
 import logging
 from json import JSONDecodeError
@@ -203,6 +204,24 @@ class Connection:
             return _get_connection(conn_id)
         except AirflowRuntimeError as e:
             cls._handle_connection_error(e, conn_id)
+        except RuntimeError as e:
+            # The error from async_to_sync is a RuntimeError, so we have to 
fall back to text matching
+            if str(e).startswith("You cannot use AsyncToSync in the same 
thread as an async event loop"):
+                import greenback
+
+                task = asyncio.current_task()
+                if greenback.has_portal(task):
+                    import warnings
+
+                    warnings.warn(
+                        "You should not use sync calls here -- use `await 
Conn.async_get` instead",
+                        stacklevel=2,
+                    )
+
+                    return greenback.await_(cls.async_get(conn_id))
+
+            log.exception("async_to_sync failed")
+            raise
 
     @classmethod
     async def async_get(cls, conn_id: str) -> Any:

Reply via email to