This is an automated email from the ASF dual-hosted git repository.
dheerajturaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e406495ec5 Fix spurious "Failed to detach context" error on Execution
API disconnects (#68039)
1e406495ec5 is described below
commit 1e406495ec5fdbeeb59bd29edea5850459529e12
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Mon Jun 8 22:39:35 2026 -0500
Fix spurious "Failed to detach context" error on Execution API disconnects
(#68039)
---
.../src/airflow/api_fastapi/execution_api/app.py | 5 +-
.../unit/api_fastapi/execution_api/test_app.py | 80 +++++++++++++++++-----
2 files changed, 66 insertions(+), 19 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index f10c772e03f..88c7d23fff2 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-import contextlib
+import asyncio
import json
import time
from contextlib import AsyncExitStack
@@ -250,11 +250,12 @@ async def _extract_w3c_trace_context(
yield
return
ctx = otel_propagate.extract(request.headers)
+ attached_in = asyncio.current_task()
token = otel_context.attach(ctx)
try:
yield
finally:
- with contextlib.suppress(Exception):
+ if asyncio.current_task() is attached_in:
otel_context.detach(token)
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
index 0a71e655255..566dea919a6 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import asyncio
from unittest import mock
from uuid import UUID
@@ -26,7 +27,10 @@ from fastapi.routing import APIRoute
from fastapi.testclient import TestClient
from opentelemetry import context as otel_context, propagate as otel_propagate
-from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
+from airflow.api_fastapi.execution_api.app import (
+ _extract_w3c_trace_context,
+ create_task_execution_api_app,
+)
from airflow.api_fastapi.execution_api.datamodels.taskinstance import
TaskInstance
from airflow.api_fastapi.execution_api.datamodels.token import TIClaims,
TIToken
from airflow.api_fastapi.execution_api.security import require_auth
@@ -225,22 +229,64 @@ class TestTraceContextPropagation:
assert extract_spy.called
assert response.status_code == 500
- def test_route_exception_not_masked_by_detach_error(self):
- """A detach failure during cleanup must not replace the original route
exception."""
- app = self._build_app("unsafe-always")
+ @staticmethod
+ def _make_request() -> Request:
+ return Request({"type": "http", "headers": []})
+
+ @pytest.mark.asyncio
+ async def test_detach_runs_on_same_task_aclose(self):
+ """Same-task aclose (e.g. normal shutdown) must still call detach.
+
+ The task-guard allows detach because the finalizer runs in the same
asyncio
+ Task that called attach, so the contextvars.Context matches and
reset() succeeds.
+ A real client-disconnect force-close runs in a *different* task; that
case is
+ covered by test_detach_skipped_on_cross_task_aclose.
+ """
+ with mock.patch.object(otel_context, "detach") as detach_spy:
+ gen = _extract_w3c_trace_context(self._make_request(),
dependency_solver="fastapi")
+ await gen.asend(None) # run up to and including the yield (after
attach)
+ await gen.aclose() # raises GeneratorExit at the yield, same task
→ detach runs
+
+ detach_spy.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_detach_skipped_on_cross_task_aclose(self):
+ """Cross-task force-close (real client disconnect) must NOT call
detach.
+
+ When the finalizer runs in a different asyncio Task than attach did,
the
+ contextvars.Context differs and reset() would raise "Token was created
in a
+ different Context" — OTel logs that at ERROR before any suppression
here
+ could see it. The task guard detects the mismatch and skips detach.
+ """
+ gen = _extract_w3c_trace_context(self._make_request(),
dependency_solver="fastapi")
+ await gen.asend(None) # run up to the yield in the current (test) task
+
+ with mock.patch.object(otel_context, "detach") as detach_spy:
+ # Close from a different asyncio task, simulating a cross-task
force-close.
+ await asyncio.create_task(gen.aclose())
+
+ detach_spy.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_detach_runs_on_route_exception(self):
+ """A route handler error (athrow at the yield) runs in the same task,
so detach
+ must run -- otherwise the upstream trace context leaks into the error
handling.
+ """
+ with mock.patch.object(otel_context, "detach") as detach_spy:
+ gen = _extract_w3c_trace_context(self._make_request(),
dependency_solver="fastapi")
+ await gen.asend(None) # run up to the yield
+ with pytest.raises(RuntimeError, match="boom"):
+ await gen.athrow(RuntimeError("boom")) # FastAPI's
route-error finalization
- async def mock_require_auth(request: Request) -> TIToken:
- ti_id = UUID(request.path_params.get("task_instance_id",
"00000000-0000-0000-0000-000000000000"))
- return TIToken(id=ti_id, claims=TIClaims(scope="execution"))
+ detach_spy.assert_called_once()
- app.dependency_overrides[require_auth] = mock_require_auth
+ @pytest.mark.asyncio
+ async def test_detach_runs_on_normal_completion(self):
+ """Normal completion still detaches the token in the request's own
Context."""
+ with mock.patch.object(otel_context, "detach") as detach_spy:
+ gen = _extract_w3c_trace_context(self._make_request(),
dependency_solver="fastapi")
+ await gen.asend(None) # run up to the yield
+ with pytest.raises(StopAsyncIteration):
+ await gen.asend(None) # resume past the yield -> finally block
-> detach
- real_extract = otel_propagate.extract
- with (
- mock.patch.object(otel_propagate, "extract", wraps=real_extract),
- mock.patch.object(otel_context, "detach",
side_effect=ValueError("token from another context")),
- mock.patch("airflow.models.variable.Variable.get",
side_effect=RuntimeError("boom")),
- TestClient(app) as test_client,
- ):
- with pytest.raises(RuntimeError, match="boom"):
- test_client.get("/variables/k", headers={"Authorization":
"Bearer fake"})
+ detach_spy.assert_called_once()