This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aee4240161d Forward run-identity attributes to the trace sampler (#68592)
aee4240161d is described below

commit aee4240161df4e1b6bf04e35a2dd8c468c0c344c
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jun 23 13:24:10 2026 -0700

    Forward run-identity attributes to the trace sampler (#68592)
    
    Builds on the head-sampling foundation (#68591) by forwarding run-identity
    attributes (dag_id, run_type, run_id) to the trace sampler, so a custom
    sampler can differentiate the head-sampling decision by run kind.
---
 airflow-core/src/airflow/models/dagrun.py          | 28 +++++++++++++--
 airflow-core/src/airflow/models/taskinstance.py    |  5 +--
 airflow-core/tests/unit/models/test_dagrun.py      | 15 ++++++++
 .../observability/traces/__init__.py               |  8 ++++-
 .../tests/observability/test_traces.py             | 40 ++++++++++++++++++++++
 5 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index cb347e17f6d..f5ae6a28d58 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -154,6 +154,28 @@ def _creator_note(val):
     return DagRunNote(*val)
 
 
+def dagrun_trace_attributes(dr) -> dict[str, str]:
+    """
+    Return the run-identity ``airflow.*`` attributes for a DAG run's trace.
+
+    Defined once and used in two places: forwarded to the sampler at carrier
+    creation (so a custom sampler can differentiate the head-sampling decision by
+    run kind) and set on the emitted ``dag_run`` span. ``getattr`` guards because
+    the values may not be populated yet at ``__init__``/clear time; ``None`` values are omitted.
+    """
+    attributes: dict[str, str] = {}
+    dag_id = getattr(dr, "dag_id", None)
+    if dag_id is not None:
+        attributes["airflow.dag_id"] = str(dag_id)
+    run_id = getattr(dr, "run_id", None)
+    if run_id is not None:
+        attributes["airflow.dag_run.run_id"] = str(run_id)
+    run_type = getattr(dr, "run_type", None)
+    if run_type is not None:
+        attributes["airflow.dag_run.run_type"] = str(run_type)
+    return attributes
+
+
 class DagRun(Base, LoggingMixin):
     """
     Invocation instance of a DAG.
@@ -385,7 +407,8 @@ class DagRun(Base, LoggingMixin):
         self.triggering_user_name = triggering_user_name
         self.scheduled_by_job_id = None
         self.context_carrier: dict[str, str] = new_dagrun_trace_carrier(
-            task_span_detail_level=self.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY, 
None)
+            task_span_detail_level=self.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY, 
None),
+            attributes=dagrun_trace_attributes(self),  # these are for 
potential use by head sampler
         )
 
         if not isinstance(partition_key, str | None):
@@ -1087,8 +1110,7 @@ class DagRun(Base, LoggingMixin):
 
         with override_ids(span_context.trace_id, span_context.span_id):
             attributes: dict[str, str] = {
-                "airflow.dag_id": str(self.dag_id),
-                "airflow.dag_run.run_id": self.run_id,
+                **dagrun_trace_attributes(self),
             }
             if self.start_date:
                 attributes["airflow.dag_run.start_date"] = str(self.start_date)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index b39c9f3b322..f905e4c8e3d 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -407,7 +407,7 @@ def clear_task_instances(
             session.merge(ti)
 
     if dag_run_state is not False and tis:
-        from airflow.models.dagrun import DagRun  # Avoid circular import
+        from airflow.models.dagrun import DagRun, dagrun_trace_attributes  # 
Avoid circular import
 
         run_ids_by_dag_id = defaultdict(set)
         for instance in tis:
@@ -429,7 +429,8 @@ def clear_task_instances(
             dr.clear_number += 1
             dr.queued_at = timezone.utcnow()
             dr.context_carrier = new_dagrun_trace_carrier(
-                task_span_detail_level=dr.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY) 
if dr.conf else None
+                task_span_detail_level=dr.conf.get(TASK_SPAN_DETAIL_LEVEL_KEY) 
if dr.conf else None,
+                attributes=dagrun_trace_attributes(dr),
             )
 
             _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 290e616dd99..a2e852f5834 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -4307,10 +4307,25 @@ class TestDagRunTracing:
         assert span.name == f"dag_run.{dr.dag_id}"
         assert span.attributes["airflow.dag_id"] == dr.dag_id
         assert span.attributes["airflow.dag_run.run_id"] == dr.run_id
+        # run_type is set via the shared dagrun_trace_attributes helper
+        assert span.attributes["airflow.dag_run.run_type"] == str(dr.run_type)
 
         expected_status = StatusCode.OK if final_state == DagRunState.SUCCESS 
else StatusCode.ERROR
         assert span.status.status_code == expected_status
 
+    def test_dagrun_trace_attributes_helper(self, dag_maker, session):
+        """The shared helper returns dag_id, run_id and run_type under their airflow.* attribute keys."""
+        from airflow.models.dagrun import dagrun_trace_attributes
+
+        with dag_maker("test_trace_attrs_helper", session=session):
+            EmptyOperator(task_id="t1")
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        attrs = dagrun_trace_attributes(dr)
+        assert attrs["airflow.dag_id"] == dr.dag_id
+        assert attrs["airflow.dag_run.run_id"] == dr.run_id
+        assert attrs["airflow.dag_run.run_type"] == str(dr.run_type)
+
     @pytest.mark.parametrize("carrier_value", [None, {}])
     def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, 
session, carrier_value):
         """_emit_dagrun_span should emit a root span when context_carrier is 
None or empty.
diff --git a/shared/observability/src/airflow_shared/observability/traces/__init__.py b/shared/observability/src/airflow_shared/observability/traces/__init__.py
index 6037162232e..ec2ee6e2a0c 100644
--- a/shared/observability/src/airflow_shared/observability/traces/__init__.py
+++ b/shared/observability/src/airflow_shared/observability/traces/__init__.py
@@ -61,7 +61,7 @@ TASK_SPAN_DETAIL_LEVEL_KEY = "airflow/task_span_detail_level"
 DEFAULT_TASK_SPAN_DETAIL_LEVEL = 1
 
 
-def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
+def new_dagrun_trace_carrier(task_span_detail_level=None, attributes=None) -> 
dict[str, str]:
     """
     Generate a fresh W3C traceparent carrier without creating a recordable 
span.
 
@@ -70,6 +70,11 @@ def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
     ``OTEL_TRACES_SAMPLER_ARG``), rather than being hardcoded. This makes the
     carrier the single head-sampling decision point for a DAG run: every
     downstream span (dag_run, task_run, worker) rides on this flag.
+
+    ``attributes`` are forwarded to the sampler as ``should_sample`` 
attributes so
+    a custom sampler can differentiate the decision by run kind (e.g. by
+    ``airflow.dag_id`` / ``airflow.dag_run.run_type``). The built-in samplers 
ignore them.
+    They are decision input only -- they are not persisted in the carrier.
     """
     gen = RandomIdGenerator()
     trace_id = gen.generate_trace_id()
@@ -81,6 +86,7 @@ def new_dagrun_trace_carrier(task_span_detail_level=None) -> dict[str, str]:
             parent_context=None,  # root decision
             trace_id=trace_id,
             name="dag_run",
+            attributes=attributes or {},
         )
         sampled = result.decision == Decision.RECORD_AND_SAMPLE
         sampler_trace_state = result.trace_state
diff --git a/shared/observability/tests/observability/test_traces.py b/shared/observability/tests/observability/test_traces.py
index 088a54e6248..ba09ad59fe0 100644
--- a/shared/observability/tests/observability/test_traces.py
+++ b/shared/observability/tests/observability/test_traces.py
@@ -170,6 +170,46 @@ class TestNewDagrunTraceCarrierSampling:
         assert get_task_span_detail_level(span) == 2
         assert _carrier_is_sampled(carrier) is False
 
+    def test_attributes_forwarded_to_sampler(self, monkeypatch):
+        """The attributes arg is forwarded to should_sample so a custom sampler can use it."""
+        captured = {}  # filled in by the stub sampler's should_sample call
+
+        class _RecordingSampler:
+            def should_sample(self, parent_context, trace_id, name, attributes=None, **kwargs):
+                captured["attributes"] = attributes
+                return ALWAYS_ON.should_sample(parent_context, trace_id, name, attributes=attributes)
+
+        class _Provider:  # stand-in for trace.get_tracer_provider(); only .sampler is provided
+            sampler = _RecordingSampler()
+
+        monkeypatch.setattr(
+            "airflow_shared.observability.traces.trace.get_tracer_provider",
+            lambda: _Provider(),
+        )
+        new_dagrun_trace_carrier(
+            attributes={"airflow.dag_id": "my_dag", "airflow.dag_run.run_type": "manual"}
+        )
+        assert captured["attributes"] == {"airflow.dag_id": "my_dag", "airflow.dag_run.run_type": "manual"}
+
+    def test_attributes_default_to_empty_dict(self, monkeypatch):
+        """When no attributes are passed, the sampler receives an empty dict, not None."""
+        captured = {}  # filled in by the stub sampler's should_sample call
+
+        class _RecordingSampler:
+            def should_sample(self, parent_context, trace_id, name, attributes=None, **kwargs):
+                captured["attributes"] = attributes
+                return ALWAYS_ON.should_sample(parent_context, trace_id, name, attributes=attributes)
+
+        class _Provider:  # stand-in for trace.get_tracer_provider(); only .sampler is provided
+            sampler = _RecordingSampler()
+
+        monkeypatch.setattr(
+            "airflow_shared.observability.traces.trace.get_tracer_provider",
+            lambda: _Provider(),
+        )
+        new_dagrun_trace_carrier()
+        assert captured["attributes"] == {}
+
 
 class TestGetTaskSpanDetailLevel:
     def _make_span_with_trace_state(self, entries: list[tuple[str, str]]) -> 
NonRecordingSpan:

Reply via email to