This is an automated email from the ASF dual-hosted git repository.

dheerajturaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e406495ec5 Fix spurious "Failed to detach context" error on Execution 
API disconnects (#68039)
1e406495ec5 is described below

commit 1e406495ec5fdbeeb59bd29edea5850459529e12
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Mon Jun 8 22:39:35 2026 -0500

    Fix spurious "Failed to detach context" error on Execution API disconnects 
(#68039)
---
 .../src/airflow/api_fastapi/execution_api/app.py   |  5 +-
 .../unit/api_fastapi/execution_api/test_app.py     | 80 +++++++++++++++++-----
 2 files changed, 66 insertions(+), 19 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index f10c772e03f..88c7d23fff2 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -17,7 +17,7 @@
 
 from __future__ import annotations
 
-import contextlib
+import asyncio
 import json
 import time
 from contextlib import AsyncExitStack
@@ -250,11 +250,12 @@ async def _extract_w3c_trace_context(
         yield
         return
     ctx = otel_propagate.extract(request.headers)
+    attached_in = asyncio.current_task()
     token = otel_context.attach(ctx)
     try:
         yield
     finally:
-        with contextlib.suppress(Exception):
+        if asyncio.current_task() is attached_in:
             otel_context.detach(token)
 
 
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py 
b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
index 0a71e655255..566dea919a6 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import asyncio
 from unittest import mock
 from uuid import UUID
 
@@ -26,7 +27,10 @@ from fastapi.routing import APIRoute
 from fastapi.testclient import TestClient
 from opentelemetry import context as otel_context, propagate as otel_propagate
 
-from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
+from airflow.api_fastapi.execution_api.app import (
+    _extract_w3c_trace_context,
+    create_task_execution_api_app,
+)
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import 
TaskInstance
 from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, 
TIToken
 from airflow.api_fastapi.execution_api.security import require_auth
@@ -225,22 +229,64 @@ class TestTraceContextPropagation:
         assert extract_spy.called
         assert response.status_code == 500
 
-    def test_route_exception_not_masked_by_detach_error(self):
-        """A detach failure during cleanup must not replace the original route 
exception."""
-        app = self._build_app("unsafe-always")
+    @staticmethod
+    def _make_request() -> Request:
+        return Request({"type": "http", "headers": []})
+
+    @pytest.mark.asyncio
+    async def test_detach_runs_on_same_task_aclose(self):
+        """Same-task aclose (e.g. normal shutdown) must still call detach.
+
+        The task-guard allows detach because the finalizer runs in the same 
asyncio
+        Task that called attach, so the contextvars.Context matches and 
reset() succeeds.
+        A real client-disconnect force-close runs in a *different* task; that 
case is
+        covered by test_detach_skipped_on_cross_task_aclose.
+        """
+        with mock.patch.object(otel_context, "detach") as detach_spy:
+            gen = _extract_w3c_trace_context(self._make_request(), 
dependency_solver="fastapi")
+            await gen.asend(None)  # run up to and including the yield (after 
attach)
+            await gen.aclose()  # raises GeneratorExit at the yield, same task 
→ detach runs
+
+        detach_spy.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_detach_skipped_on_cross_task_aclose(self):
+        """Cross-task force-close (real client disconnect) must NOT call 
detach.
+
+        When the finalizer runs in a different asyncio Task than attach did, 
the
+        contextvars.Context differs and reset() would raise "Token was created 
in a
+        different Context" — OTel logs that at ERROR before any suppression 
here
+        could see it. The task guard detects the mismatch and skips detach.
+        """
+        gen = _extract_w3c_trace_context(self._make_request(), 
dependency_solver="fastapi")
+        await gen.asend(None)  # run up to the yield in the current (test) task
+
+        with mock.patch.object(otel_context, "detach") as detach_spy:
+            # Close from a different asyncio task, simulating a cross-task 
force-close.
+            await asyncio.create_task(gen.aclose())
+
+        detach_spy.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_detach_runs_on_route_exception(self):
+        """A route handler error (athrow at the yield) runs in the same task, 
so detach
+        must run -- otherwise the upstream trace context leaks into the error 
handling.
+        """
+        with mock.patch.object(otel_context, "detach") as detach_spy:
+            gen = _extract_w3c_trace_context(self._make_request(), 
dependency_solver="fastapi")
+            await gen.asend(None)  # run up to the yield
+            with pytest.raises(RuntimeError, match="boom"):
+                await gen.athrow(RuntimeError("boom"))  # FastAPI's 
route-error finalization
 
-        async def mock_require_auth(request: Request) -> TIToken:
-            ti_id = UUID(request.path_params.get("task_instance_id", 
"00000000-0000-0000-0000-000000000000"))
-            return TIToken(id=ti_id, claims=TIClaims(scope="execution"))
+        detach_spy.assert_called_once()
 
-        app.dependency_overrides[require_auth] = mock_require_auth
+    @pytest.mark.asyncio
+    async def test_detach_runs_on_normal_completion(self):
+        """Normal completion still detaches the token in the request's own 
Context."""
+        with mock.patch.object(otel_context, "detach") as detach_spy:
+            gen = _extract_w3c_trace_context(self._make_request(), 
dependency_solver="fastapi")
+            await gen.asend(None)  # run up to the yield
+            with pytest.raises(StopAsyncIteration):
+                await gen.asend(None)  # resume past the yield -> finally block 
-> detach
 
-        real_extract = otel_propagate.extract
-        with (
-            mock.patch.object(otel_propagate, "extract", wraps=real_extract),
-            mock.patch.object(otel_context, "detach", 
side_effect=ValueError("token from another context")),
-            mock.patch("airflow.models.variable.Variable.get", 
side_effect=RuntimeError("boom")),
-            TestClient(app) as test_client,
-        ):
-            with pytest.raises(RuntimeError, match="boom"):
-                test_client.get("/variables/k", headers={"Authorization": 
"Bearer fake"})
+        detach_spy.assert_called_once()

Reply via email to