This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2956c98ecb6 feat(metrics): wrap executor.heartbeat() in a timer to
localize loop slowdowns (#66808)
2956c98ecb6 is described below
commit 2956c98ecb6220ec7aad473af84ff724dd934bf9
Author: Stefan Wang <[email protected]>
AuthorDate: Tue May 12 14:58:53 2026 -0700
feat(metrics): wrap executor.heartbeat() in a timer to localize loop
slowdowns (#66808)
Emit scheduler.executor_heartbeat_duration as a per-executor timer so
operators can see whether executor.heartbeat() is the bottleneck of a
slow scheduler loop, instead of having to infer it from the aggregate
scheduler.scheduler_loop_duration metric.
The timer is tagged with the executor's class name (type(executor).__name__),
so multi-executor deployments can attribute the cost to each configured
executor separately.
Closes #66803
Signed-off-by: 1fanwang <[email protected]>
---
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 6 +++++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 16 ++++++++++++++++
.../observability/metrics/metrics_template.yaml | 7 +++++++
3 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 3eed95a8bb0..1a3f55b7f6f 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1665,7 +1665,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# either a no-op, or they will check-in on currently running
tasks and send out new
# events to be processed below.
for executor in self.executors:
- executor.heartbeat()
+ with stats.timer(
+ "scheduler.executor_heartbeat_duration",
+ tags={"executor": type(executor).__name__},
+ ):
+ executor.heartbeat()
with create_session() as session:
num_finished_events = 0
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 56e69459104..eb763635ddf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -1181,6 +1181,22 @@ class TestSchedulerJob:
for executor in self.job_runner.executors:
executor.heartbeat.assert_called_once()
+ def test_executor_heartbeat_emits_timer(self, mock_executors,
configure_testing_dag_bundle):
+ with configure_testing_dag_bundle(os.devnull):
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1)
+ with patch("airflow.jobs.scheduler_job_runner.stats.timer") as
mock_timer:
+ self.job_runner._execute()
+
+ heartbeat_calls = [
+ timer_call
+ for timer_call in mock_timer.call_args_list
+ if timer_call.args and timer_call.args[0] ==
"scheduler.executor_heartbeat_duration"
+ ]
+ assert len(heartbeat_calls) == len(self.job_runner.executors)
+ for executor, timer_call in zip(self.job_runner.executors,
heartbeat_calls):
+ assert timer_call.kwargs.get("tags") == {"executor":
type(executor).__name__}
+
def test_executor_events_processed(self, mock_executors,
configure_testing_dag_bundle):
with configure_testing_dag_bundle(os.devnull):
scheduler_job = Job()
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index f4c48834927..f7ddd2b1243 100644
---
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -604,6 +604,13 @@ metrics:
legacy_name: "-"
name_variables: []
+ - name: "scheduler.executor_heartbeat_duration"
+ description: "Milliseconds spent in ``executor.heartbeat()`` per scheduler
loop iteration, tagged
+ by executor class name so each configured executor is reported
separately."
+ type: "timer"
+ legacy_name: "-"
+ name_variables: []
+
- name: "dagrun.first_task_scheduling_delay"
description: "Milliseconds elapsed between first task start_date and
dagrun expected start"
type: "timer"