This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5898bcb2b92 Emit OpenTelemetry spans around listener hook calls
(#67347)
5898bcb2b92 is described below
commit 5898bcb2b926f678fb0cc3877cf99bb54a6a0b3c
Author: Daniel Standish <[email protected]>
AuthorDate: Thu May 28 11:19:55 2026 -0700
Emit OpenTelemetry spans around listener hook calls (#67347)
Push span creation down into ListenerManager via pluggy's
add_hookcall_monitoring so every hook call that has at least one
registered listener gets a `listener.<hook_name>` span automatically,
gated on task span detail level > 1. Callers no longer need to wrap
individual hook invocations.
Also makes the existing _after_hookcall safe against listener
exceptions: previously `outcome.get_result()` was called
unconditionally and would re-raise, skipping the rest of the callback.
---
airflow-core/tests/integration/otel/test_otel.py | 6 +-
shared/listeners/pyproject.toml | 3 +
.../src/airflow_shared/listeners/listener.py | 48 ++++++++++-
.../tests/listeners/test_listener_manager.py | 95 +++++++++++++++++++++-
.../src/airflow/sdk/execution_time/task_runner.py | 3 +-
uv.lock | 10 ++-
6 files changed, 159 insertions(+), 6 deletions(-)
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index a6af896b437..c543f54a921 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -458,7 +458,6 @@ class TestOtelIntegration:
# Additional detail spans are deferred to follow-up PRs;
tracked
# at https://linear.app/astronomer/issue/ACD-157.
{
- "hook.on_starting": "startup",
"_verify_bundle_access": "parse",
"parse": "startup",
"get_template_context": "startup",
@@ -474,6 +473,11 @@ class TestOtelIntegration:
"dag_run.otel_test_dag": None,
"task_run.task1": "dag_run.otel_test_dag",
"worker.task1": "task_run.task1",
+ # OpenLineage registers a listener by default, so its
+ # on_task_instance_running / on_task_instance_success hook
+ # calls get wrapped in spans at detail level > 1.
+ "listener.on_task_instance_running": "_prepare",
+ "listener.on_task_instance_success": "finalize",
},
id="detail_spans",
),
diff --git a/shared/listeners/pyproject.toml b/shared/listeners/pyproject.toml
index 0554778a47e..e5a67d003cd 100644
--- a/shared/listeners/pyproject.toml
+++ b/shared/listeners/pyproject.toml
@@ -24,6 +24,7 @@ classifiers = [
]
dependencies = [
+ "opentelemetry-api>=1.27.0",
"pluggy>=1.5.0",
"structlog>=25.4.0",
]
@@ -31,6 +32,8 @@ dependencies = [
[dependency-groups]
dev = [
"apache-airflow-devel-common",
+ "apache-airflow-shared-observability",
+ "opentelemetry-sdk>=1.27.0",
]
mypy = [
"apache-airflow-devel-common[mypy]",
diff --git a/shared/listeners/src/airflow_shared/listeners/listener.py
b/shared/listeners/src/airflow_shared/listeners/listener.py
index d4b36c059d4..a76019ac028 100644
--- a/shared/listeners/src/airflow_shared/listeners/listener.py
+++ b/shared/listeners/src/airflow_shared/listeners/listener.py
@@ -17,24 +17,70 @@
# under the License.
from __future__ import annotations
+import threading
from typing import TYPE_CHECKING
import pluggy
import structlog
+from opentelemetry import trace
+from opentelemetry.trace import Status, StatusCode
+
+from ..observability.traces import DEFAULT_TASK_SPAN_DETAIL_LEVEL,
TASK_SPAN_DETAIL_LEVEL_KEY
if TYPE_CHECKING:
+ from opentelemetry.trace import Span
from pluggy._hooks import _HookRelay
log = structlog.get_logger(__name__)
+tracer = trace.get_tracer(__name__)
+
+
+def _detail_level(span: Span) -> int:
+ raw = span.get_span_context().trace_state.get(TASK_SPAN_DETAIL_LEVEL_KEY)
+ if raw is None:
+ return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+ try:
+ return int(raw)
+ except (TypeError, ValueError):
+ return DEFAULT_TASK_SPAN_DETAIL_LEVEL
+
+
+_span_state = threading.local()
+
+
+def _stack() -> list:
+ stack = getattr(_span_state, "stack", None)
+ if stack is None:
+ stack = _span_state.stack = []
+ return stack
def _before_hookcall(hook_name, hook_impls, kwargs):
log.debug("Calling %r with %r", hook_name, kwargs)
log.debug("Hook impls: %s", hook_impls)
+ if not hook_impls or _detail_level(trace.get_current_span()) <= 1:
+ _stack().append(None)
+ return
+ cm = tracer.start_as_current_span(f"listener.{hook_name}")
+ span = cm.__enter__()
+ _stack().append((cm, span))
def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
- log.debug("Result from %r: %s", hook_name, outcome.get_result())
+ excinfo = getattr(outcome, "excinfo", None)
+ if excinfo:
+ log.debug("Hook %r raised %s", hook_name, excinfo[0].__name__)
+ else:
+ log.debug("Result from %r: %s", hook_name, outcome.get_result())
+ entry = _stack().pop()
+ if entry is None:
+ return
+ cm, span = entry
+ if excinfo:
+ exc_type, exc, _tb = excinfo
+ span.record_exception(exc)
+ span.set_status(Status(StatusCode.ERROR, description=f"Exception:
{exc_type.__name__}"))
+ cm.__exit__(None, None, None)
class ListenerManager:
diff --git a/shared/listeners/tests/listeners/test_listener_manager.py
b/shared/listeners/tests/listeners/test_listener_manager.py
index ebf360dade0..3843c89f88f 100644
--- a/shared/listeners/tests/listeners/test_listener_manager.py
+++ b/shared/listeners/tests/listeners/test_listener_manager.py
@@ -17,9 +17,19 @@
# under the License.
from __future__ import annotations
-from airflow_shared.listeners import hookimpl
+from unittest import mock
+
+import pytest
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import
InMemorySpanExporter
+from opentelemetry.trace import StatusCode
+from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
+
+from airflow_shared.listeners import hookimpl, listener as listener_module
from airflow_shared.listeners.listener import ListenerManager
from airflow_shared.listeners.spec import lifecycle, taskinstance
+from airflow_shared.observability.traces import new_dagrun_trace_carrier
class TestListenerManager:
@@ -162,3 +172,86 @@ class TestListenerManager:
("success", mock_ti),
("failed", mock_ti, "test error"),
]
+
+
[email protected]
+def test_tracer():
+ """Patch the listener module's tracer with one backed by an in-memory
exporter."""
+ exporter = InMemorySpanExporter()
+ provider = TracerProvider()
+ provider.add_span_processor(SimpleSpanProcessor(exporter))
+ tracer = provider.get_tracer("test")
+ with mock.patch.object(listener_module, "tracer", tracer):
+ yield tracer, exporter
+
+
+def _parent_span_ctx(detail_level: int):
+ carrier = new_dagrun_trace_carrier(task_span_detail_level=detail_level)
+ return TraceContextTextMapPropagator().extract(carrier)
+
+
+class _StartingListener:
+ @hookimpl
+ def on_starting(self, component):
+ pass
+
+
+class _RaisingListener:
+ @hookimpl
+ def on_starting(self, component):
+ raise RuntimeError("boom")
+
+
+class TestListenerSpan:
+ """Span emitted around every listener hook call when detail level > 1."""
+
+ def test_emits_span_when_detail_level_above_1(self, test_tracer):
+ tracer, exporter = test_tracer
+ lm = ListenerManager()
+ lm.add_hookspecs(lifecycle)
+ lm.add_listener(_StartingListener())
+
+ with tracer.start_as_current_span("parent",
context=_parent_span_ctx(2)):
+ lm.hook.on_starting(component="x")
+
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "listener.on_starting" in names
+
+ def test_no_span_at_default_detail_level(self, test_tracer):
+ tracer, exporter = test_tracer
+ lm = ListenerManager()
+ lm.add_hookspecs(lifecycle)
+ lm.add_listener(_StartingListener())
+
+ with tracer.start_as_current_span("parent",
context=_parent_span_ctx(1)):
+ lm.hook.on_starting(component="x")
+
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "listener.on_starting" not in names
+
+ def test_no_span_when_no_impls_registered(self, test_tracer):
+ tracer, exporter = test_tracer
+ lm = ListenerManager()
+ lm.add_hookspecs(lifecycle)
+ # No listeners added — pluggy still fires monitoring around the call.
+
+ with tracer.start_as_current_span("parent",
context=_parent_span_ctx(2)):
+ lm.hook.on_starting(component="x")
+
+ names = [s.name for s in exporter.get_finished_spans()]
+ assert "listener.on_starting" not in names
+
+ def test_records_exception_on_listener_error(self, test_tracer):
+ tracer, exporter = test_tracer
+ lm = ListenerManager()
+ lm.add_hookspecs(lifecycle)
+ lm.add_listener(_RaisingListener())
+
+ with tracer.start_as_current_span("parent",
context=_parent_span_ctx(2)):
+ with pytest.raises(RuntimeError):
+ lm.hook.on_starting(component="x")
+
+ spans = {s.name: s for s in exporter.get_finished_spans()}
+ listener_span = spans["listener.on_starting"]
+ assert listener_span.status.status_code == StatusCode.ERROR
+ assert any(ev.name == "exception" for ev in listener_span.events)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 313309fa401..df340f25112 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1022,8 +1022,7 @@ def startup(msg: StartupDetails) ->
tuple[RuntimeTaskInstance, Context, Logger]:
)
try:
- with detail_span("hook.on_starting"):
-
get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
+ get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
except Exception:
log.exception("error calling listener")
diff --git a/uv.lock b/uv.lock
index 6fefbf9fc23..9fbd48c147d 100644
--- a/uv.lock
+++ b/uv.lock
@@ -8311,6 +8311,7 @@ name = "apache-airflow-shared-listeners"
version = "0.0"
source = { editable = "shared/listeners" }
dependencies = [
+ { name = "opentelemetry-api" },
{ name = "pluggy" },
{ name = "structlog" },
]
@@ -8318,6 +8319,8 @@ dependencies = [
[package.dev-dependencies]
dev = [
{ name = "apache-airflow-devel-common" },
+ { name = "apache-airflow-shared-observability" },
+ { name = "opentelemetry-sdk" },
]
mypy = [
{ name = "apache-airflow-devel-common", extra = ["mypy"] },
@@ -8325,12 +8328,17 @@ mypy = [
[package.metadata]
requires-dist = [
+ { name = "opentelemetry-api", specifier = ">=1.27.0" },
{ name = "pluggy", specifier = ">=1.5.0" },
{ name = "structlog", specifier = ">=25.4.0" },
]
[package.metadata.requires-dev]
-dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }]
+dev = [
+ { name = "apache-airflow-devel-common", editable = "devel-common" },
+ { name = "apache-airflow-shared-observability", editable =
"shared/observability" },
+ { name = "opentelemetry-sdk", specifier = ">=1.27.0" },
+]
mypy = [{ name = "apache-airflow-devel-common", extras = ["mypy"], editable =
"devel-common" }]
[[package]]