This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5898bcb2b92 Emit OpenTelemetry spans around listener hook calls (#67347)
5898bcb2b92 is described below

commit 5898bcb2b926f678fb0cc3877cf99bb54a6a0b3c
Author: Daniel Standish <[email protected]>
AuthorDate: Thu May 28 11:19:55 2026 -0700

    Emit OpenTelemetry spans around listener hook calls (#67347)
    
    Push span creation down into ListenerManager via pluggy's
    add_hookcall_monitoring so every hook call gets a
    `listener.<hook_name>` span automatically, gated on task span detail
    level > 1. Callers no longer need to wrap individual hook invocations.
    
    Also makes the existing _after_hookcall safe against listener
    exceptions: previously `outcome.get_result()` was called
    unconditionally and would re-raise, skipping the rest of the callback.
---
 airflow-core/tests/integration/otel/test_otel.py   |  6 +-
 shared/listeners/pyproject.toml                    |  3 +
 .../src/airflow_shared/listeners/listener.py       | 48 ++++++++++-
 .../tests/listeners/test_listener_manager.py       | 95 +++++++++++++++++++++-
 .../src/airflow/sdk/execution_time/task_runner.py  |  3 +-
 uv.lock                                            | 10 ++-
 6 files changed, 159 insertions(+), 6 deletions(-)

diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index a6af896b437..c543f54a921 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -458,7 +458,6 @@ class TestOtelIntegration:
                 # Additional detail spans are deferred to follow-up PRs; tracked
                 # at https://linear.app/astronomer/issue/ACD-157.
                 {
-                    "hook.on_starting": "startup",
                     "_verify_bundle_access": "parse",
                     "parse": "startup",
                     "get_template_context": "startup",
@@ -474,6 +473,11 @@ class TestOtelIntegration:
                     "dag_run.otel_test_dag": None,
                     "task_run.task1": "dag_run.otel_test_dag",
                     "worker.task1": "task_run.task1",
+                    # OpenLineage registers a listener by default, so its
+                    # on_task_instance_running / on_task_instance_success hook
+                    # calls get wrapped in spans at detail level > 1.
+                    "listener.on_task_instance_running": "_prepare",
+                    "listener.on_task_instance_success": "finalize",
                 },
                 id="detail_spans",
             ),
diff --git a/shared/listeners/pyproject.toml b/shared/listeners/pyproject.toml
index 0554778a47e..e5a67d003cd 100644
--- a/shared/listeners/pyproject.toml
+++ b/shared/listeners/pyproject.toml
@@ -24,6 +24,7 @@ classifiers = [
 ]
 
 dependencies = [
+    "opentelemetry-api>=1.27.0",
     "pluggy>=1.5.0",
     "structlog>=25.4.0",
 ]
@@ -31,6 +32,8 @@ dependencies = [
 [dependency-groups]
 dev = [
     "apache-airflow-devel-common",
+    "apache-airflow-shared-observability",
+    "opentelemetry-sdk>=1.27.0",
 ]
 mypy = [
     "apache-airflow-devel-common[mypy]",
diff --git a/shared/listeners/src/airflow_shared/listeners/listener.py b/shared/listeners/src/airflow_shared/listeners/listener.py
index d4b36c059d4..a76019ac028 100644
--- a/shared/listeners/src/airflow_shared/listeners/listener.py
+++ b/shared/listeners/src/airflow_shared/listeners/listener.py
@@ -17,24 +17,70 @@
 # under the License.
 from __future__ import annotations

+import threading
 from typing import TYPE_CHECKING

 import pluggy
 import structlog
+from opentelemetry import trace
+from opentelemetry.trace import Status, StatusCode
+
+from ..observability.traces import DEFAULT_TASK_SPAN_DETAIL_LEVEL, TASK_SPAN_DETAIL_LEVEL_KEY

 if TYPE_CHECKING:
+    from opentelemetry.trace import Span
     from pluggy._hooks import _HookRelay

 log = structlog.get_logger(__name__)
+tracer = trace.get_tracer(__name__)
+
+
+def _detail_level(span: Span) -> int:
+    raw = span.get_span_context().trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY)
+    if raw is None:
+        return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+    try:
+        return int(raw)
+    except (TypeError, ValueError):
+        return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+
+_span_state = threading.local()
+
+
+def _stack() -> list:
+    stack = getattr(_span_state, "stack", None)
+    if stack is None:
+        stack = _span_state.stack = []
+    return stack


 def _before_hookcall(hook_name, hook_impls, kwargs):
     log.debug("Calling %r with %r", hook_name, kwargs)
     log.debug("Hook impls: %s", hook_impls)
+    if not hook_impls or _detail_level(trace.get_current_span()) <= 1:
+        _stack().append(None)
+        return
+    cm = tracer.start_as_current_span(f"listener.{hook_name}")
+    span = cm.__enter__()
+    _stack().append((cm, span))


 def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
-    log.debug("Result from %r: %s", hook_name, outcome.get_result())
+    excinfo = getattr(outcome, "excinfo", None)
+    if excinfo:
+        log.debug("Hook %r raised %s", hook_name, excinfo[0].__name__)
+    else:
+        log.debug("Result from %r: %s", hook_name, outcome.get_result())
+    entry = _stack().pop()
+    if entry is None:
+        return
+    cm, span = entry
+    if excinfo:
+        exc_type, exc, _tb = excinfo
+        span.record_exception(exc)
+        span.set_status(Status(StatusCode.ERROR, description=f"Exception: {exc_type.__name__}"))
+    cm.__exit__(None, None, None)


 class ListenerManager:
diff --git a/shared/listeners/tests/listeners/test_listener_manager.py b/shared/listeners/tests/listeners/test_listener_manager.py
index ebf360dade0..3843c89f88f 100644
--- a/shared/listeners/tests/listeners/test_listener_manager.py
+++ b/shared/listeners/tests/listeners/test_listener_manager.py
@@ -17,9 +17,19 @@
 # under the License.
 from __future__ import annotations

-from airflow_shared.listeners import hookimpl
+from unittest import mock
+
+import pytest
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+from opentelemetry.trace import StatusCode
+from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
+
+from airflow_shared.listeners import hookimpl, listener as listener_module
 from airflow_shared.listeners.listener import ListenerManager
 from airflow_shared.listeners.spec import lifecycle, taskinstance
+from airflow_shared.observability.traces import new_dagrun_trace_carrier


 class TestListenerManager:
@@ -162,3 +172,86 @@ class TestListenerManager:
             ("success", mock_ti),
             ("failed", mock_ti, "test error"),
         ]
+
+
+@pytest.fixture
+def test_tracer():
+    """Patch the listener module's tracer with one backed by an in-memory exporter."""
+    exporter = InMemorySpanExporter()
+    provider = TracerProvider()
+    provider.add_span_processor(SimpleSpanProcessor(exporter))
+    tracer = provider.get_tracer("test")
+    with mock.patch.object(listener_module, "tracer", tracer):
+        yield tracer, exporter
+
+
+def _parent_span_ctx(detail_level: int):
+    carrier = new_dagrun_trace_carrier(task_span_detail_level=detail_level)
+    return TraceContextTextMapPropagator().extract(carrier)
+
+
+class _StartingListener:
+    @hookimpl
+    def on_starting(self, component):
+        pass
+
+
+class _RaisingListener:
+    @hookimpl
+    def on_starting(self, component):
+        raise RuntimeError("boom")
+
+
+class TestListenerSpan:
+    """Span emitted around every listener hook call when detail level > 1."""
+
+    def test_emits_span_when_detail_level_above_1(self, test_tracer):
+        tracer, exporter = test_tracer
+        lm = ListenerManager()
+        lm.add_hookspecs(lifecycle)
+        lm.add_listener(_StartingListener())
+
+        with tracer.start_as_current_span("parent", context=_parent_span_ctx(2)):
+            lm.hook.on_starting(component="x")
+
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "listener.on_starting" in names
+
+    def test_no_span_at_default_detail_level(self, test_tracer):
+        tracer, exporter = test_tracer
+        lm = ListenerManager()
+        lm.add_hookspecs(lifecycle)
+        lm.add_listener(_StartingListener())
+
+        with tracer.start_as_current_span("parent", context=_parent_span_ctx(1)):
+            lm.hook.on_starting(component="x")
+
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "listener.on_starting" not in names
+
+    def test_no_span_when_no_impls_registered(self, test_tracer):
+        tracer, exporter = test_tracer
+        lm = ListenerManager()
+        lm.add_hookspecs(lifecycle)
+        # No listeners added — pluggy still fires monitoring around the call.
+
+        with tracer.start_as_current_span("parent", context=_parent_span_ctx(2)):
+            lm.hook.on_starting(component="x")
+
+        names = [s.name for s in exporter.get_finished_spans()]
+        assert "listener.on_starting" not in names
+
+    def test_records_exception_on_listener_error(self, test_tracer):
+        tracer, exporter = test_tracer
+        lm = ListenerManager()
+        lm.add_hookspecs(lifecycle)
+        lm.add_listener(_RaisingListener())
+
+        with tracer.start_as_current_span("parent", context=_parent_span_ctx(2)):
+            with pytest.raises(RuntimeError):
+                lm.hook.on_starting(component="x")
+
+        spans = {s.name: s for s in exporter.get_finished_spans()}
+        listener_span = spans["listener.on_starting"]
+        assert listener_span.status.status_code == StatusCode.ERROR
+        assert any(ev.name == "exception" for ev in listener_span.events)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 313309fa401..df340f25112 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1022,8 +1022,7 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]:
     )

     try:
-        with detail_span("hook.on_starting"):
-            get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
+        get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
     except Exception:
         log.exception("error calling listener")

diff --git a/uv.lock b/uv.lock
index 6fefbf9fc23..9fbd48c147d 100644
--- a/uv.lock
+++ b/uv.lock
@@ -8311,6 +8311,7 @@ name = "apache-airflow-shared-listeners"
 version = "0.0"
 source = { editable = "shared/listeners" }
 dependencies = [
+    { name = "opentelemetry-api" },
     { name = "pluggy" },
     { name = "structlog" },
 ]
@@ -8318,6 +8319,8 @@ dependencies = [
 [package.dev-dependencies]
 dev = [
     { name = "apache-airflow-devel-common" },
+    { name = "apache-airflow-shared-observability" },
+    { name = "opentelemetry-sdk" },
 ]
 mypy = [
     { name = "apache-airflow-devel-common", extra = ["mypy"] },
@@ -8325,12 +8328,17 @@ mypy = [
 
 [package.metadata]
 requires-dist = [
+    { name = "opentelemetry-api", specifier = ">=1.27.0" },
     { name = "pluggy", specifier = ">=1.5.0" },
     { name = "structlog", specifier = ">=25.4.0" },
 ]
 
 [package.metadata.requires-dev]
-dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }]
+dev = [
+    { name = "apache-airflow-devel-common", editable = "devel-common" },
+    { name = "apache-airflow-shared-observability", editable = "shared/observability" },
+    { name = "opentelemetry-sdk", specifier = ">=1.27.0" },
+]
 mypy = [{ name = "apache-airflow-devel-common", extras = ["mypy"], editable = "devel-common" }]
 
 [[package]]

Reply via email to