This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aee4240161d Forward run-identity attributes to the trace sampler (#68592)
aee4240161d is described below
commit aee4240161df4e1b6bf04e35a2dd8c468c0c344c
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jun 23 13:24:10 2026 -0700
Forward run-identity attributes to the trace sampler (#68592)
Builds on the head-sampling foundation (#68591) by forwarding run-identity
attributes (dag_id, run_type, run_id) to the trace sampler, so a custom
sampler can differentiate the head-sampling decision by run kind.
---
airflow-core/src/airflow/models/dagrun.py | 28 +++++++++++++--
airflow-core/src/airflow/models/taskinstance.py | 5 +--
airflow-core/tests/unit/models/test_dagrun.py | 15 ++++++++
.../observability/traces/__init__.py | 8 ++++-
.../tests/observability/test_traces.py | 40 ++++++++++++++++++++++
5 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index cb347e17f6d..f5ae6a28d58 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -154,6 +154,28 @@ def _creator_note(val):
return DagRunNote(*val)
+def dagrun_trace_attributes(dr) -> dict[str, str]:
+ """
+ Run-identity attributes for a DAG run's trace.
+
+ Defined once and used in two places: forwarded to the sampler at carrier
+ creation (so a custom sampler can differentiate the head-sampling decision by
+ run kind) and set on the emitted ``dag_run`` span. ``getattr`` guards because
+ the values may not be populated yet at ``__init__``/clear time.
+ """
+ attributes: dict[str, str] = {}
+ dag_id = getattr(dr, "dag_id", None)
+ if dag_id is not None:
+ attributes["airflow.dag_id"] = str(dag_id)
+ run_id = getattr(dr, "run_id", None)
+ if run_id is not None:
+ attributes["airflow.dag_run.run_id"] = str(run_id)
+ run_type = getattr(dr, "run_type", None)
+ if run_type is not None:
+ attributes["airflow.dag_run.run_type"] = str(run_type)
+ return attributes
+
+
class DagRun(Base, LoggingMixin):
"""
Invocation instance of a DAG.
@@ -385,7 +407,8 @@ class DagRun(Base, LoggingMixin):
self.triggering_user_name = triggering_user_name
self.scheduled_by_job_id = None
self.context_carrier: dict[str, str] = new_dagrun_trace_carrier(
- task_span_detail_level=self.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY,
None)
+ task_span_detail_level=self.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY,
None),
+ attributes=dagrun_trace_attributes(self), # these are for potential use by head sampler
)
if not isinstance(partition_key, str | None):
@@ -1087,8 +1110,7 @@ class DagRun(Base, LoggingMixin):
with override_ids(span_context.trace_id, span_context.span_id):
attributes: dict[str, str] = {
- "airflow.dag_id": str(self.dag_id),
- "airflow.dag_run.run_id": self.run_id,
+ **dagrun_trace_attributes(self),
}
if self.start_date:
attributes["airflow.dag_run.start_date"] = str(self.start_date)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index b39c9f3b322..f905e4c8e3d 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -407,7 +407,7 @@ def clear_task_instances(
session.merge(ti)
if dag_run_state is not False and tis:
- from airflow.models.dagrun import DagRun # Avoid circular import
+ from airflow.models.dagrun import DagRun, dagrun_trace_attributes # Avoid circular import
run_ids_by_dag_id = defaultdict(set)
for instance in tis:
@@ -429,7 +429,8 @@ def clear_task_instances(
dr.clear_number += 1
dr.queued_at = timezone.utcnow()
dr.context_carrier = new_dagrun_trace_carrier(
- task_span_detail_level=dr.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY)
if dr.conf else None
+ task_span_detail_level=dr.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY)
if dr.conf else None,
+ attributes=dagrun_trace_attributes(dr),
)
_recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 290e616dd99..a2e852f5834 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -4307,10 +4307,25 @@ class TestDagRunTracing:
assert span.name == f"dag_run.{dr.dag_id}"
assert span.attributes["airflow.dag_id"] == dr.dag_id
assert span.attributes["airflow.dag_run.run_id"] == dr.run_id
+ # run_type is set via the shared dagrun_trace_attributes helper
+ assert span.attributes["airflow.dag_run.run_type"] == str(dr.run_type)
expected_status = StatusCode.OK if final_state == DagRunState.SUCCESS
else StatusCode.ERROR
assert span.status.status_code == expected_status
+ def test_dagrun_trace_attributes_helper(self, dag_maker, session):
+ """The shared helper returns dag_id, run_id and run_type as airflow.*
attributes."""
+ from airflow.models.dagrun import dagrun_trace_attributes
+
+ with dag_maker("test_trace_attrs_helper", session=session):
+ EmptyOperator(task_id="t1")
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+ attrs = dagrun_trace_attributes(dr)
+ assert attrs["airflow.dag_id"] == dr.dag_id
+ assert attrs["airflow.dag_run.run_id"] == dr.run_id
+ assert attrs["airflow.dag_run.run_type"] == str(dr.run_type)
+
@pytest.mark.parametrize("carrier_value", [None, {}])
def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker,
session, carrier_value):
"""_emit_dagrun_span should emit a root span when context_carrier is
None or empty.
diff --git
a/shared/observability/src/airflow_shared/observability/traces/__init__.py
b/shared/observability/src/airflow_shared/observability/traces/__init__.py
index 6037162232e..ec2ee6e2a0c 100644
--- a/shared/observability/src/airflow_shared/observability/traces/__init__.py
+++ b/shared/observability/src/airflow_shared/observability/traces/__init__.py
@@ -61,7 +61,7 @@ TASK_SPAN_DETAIL_LEVEL_KEY = "airflow/task_span_detail_level"
DEFAULT_TASK_SPAN_DETAIL_LEVEL = 1
-def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
+def new_dagrun_trace_carrier(task_span_detail_level=None, attributes=None) ->
dict[str, str]:
"""
Generate a fresh W3C traceparent carrier without creating a recordable
span.
@@ -70,6 +70,11 @@ def new_dagrun_trace_carrier(task_span_detail_level=None) ->
dict[str, str]:
``OTEL_TRACES_SAMPLER_ARG``), rather than being hardcoded. This makes the
carrier the single head-sampling decision point for a DAG run: every
downstream span (dag_run, task_run, worker) rides on this flag.
+
+ ``attributes`` are forwarded to the sampler as ``should_sample`` attributes so
+ a custom sampler can differentiate the decision by run kind (e.g. by
+ ``airflow.dag_id`` / ``airflow.dag_run.run_type``). The built-in samplers ignore them.
+ They are decision input only -- they are not persisted in the carrier.
"""
gen = RandomIdGenerator()
trace_id = gen.generate_trace_id()
@@ -81,6 +86,7 @@ def new_dagrun_trace_carrier(task_span_detail_level=None) ->
dict[str, str]:
parent_context=None, # root decision
trace_id=trace_id,
name="dag_run",
+ attributes=attributes or {},
)
sampled = result.decision == Decision.RECORD_AND_SAMPLE
sampler_trace_state = result.trace_state
diff --git a/shared/observability/tests/observability/test_traces.py
b/shared/observability/tests/observability/test_traces.py
index 088a54e6248..ba09ad59fe0 100644
--- a/shared/observability/tests/observability/test_traces.py
+++ b/shared/observability/tests/observability/test_traces.py
@@ -170,6 +170,46 @@ class TestNewDagrunTraceCarrierSampling:
assert get_task_span_detail_level(span) == 2
assert _carrier_is_sampled(carrier) is False
+ def test_attributes_forwarded_to_sampler(self, monkeypatch):
+ """The attributes arg is forwarded to should_sample so a custom
sampler can use it."""
+ captured = {}
+
+ class _RecordingSampler:
+ def should_sample(self, parent_context, trace_id, name,
attributes=None, **kwargs):
+ captured["attributes"] = attributes
+ return ALWAYS_ON.should_sample(parent_context, trace_id, name,
attributes=attributes)
+
+ class _Provider:
+ sampler = _RecordingSampler()
+
+ monkeypatch.setattr(
+ "airflow_shared.observability.traces.trace.get_tracer_provider",
+ lambda: _Provider(),
+ )
+ new_dagrun_trace_carrier(
+ attributes={"airflow.dag_id": "my_dag",
"airflow.dag_run.run_type": "manual"}
+ )
+ assert captured["attributes"] == {"airflow.dag_id": "my_dag",
"airflow.dag_run.run_type": "manual"}
+
+ def test_attributes_default_to_empty_dict(self, monkeypatch):
+ """When no attributes are passed, the sampler receives an empty dict,
not None."""
+ captured = {}
+
+ class _RecordingSampler:
+ def should_sample(self, parent_context, trace_id, name,
attributes=None, **kwargs):
+ captured["attributes"] = attributes
+ return ALWAYS_ON.should_sample(parent_context, trace_id, name,
attributes=attributes)
+
+ class _Provider:
+ sampler = _RecordingSampler()
+
+ monkeypatch.setattr(
+ "airflow_shared.observability.traces.trace.get_tracer_provider",
+ lambda: _Provider(),
+ )
+ new_dagrun_trace_carrier()
+ assert captured["attributes"] == {}
+
class TestGetTaskSpanDetailLevel:
def _make_span_with_trace_state(self, entries: list[tuple[str, str]]) ->
NonRecordingSpan: