This is an automated email from the ASF dual-hosted git repository.

ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f481758f04c Propagate task OTel trace context through IPC and into 
execution API requests (#66151)
f481758f04c is described below

commit f481758f04cc28d1a417c4671b8705cd6e5f83e8
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue May 5 13:14:18 2026 +0100

    Propagate task OTel trace context through IPC and into execution API 
requests (#66151)
    
    The supervisor makes HTTP calls (XCom pushes, RTIF writes, connection
    lookups) on behalf of the task process via a Unix socket IPC channel.
    Without explicit propagation, those calls either float under the
    supervisor's own span or are unparented entirely — the task's trace
    context never crosses the process boundary.
    
    This commit wires the full chain:
    
    IPC leg (task process → supervisor):
    - Add context_carrier: dict[str, str] | None = None field to _RequestFrame
    - _make_frame() injects the task runner's active W3C trace context
      (traceparent + tracestate) into every outgoing IPC frame's
      context_carrier via TraceContextTextMapPropagator.inject()
    - handle_requests() extracts the context from context_carrier and calls
      otel_context.attach() before dispatching each request, restoring the
      task's trace context in the supervisor process for that request's
      lifetime (it is detached again once the request has been handled)
    - TriggerCommsDecoder.asend() now calls _make_frame() instead of
      constructing _RequestFrame directly, so trigger IPC frames carry
      the active span's traceparent too
    
    HTTP leg (supervisor → execution API):
    - inject_trace_context event hook on the httpx Client propagates the
      currently-active span's traceparent header on every outgoing request,
      linking server-side spans to the correct task span
    - _log_and_trace_retry records http.retry events on the active span
      alongside the existing log warning
    
    Dependency cleanup:
    - Move opentelemetry-api>=1.27.0 from [otel] optional extras to base
      [dependencies] in shared/observability — it flows unconditionally into
      task-sdk via shared_distributions, the same way airflow-core already
      has it unconditionally
    - Replace try/except ImportError guards and _NoOpTracer fallbacks in
      comms.py, supervisor.py, and client.py with direct imports; inject()
      and get_current_span() are no-ops when no TracerProvider is configured,
      so the guards were only testing "is OTel installed?" not "is it enabled?"
    - Introduce _FrameMixin (plain Python mixin, not a msgspec.Struct) to
      share _encoder and as_bytes() between _RequestFrame and _ResponseFrame
---
 shared/observability/pyproject.toml                |  3 +-
 task-sdk/pyproject.toml                            |  3 ++
 task-sdk/src/airflow/sdk/api/client.py             | 30 ++++++++++++-
 task-sdk/src/airflow/sdk/execution_time/comms.py   | 50 ++++++++++++++--------
 .../src/airflow/sdk/execution_time/supervisor.py   | 17 ++++++++
 .../task_sdk/execution_time/test_supervisor.py     | 46 ++++++++++++++++++++
 uv.lock                                            | 13 +++---
 7 files changed, 134 insertions(+), 28 deletions(-)

diff --git a/shared/observability/pyproject.toml 
b/shared/observability/pyproject.toml
index 54b902157df..724ff21e750 100644
--- a/shared/observability/pyproject.toml
+++ b/shared/observability/pyproject.toml
@@ -24,15 +24,14 @@ classifiers = [
 ]
 
 dependencies = [
+    "opentelemetry-api>=1.27.0",
     "pendulum>=3.1.0",
-    "pygtrie>=2.5.0",
     "structlog>=25.4.0",
     "methodtools>=0.4.7",
 ]
 
 [project.optional-dependencies]
 "otel" = [
-    "opentelemetry-api>=1.27.0",
     "opentelemetry-exporter-otlp>=1.27.0",
     "opentelemetry-proto<9999,>=1.27.0",
 ]
diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index a33748911af..6e4a0b1017d 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -87,6 +87,9 @@ dependencies = [
     # Start of shared providers-discovery dependencies
     "jsonschema>=4.19.1",
     # End of shared providers-discovery dependencies
+    # Start of shared observability dependencies
+    "opentelemetry-api>=1.27.0",
+    # End of shared observability dependencies
 ]
 
 [project.optional-dependencies]
diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index 54927794bf1..15513010062 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -29,6 +29,8 @@ import certifi
 import httpx
 import msgspec
 import structlog
+from opentelemetry import trace
+from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
 from pydantic import BaseModel
 from tenacity import (
     before_log,
@@ -163,6 +165,9 @@ def getuser() -> str:
 
 log = structlog.get_logger(logger_name=__name__)
 
+_trace_propagator = TraceContextTextMapPropagator()
+_log_retry_warning = before_log(log, logging.WARNING)
+
 __all__ = [
     "Client",
     "ConnectionOperations",
@@ -206,6 +211,24 @@ def add_correlation_id(request: httpx.Request):
     request.headers["correlation-id"] = str(uuid7())
 
 
+def inject_trace_context(request: httpx.Request) -> None:
+    _trace_propagator.inject(request.headers)
+
+
+def _log_and_trace_retry(retry_state) -> None:
+    _log_retry_warning(retry_state)
+    span = trace.get_current_span()
+    if span.is_recording():
+        exc = retry_state.outcome.exception() if retry_state.outcome else None
+        span.add_event(
+            "http.retry",
+            attributes={
+                "attempt_number": retry_state.attempt_number,
+                "error": str(exc) if exc else "",
+            },
+        )
+
+
 class TaskInstanceOperations:
     __slots__ = ("client",)
 
@@ -980,7 +1003,10 @@ class Client(httpx.Client):
                 "user-agent": f"apache-airflow-task-sdk/{__version__} 
(Python/{pyver})",
                 "airflow-api-version": API_VERSION,
             },
-            event_hooks={"response": [self._update_auth, raise_on_4xx_5xx], 
"request": [add_correlation_id]},
+            event_hooks={
+                "response": [self._update_auth, raise_on_4xx_5xx],
+                "request": [add_correlation_id, inject_trace_context],
+            },
             **kwargs,
         )
 
@@ -993,7 +1019,7 @@ class Client(httpx.Client):
         retry=retry_if_exception(_should_retry_api_request),
         stop=stop_after_attempt(API_RETRIES),
         wait=wait_random_exponential(min=API_RETRY_WAIT_MIN, 
max=API_RETRY_WAIT_MAX),
-        before_sleep=before_log(log, logging.WARNING),
+        before_sleep=_log_and_trace_retry,
         reraise=True,
     )
     def request(self, *args, **kwargs):
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py 
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 87c7881333a..1e11e9636e5 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -103,6 +103,10 @@ except ImportError:
     # Available on Unix and Windows (so "everywhere") but lets be safe
     recv_fds = None  # type: ignore[assignment]
 
+from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
+
+_trace_propagator = TraceContextTextMapPropagator()
+
 
 if TYPE_CHECKING:
     from structlog.typing import FilteringBoundLogger as Logger
@@ -133,23 +137,14 @@ def _new_encoder() -> msgspec.msgpack.Encoder:
     return msgspec.msgpack.Encoder(enc_hook=_msgpack_enc_hook)
 
 
-class _RequestFrame(msgspec.Struct, array_like=True, frozen=True, 
omit_defaults=True):  # type: ignore[call-arg]
-    id: int
-    """
-    The request id, set by the sender.
-
-    This is used to allow "pipeling" of requests and to be able to tie 
response to requests, which is
-    particularly useful in the Triggerer where multiple async tasks can send a 
requests concurrently.
-    """
-    body: dict[str, Any] | None
-
-    req_encoder: ClassVar[msgspec.msgpack.Encoder] = _new_encoder()
+class _FrameMixin:
+    _encoder: ClassVar[msgspec.msgpack.Encoder] = _new_encoder()
 
     def as_bytes(self) -> bytearray:
         # https://jcristharif.com/msgspec/perf-tips.html#length-prefix-framing 
for inspiration
         buffer = bytearray(256)
 
-        self.req_encoder.encode_into(self, buffer, 4)
+        self._encoder.encode_into(self, buffer, 4)  # type: ignore[arg-type]
 
         n = len(buffer) - 4
         if n >= 2**32:
@@ -159,7 +154,25 @@ class _RequestFrame(msgspec.Struct, array_like=True, 
frozen=True, omit_defaults=
         return buffer
 
 
-class _ResponseFrame(_RequestFrame, frozen=True):  # type: ignore[call-arg]
+class _RequestFrame(_FrameMixin, msgspec.Struct, array_like=True, frozen=True, 
omit_defaults=True):  # type: ignore[call-arg]
+    id: int
+    """
+    The request id, set by the sender.
+
+    This is used to allow "pipeling" of requests and to be able to tie 
response to requests, which is
+    particularly useful in the Triggerer where multiple async tasks can send a 
requests concurrently.
+    """
+    body: dict[str, Any] | None
+    context_carrier: dict[str, str] | None = None
+    """W3C trace context carrier (traceparent + tracestate) of the task 
runner's active span.
+
+    The supervisor extracts this to restore the task runner's trace context 
before making outbound HTTP
+    calls, so that server-side spans (e.g. POST /xcoms/…) appear as children 
of the correct task span
+    rather than under the supervisor's own span.
+    """
+
+
+class _ResponseFrame(_FrameMixin, msgspec.Struct, array_like=True, 
frozen=True, omit_defaults=True):  # type: ignore[call-arg]
     id: int
     """
     The id of the request this is a response to
@@ -193,10 +206,14 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
     # Async lock for async operations
     _async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
 
+    def _make_frame(self, msg: SendMsgType) -> _RequestFrame:
+        carrier: dict[str, str] = {}
+        _trace_propagator.inject(carrier)
+        return _RequestFrame(id=next(self.id_counter), body=msg.model_dump(), 
context_carrier=carrier or None)
+
     def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
         """Send a request to the parent and block until the response is 
received."""
-        frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
-        frame_bytes = frame.as_bytes()
+        frame_bytes = self._make_frame(msg).as_bytes()
 
         # We must make sure sockets aren't intermixed between sync and async 
calls,
         # thus we need a dual locking mechanism to ensure that.
@@ -224,8 +241,7 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
 
         Uses async lock for coroutine safety and thread lock for socket safety.
         """
-        frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
-        frame_bytes = frame.as_bytes()
+        frame_bytes = self._make_frame(msg).as_bytes()
 
         async with self._async_lock:
             # Acquire the threading lock without blocking the event loop
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 375c5a9e30b..cd25d927957 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -132,6 +132,11 @@ try:
 except ImportError:
     send_fds = None  # type: ignore[assignment]
 
+from opentelemetry import context as otel_context, trace
+from opentelemetry.trace.propagation.tracecontext import 
TraceContextTextMapPropagator
+
+_trace_propagator = TraceContextTextMapPropagator()
+
 if TYPE_CHECKING:
     from structlog.typing import FilteringBoundLogger, WrappedLogger
     from typing_extensions import Self
@@ -738,7 +743,13 @@ class WatchedSubprocess:
                 log.exception("Unable to decode message", body=request.body)
                 continue
 
+            # Restore the task runner's trace context so that any outbound 
HTTP calls made while
+            # handling this request are linked to the correct task span, not 
the supervisor's own span.
+            token = None
             try:
+                if request.context_carrier:
+                    ctx = _trace_propagator.extract(request.context_carrier)
+                    token = otel_context.attach(ctx)
                 self._handle_request(msg, log, request.id)
             except ServerResponseError as e:
                 error_details = e.response.json() if e.response else None
@@ -762,6 +773,9 @@ class WatchedSubprocess:
                     ),
                     request_id=request.id,
                 )
+            finally:
+                if token is not None:
+                    otel_context.detach(token)
 
     def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) -> 
None:
         raise NotImplementedError()
@@ -2246,6 +2260,9 @@ def supervise_task(
         finally:
             if log_path and log_file_descriptor:
                 log_file_descriptor.close()
+            provider = trace.get_tracer_provider()
+            if hasattr(provider, "force_flush"):
+                provider.force_flush(timeout_millis=5000)  # upper bound, not 
a fixed wait
 
 
 def supervise(**kwargs) -> int:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 3695af1fff5..f0d8e1a0b65 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -44,6 +44,10 @@ import msgspec
 import psutil
 import pytest
 import structlog
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import 
InMemorySpanExporter
+from opentelemetry.trace import get_current_span
 from pytest_unordered import unordered
 from task_sdk import FAKE_BUNDLE, make_client
 from uuid6 import uuid7
@@ -3515,3 +3519,45 @@ class TestChildExecMain:
             os.close(saved_2)
             for s in [req_a, req_b, out_a, out_b, err_a, err_b]:
                 s.close()
+
+
+def test_ipc_trace_context_propagation(mocker):
+    """Full IPC propagation chain: _make_frame injects the active span; 
handle_requests restores it."""
+    provider = TracerProvider()
+    provider.add_span_processor(SimpleSpanProcessor(InMemorySpanExporter()))
+    tracer = provider.get_tracer("test")
+
+    # Task-runner side: _make_frame injects the active span via the real 
propagator.
+    with tracer.start_as_current_span("task_span") as span:
+        frame = CommsDecoder(socket=None)._make_frame(GetVariable(key="k"))  # 
type: ignore[arg-type]
+        expected_span_id = span.get_span_context().span_id
+
+    assert frame.context_carrier is not None
+    assert f"{expected_span_id:016x}" in 
frame.context_carrier.get("traceparent", "")
+
+    # Supervisor side: handle_requests extracts and restores the context 
before dispatch.
+    # Capture the active span from inside the client call — exercises the full 
dispatch path.
+    _, write_end = socket.socketpair()
+    proc = ActivitySubprocess(
+        process_log=mocker.MagicMock(),
+        id=TI_ID,
+        pid=12345,
+        stdin=write_end,
+        client=mocker.Mock(),
+        process=mocker.Mock(),
+    )
+    captured: list[int] = []
+
+    def capture_on_get(key):
+        captured.append(get_current_span().get_span_context().span_id)
+        return VariableResult(key=key, value="v")
+
+    proc.client.variables.get.side_effect = capture_on_get
+
+    generator = proc.handle_requests(log=mocker.Mock())
+    next(generator)
+    generator.send(frame)
+
+    assert captured == [expected_span_id]
+    # Context is detached after dispatch — no leak.
+    assert get_current_span().get_span_context().span_id != expected_span_id
diff --git a/uv.lock b/uv.lock
index b31aab16b3b..8a0ba2fa354 100644
--- a/uv.lock
+++ b/uv.lock
@@ -8161,15 +8161,14 @@ version = "0.0"
 source = { editable = "shared/observability" }
 dependencies = [
     { name = "methodtools" },
+    { name = "opentelemetry-api" },
     { name = "pendulum" },
-    { name = "pygtrie" },
     { name = "structlog" },
 ]
 
 [package.optional-dependencies]
 all = [
     { name = "datadog" },
-    { name = "opentelemetry-api" },
     { name = "opentelemetry-exporter-otlp" },
     { name = "opentelemetry-proto" },
     { name = "statsd" },
@@ -8178,7 +8177,6 @@ datadog = [
     { name = "datadog" },
 ]
 otel = [
-    { name = "opentelemetry-api" },
     { name = "opentelemetry-exporter-otlp" },
     { name = "opentelemetry-proto" },
 ]
@@ -8200,11 +8198,10 @@ requires-dist = [
     { name = "apache-airflow-shared-observability", extras = ["otel", 
"statsd", "datadog"], marker = "extra == 'all'", editable = 
"shared/observability" },
     { name = "datadog", marker = "extra == 'datadog'", specifier = ">=0.50.0" 
},
     { name = "methodtools", specifier = ">=0.4.7" },
-    { name = "opentelemetry-api", marker = "extra == 'otel'", specifier = 
">=1.27.0" },
+    { name = "opentelemetry-api", specifier = ">=1.27.0" },
     { name = "opentelemetry-exporter-otlp", marker = "extra == 'otel'", 
specifier = ">=1.27.0" },
     { name = "opentelemetry-proto", marker = "extra == 'otel'", specifier = 
">=1.27.0,<9999" },
     { name = "pendulum", specifier = ">=3.1.0" },
-    { name = "pygtrie", specifier = ">=2.5.0" },
     { name = "statsd", marker = "extra == 'statsd'", specifier = ">=3.3.0" },
     { name = "structlog", specifier = ">=25.4.0" },
 ]
@@ -8446,6 +8443,7 @@ dependencies = [
     { name = "jsonschema" },
     { name = "methodtools" },
     { name = "msgspec" },
+    { name = "opentelemetry-api" },
     { name = "packaging" },
     { name = "pathspec" },
     { name = "pendulum" },
@@ -8521,6 +8519,7 @@ requires-dist = [
     { name = "jsonschema", specifier = ">=4.19.1" },
     { name = "methodtools", specifier = ">=0.4.7" },
     { name = "msgspec", specifier = ">=0.19.0" },
+    { name = "opentelemetry-api", specifier = ">=1.27.0" },
     { name = "opentelemetry-api", marker = "extra == 'all'", specifier = 
">=1.27.0" },
     { name = "opentelemetry-api", marker = "extra == 'otel'", specifier = 
">=1.27.0" },
     { name = "opentelemetry-exporter-otlp", marker = "extra == 'all'", 
specifier = ">=1.27.0" },
@@ -20298,8 +20297,8 @@ name = "secretstorage"
 version = "3.5.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "cryptography", marker = "python_full_version >= '3.14' or 
platform_machine != 'arm64' or sys_platform != 'darwin'" },
-    { name = "jeepney", marker = "python_full_version >= '3.14' or 
platform_machine != 'arm64' or sys_platform != 'darwin'" },
+    { name = "cryptography", marker = "(python_full_version >= '3.14' and 
sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 
'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or 
(platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 
'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" },
+    { name = "jeepney", marker = "(python_full_version >= '3.14' and 
sys_platform == 'darwin') or (python_full_version < '3.15' and sys_platform == 
'emscripten') or (python_full_version < '3.15' and sys_platform == 'win32') or 
(platform_machine != 'arm64' and sys_platform == 'darwin') or (sys_platform != 
'darwin' and sys_platform != 'emscripten' and sys_platform != 'win32')" },
 ]
 sdist = { url = 
"https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz",
 hash = 
"sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size 
= 19884, upload-time = "2025-11-23T19:02:53.191Z" }
 wheels = [

Reply via email to