This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 19d6c5a0007 Emit dagrun.first_task_start_delay separate from scheduling_delay (#66807)
19d6c5a0007 is described below
commit 19d6c5a0007d6e8d53a4bf354718b754182a51e2
Author: Stefan Wang <[email protected]>
AuthorDate: Tue May 12 14:59:57 2026 -0700
Emit dagrun.first_task_start_delay separate from scheduling_delay (#66807)
* feat(metrics): emit dagrun.first_task_start_delay separate from scheduling_delay
`dagrun.first_task_scheduling_delay` measures `data_interval_end -> first_start_date`,
which conflates scheduler-enqueue latency with executor-pickup latency. When a
Dag run's first task starts late, operators can't tell from this single timer
whether the scheduler was slow to queue the task or the executor was slow to
pick it up.
Add `dagrun.first_task_start_delay`, computed as `first_start_date - queued_at`
and tagged by `dag_id` and `run_type` to match the existing tag shape. It is
emitted next to the existing scheduling-delay metric on Dag run completion, and
only when `queued_at` is set and the delta is positive.
Closes #66802
Signed-off-by: 1fanwang <[email protected]>
* Rename newsfragment to match PR number
Signed-off-by: 1fanwang <[email protected]>
---------
Signed-off-by: 1fanwang <[email protected]>
---
airflow-core/newsfragments/66807.feature.rst | 1 +
airflow-core/src/airflow/models/dagrun.py | 4 ++
airflow-core/tests/unit/models/test_dagrun.py | 80 ++++++++++++++++++++++
.../observability/metrics/metrics_template.yaml | 6 ++
4 files changed, 91 insertions(+)
diff --git a/airflow-core/newsfragments/66807.feature.rst
b/airflow-core/newsfragments/66807.feature.rst
new file mode 100644
index 00000000000..623aecef3d2
--- /dev/null
+++ b/airflow-core/newsfragments/66807.feature.rst
@@ -0,0 +1 @@
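+Add ``dagrun.first_task_start_delay`` timer that measures the elapsed time between a Dag run's ``queued_at`` and the start time of its first task; it is emitted only when ``queued_at`` is set and the delay is positive. Because ``dagrun.first_task_scheduling_delay`` reports scheduler-enqueue and executor-pickup latency combined, the new timer lets operators separate executor-pickup latency from scheduler-enqueue latency.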
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 1851cdf4c47..ca2b68f34be 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1594,6 +1594,10 @@ class DagRun(Base, LoggingMixin):
f"dagrun.{dag.dag_id}.first_task_scheduling_delay",
true_delay, tags=self.stats_tags
)
stats.timing("dagrun.first_task_scheduling_delay",
true_delay, tags=self.stats_tags)
+ if self.queued_at is not None:
+ start_delay = first_start_date - self.queued_at
+ if start_delay.total_seconds() > 0:
+ stats.timing("dagrun.first_task_start_delay",
start_delay, tags=self.stats_tags)
except Exception:
self.log.warning("Failed to record first_task_scheduling_delay
metric:", exc_info=True)
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index a0636ed3236..a9f9dfb136c 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -1148,6 +1148,86 @@ class TestDagRun:
session.rollback()
session.close()
+    @pytest.mark.parametrize(
+        ("queued_at_offset", "expected"),
+        [
+            (datetime.timedelta(seconds=30), True),
+            (datetime.timedelta(0), False),  # falsy offset: queued_at == first start, zero delay
+            (datetime.timedelta(seconds=-30), False),  # queued_at after first start: negative delay
+            (None, False),
+        ],
+    )
+    def test_emit_first_task_start_delay(self, session, queued_at_offset, expected, testing_dag_bundle):
+        """
+        Tests that `dagrun.first_task_start_delay` measures `queued_at -> first_start_date` and is
+        emitted exactly once when `queued_at` is set and precedes the first task start, never otherwise.
+        """
+        dag = DAG(
+            dag_id="test_emit_first_task_start_delay",
+            start_date=DEFAULT_DATE,
+            schedule="*/5 * * * *",
+        )
+        dag_task = EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
+        expected_stat_tags = {"dag_id": dag.dag_id, "run_type": DagRunType.SCHEDULED}
+        scheduler_dag = sync_dag_to_db(dag, session=session)
+        try:
+            info = scheduler_dag.next_dagrun_info(last_automated_run_info=None)
+            orm_dag_kwargs = {
+                "dag_id": dag.dag_id,
+                "bundle_name": "testing",
+                "has_task_concurrency_limits": False,
+                "is_stale": False,
+            }
+            if info is not None:
+                orm_dag_kwargs.update(
+                    {
+                        "next_dagrun": info.logical_date,
+                        "next_dagrun_data_interval": info.data_interval,
+                        "next_dagrun_create_after": info.run_after,
+                    },
+                )
+            orm_dag = DagModel(**orm_dag_kwargs)
+            session.merge(orm_dag)
+            session.flush()
+
+            ti_start_date = dag.start_date + datetime.timedelta(minutes=1)
+            queued_at = ti_start_date - queued_at_offset if queued_at_offset is not None else None
+
+            dag_run = scheduler_dag.create_dagrun(
+                run_id=scheduler_dag.timetable.generate_run_id(
+                    run_type=DagRunType.SCHEDULED,
+                    run_after=dag.start_date,
+                    data_interval=infer_automated_data_interval(scheduler_dag.timetable, dag.start_date),
+                ),
+                run_type=DagRunType.SCHEDULED,
+                state=DagRunState.SUCCESS,
+                logical_date=dag.start_date,
+                data_interval=infer_automated_data_interval(scheduler_dag.timetable, dag.start_date),
+                run_after=dag.start_date,
+                start_date=dag.start_date,
+                triggered_by=DagRunTriggeredByType.TEST,
+                session=session,
+            )
+            dag_run.queued_at = queued_at
+            ti = dag_run.get_task_instance(dag_task.task_id, session)
+            ti.set_state(TaskInstanceState.SUCCESS, session)
+            ti.start_date = ti_start_date
+            session.flush()
+
+            with mock.patch("airflow._shared.observability.metrics.stats.timing") as stats_mock:
+                dag_run.update_state(session)
+
+            start_delay_calls = [
+                c for c in stats_mock.mock_calls if c.args and c.args[0] == "dagrun.first_task_start_delay"
+            ]
+            assert len(start_delay_calls) == (1 if expected else 0)
+            if expected:
+                assert start_delay_calls[0].args[1] == ti_start_date - queued_at
+                assert start_delay_calls[0].kwargs == {"tags": expected_stat_tags}
+        finally:
+            session.rollback()
+            session.close()
+
def test_states_sets(self, dag_maker, session):
"""
Tests that adding State.failed_states and State.success_states work as
expected.
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index f7ddd2b1243..fa6111ef54a 100644
---
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -617,6 +617,12 @@ metrics:
legacy_name: "dagrun.{dag_id}.first_task_scheduling_delay"
name_variables: ["dag_id"]
+ - name: "dagrun.first_task_start_delay"
+ description: "Milliseconds elapsed between dagrun queued_at and first task start_date"
+ type: "timer"
+ legacy_name: "-"
+ name_variables: []
+
- name: "collect_db_dags"
description: "Milliseconds taken for fetching all Serialized Dags from DB"
type: "timer"