This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4af2c27656f Refactor DagRun tracing into `_trace_dagrun` helper method (#44008)
4af2c27656f is described below
commit 4af2c27656fd53e095385451d77800c83857697e
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Nov 14 03:14:48 2024 +0000
Refactor DagRun tracing into `_trace_dagrun` helper method (#44008)
related: https://github.com/apache/airflow/issues/43789
Changes:
This commit:
- extracts the tracing logic from the `update_state` method in `DagRun` into
a new helper method, `_trace_dagrun`.
- preserves native attribute types in some cases instead of converting every
value to str (e.g. `external_trigger` stays a bool and `run_duration` a number).
OTel attribute values can be str, bool, int, float, Sequence[str],
Sequence[bool], Sequence[int], or Sequence[float].
---
airflow/jobs/scheduler_job_runner.py | 1 -
airflow/models/dagrun.py | 59 ++++++++++++++++++------------------
2 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 65ef7c828cc..7a82fb2c9b2 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -859,7 +859,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
@classmethod
def _set_span_attrs__process_executor_events(cls, span, state, ti):
- # Use span.set_attributes
span.set_attributes(
{
"category": "scheduler",
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 635cd73ccd8..ccc4832f5fb 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1020,35 +1020,7 @@ class DagRun(Base, LoggingMixin):
dagv.version if dagv else None,
)
- with Trace.start_span_from_dagrun(dagrun=self) as span:
- if self._state is DagRunState.FAILED:
- span.set_attribute("error", True)
- attributes = {
- "category": "DAG runs",
- "dag_id": str(self.dag_id),
- "execution_date": str(self.execution_date),
- "run_id": str(self.run_id),
- "queued_at": str(self.queued_at),
- "run_start_date": str(self.start_date),
- "run_end_date": str(self.end_date),
- "run_duration": str(
- (self.end_date - self.start_date).total_seconds()
- if self.start_date and self.end_date
- else 0
- ),
- "state": str(self._state),
- "external_trigger": str(self.external_trigger),
- "run_type": str(self.run_type),
- "data_interval_start": str(self.data_interval_start),
- "data_interval_end": str(self.data_interval_end),
- "dag_version": str(dagv.version if dagv else None),
- "conf": str(self.conf),
- }
- if span.is_recording():
- span.add_event(name="queued",
timestamp=datetime_to_nano(self.queued_at))
- span.add_event(name="started",
timestamp=datetime_to_nano(self.start_date))
- span.add_event(name="ended",
timestamp=datetime_to_nano(self.end_date))
- span.set_attributes(attributes)
+ self._trace_dagrun(dagv)
session.flush()
@@ -1060,6 +1032,35 @@ class DagRun(Base, LoggingMixin):
return schedulable_tis, callback
+ def _trace_dagrun(self, dagv) -> None:
+ with Trace.start_span_from_dagrun(dagrun=self) as span:
+ if self._state == DagRunState.FAILED:
+ span.set_attribute("error", True)
+ attributes = {
+ "category": "DAG runs",
+ "dag_id": self.dag_id,
+ "execution_date": str(self.execution_date),
+ "run_id": self.run_id,
+ "queued_at": str(self.queued_at),
+ "run_start_date": str(self.start_date),
+ "run_end_date": str(self.end_date),
+ "run_duration": (self.end_date -
self.start_date).total_seconds()
+ if self.start_date and self.end_date
+ else 0,
+ "state": str(self._state),
+ "external_trigger": self.external_trigger,
+ "run_type": str(self.run_type),
+ "data_interval_start": str(self.data_interval_start),
+ "data_interval_end": str(self.data_interval_end),
+ "dag_version": str(dagv.version if dagv else None),
+ "conf": str(self.conf),
+ }
+ if span.is_recording():
+ span.add_event(name="queued",
timestamp=datetime_to_nano(self.queued_at))
+ span.add_event(name="started",
timestamp=datetime_to_nano(self.start_date))
+ span.add_event(name="ended",
timestamp=datetime_to_nano(self.end_date))
+ span.set_attributes(attributes)
+
@provide_session
def task_instance_scheduling_decisions(self, session: Session =
NEW_SESSION) -> TISchedulingDecision:
tis = self.get_task_instances(session=session, state=State.task_states)