This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4af2c27656f Refactor DagRun tracing into `_trace_dagrun` helper method (#44008)
4af2c27656f is described below

commit 4af2c27656fd53e095385451d77800c83857697e
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Nov 14 03:14:48 2024 +0000

    Refactor DagRun tracing into `_trace_dagrun` helper method (#44008)
    
    related: https://github.com/apache/airflow/issues/43789
    
    Changes:
    This commit
    - extracts the tracing logic from the `update_state` method in `DagRun` into a new helper method, `_trace_dagrun`.
    - preserves the native types of some span attributes (e.g. int and bool) instead of converting them to strings, since OTel attribute values can be str, bool, int, float, Sequence[str], Sequence[bool], Sequence[int], or Sequence[float].
---
 airflow/jobs/scheduler_job_runner.py |  1 -
 airflow/models/dagrun.py             | 59 ++++++++++++++++++------------------
 2 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 65ef7c828cc..7a82fb2c9b2 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -859,7 +859,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
     @classmethod
     def _set_span_attrs__process_executor_events(cls, span, state, ti):
-        # Use span.set_attributes
         span.set_attributes(
             {
                 "category": "scheduler",
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 635cd73ccd8..ccc4832f5fb 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1020,35 +1020,7 @@ class DagRun(Base, LoggingMixin):
                 dagv.version if dagv else None,
             )
 
-            with Trace.start_span_from_dagrun(dagrun=self) as span:
-                if self._state is DagRunState.FAILED:
-                    span.set_attribute("error", True)
-                attributes = {
-                    "category": "DAG runs",
-                    "dag_id": str(self.dag_id),
-                    "execution_date": str(self.execution_date),
-                    "run_id": str(self.run_id),
-                    "queued_at": str(self.queued_at),
-                    "run_start_date": str(self.start_date),
-                    "run_end_date": str(self.end_date),
-                    "run_duration": str(
-                        (self.end_date - self.start_date).total_seconds()
-                        if self.start_date and self.end_date
-                        else 0
-                    ),
-                    "state": str(self._state),
-                    "external_trigger": str(self.external_trigger),
-                    "run_type": str(self.run_type),
-                    "data_interval_start": str(self.data_interval_start),
-                    "data_interval_end": str(self.data_interval_end),
-                    "dag_version": str(dagv.version if dagv else None),
-                    "conf": str(self.conf),
-                }
-                if span.is_recording():
-                    span.add_event(name="queued", 
timestamp=datetime_to_nano(self.queued_at))
-                    span.add_event(name="started", 
timestamp=datetime_to_nano(self.start_date))
-                    span.add_event(name="ended", 
timestamp=datetime_to_nano(self.end_date))
-                span.set_attributes(attributes)
+            self._trace_dagrun(dagv)
 
             session.flush()
 
@@ -1060,6 +1032,35 @@ class DagRun(Base, LoggingMixin):
 
         return schedulable_tis, callback
 
+    def _trace_dagrun(self, dagv) -> None:
+        with Trace.start_span_from_dagrun(dagrun=self) as span:
+            if self._state == DagRunState.FAILED:
+                span.set_attribute("error", True)
+            attributes = {
+                "category": "DAG runs",
+                "dag_id": self.dag_id,
+                "execution_date": str(self.execution_date),
+                "run_id": self.run_id,
+                "queued_at": str(self.queued_at),
+                "run_start_date": str(self.start_date),
+                "run_end_date": str(self.end_date),
+                "run_duration": (self.end_date - 
self.start_date).total_seconds()
+                if self.start_date and self.end_date
+                else 0,
+                "state": str(self._state),
+                "external_trigger": self.external_trigger,
+                "run_type": str(self.run_type),
+                "data_interval_start": str(self.data_interval_start),
+                "data_interval_end": str(self.data_interval_end),
+                "dag_version": str(dagv.version if dagv else None),
+                "conf": str(self.conf),
+            }
+            if span.is_recording():
+                span.add_event(name="queued", 
timestamp=datetime_to_nano(self.queued_at))
+                span.add_event(name="started", 
timestamp=datetime_to_nano(self.start_date))
+                span.add_event(name="ended", 
timestamp=datetime_to_nano(self.end_date))
+            span.set_attributes(attributes)
+
     @provide_session
     def task_instance_scheduling_decisions(self, session: Session = 
NEW_SESSION) -> TISchedulingDecision:
         tis = self.get_task_instances(session=session, state=State.task_states)

Reply via email to