This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f6bd8b48d29 Add team_name tag to scheduler metrics for multi-team deployments (#68594)
f6bd8b48d29 is described below

commit f6bd8b48d296f95d44f819714514cfdb0c967c44
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 16 11:31:42 2026 -0700

    Add team_name tag to scheduler metrics for multi-team deployments (#68594)
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  63 +++++++++++--
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 100 +++++++++++++++++++++
 2 files changed, 157 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 813f782437a..ec5b3248178 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -117,6 +117,7 @@ from airflow.timetables.base import Timetable, 
compute_rollup_fingerprint
 from airflow.timetables.simple import AssetTriggeredTimetable
 from airflow.triggers.base import TriggerEvent
 from airflow.utils.event_scheduler import EventScheduler
+from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, 
run_with_db_retries
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
@@ -1437,9 +1438,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
 
             if ti_queued and not ti_requeued:
+                team_name = (
+                    DagModel.get_team_name(ti.dag_id, session=session)
+                    if conf.getboolean("core", "multi_team")
+                    else None
+                )
                 stats.incr(
                     "scheduler.tasks.killed_externally",
-                    tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+                    tags=prune_dict({"dag_id": ti.dag_id, "task_id": 
ti.task_id, "team_name": team_name}),
                 )
                 msg = (
                     "Executor %s reported that the task instance %s finished 
with state %s, but the task instance's state attribute is %s. "  # noqa: 
RUF100, UP031, flynt
@@ -2671,7 +2677,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 stats.timing(
                     "dagrun.schedule_delay",
                     schedule_delay,
-                    tags={"dag_id": dag.dag_id},
+                    tags=prune_dict(
+                        {
+                            "dag_id": dag.dag_id,
+                            "team_name": 
self._get_team_names_for_dag_ids([dag.dag_id], session).get(
+                                dag.dag_id
+                            )
+                            if self._multi_team
+                            else None,
+                        }
+                    ),
                 )
 
         # cache saves time during scheduling of many dag_runs for same dag
@@ -2822,7 +2837,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 stats.timing(
                     "dagrun.duration.failed",
                     duration,
-                    tags={"dag_id": dag_run.dag_id},
+                    tags=prune_dict(
+                        {
+                            "dag_id": dag_run.dag_id,
+                            "team_name": 
self._get_team_names_for_dag_ids([dag_run.dag_id], session).get(
+                                dag_run.dag_id
+                            )
+                            if self._multi_team
+                            else None,
+                        }
+                    ),
                 )
             return callback_to_execute
 
@@ -3106,6 +3130,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         )
         all_states_metric = session.execute(stmt).all()
 
+        if self._multi_team:
+            unique_dag_ids = {row[1] for row in all_states_metric}
+            dag_id_to_team_name = 
self._get_team_names_for_dag_ids(unique_dag_ids, session)
+        else:
+            dag_id_to_team_name = {}
+
         for state in metric_states:
             if state not in self.previous_ti_metrics:
                 self.previous_ti_metrics[state] = {}
@@ -3120,7 +3150,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 stats.gauge(
                     f"ti.{state}",
                     float(count),
-                    tags={"queue": queue, "dag_id": dag_id, "task_id": 
task_id},
+                    tags=prune_dict(
+                        {
+                            "queue": queue,
+                            "dag_id": dag_id,
+                            "task_id": task_id,
+                            "team_name": dag_id_to_team_name.get(dag_id),
+                        }
+                    ),
                 )
 
             for prev_key in self.previous_ti_metrics[state]:
@@ -3130,7 +3167,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     stats.gauge(
                         f"ti.{state}",
                         0,
-                        tags={"queue": queue, "dag_id": dag_id, "task_id": 
task_id},
+                        tags=prune_dict(
+                            {
+                                "queue": queue,
+                                "dag_id": dag_id,
+                                "task_id": task_id,
+                                "team_name": dag_id_to_team_name.get(dag_id),
+                            }
+                        ),
                     )
 
             self.previous_ti_metrics[state] = ti_metrics
@@ -3508,7 +3552,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 continue
             executor.change_state(ti.key, TaskInstanceState.FAILED, 
remove_running=True)
             stats.incr(
-                "task_instances_without_heartbeats_killed", tags={"dag_id": 
ti.dag_id, "task_id": ti.task_id}
+                "task_instances_without_heartbeats_killed",
+                tags=prune_dict(
+                    {
+                        "dag_id": ti.dag_id,
+                        "task_id": ti.task_id,
+                        "team_name": dag_id_to_team_name.get(ti.dag_id),
+                    }
+                ),
             )
 
     # [END find_and_purge_task_instances_without_heartbeats]
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 1a089780633..540bdc46054 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2873,6 +2873,55 @@ class TestSchedulerJob:
         mock_stats.gauge.assert_any_call("pool.queued_slots", mock.ANY, 
tags=expected_tags)
         mock_stats.gauge.assert_any_call("pool.running_slots", mock.ANY, 
tags=expected_tags)
 
+    @pytest.mark.parametrize(
+        ("multi_team", "expected_tags"),
+        [
+            pytest.param(
+                "true",
+                {"queue": "default", "dag_id": "ti_gauge_dag", "task_id": 
"task1", "team_name": "ti_team"},
+                id="with_team",
+            ),
+            pytest.param(
+                "false",
+                {"queue": "default", "dag_id": "ti_gauge_dag", "task_id": 
"task1"},
+                id="without_team",
+            ),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+    def test_emit_ti_metrics_team_name(self, mock_get_backend, multi_team, 
expected_tags, dag_maker, session):
+        """TI gauge metrics include team_name only when multi_team is 
enabled."""
+        mock_stats = mock.MagicMock(spec=StatsLogger)
+        mock_get_backend.return_value = mock_stats
+
+        clear_db_teams()
+
+        team = Team(name="ti_team")
+        session.add(team)
+        session.flush()
+
+        clear_db_dag_bundles()
+
+        bundle = DagBundleModel(name="ti_bundle")
+        bundle.teams.append(team)
+        session.add(bundle)
+        session.flush()
+
+        with dag_maker(dag_id="ti_gauge_dag", bundle_name="ti_bundle", 
session=session):
+            EmptyOperator(task_id="task1")
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instances(session=session)[0]
+        ti.state = State.RUNNING
+        session.flush()
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            scheduler_job = Job()
+            self.job_runner = SchedulerJobRunner(job=scheduler_job)
+            self.job_runner._emit_ti_metrics(session=session)
+
+        mock_stats.gauge.assert_any_call("ti.running", mock.ANY, 
tags=expected_tags)
+
     def test_enqueue_task_instances_with_queued_state(self, dag_maker, 
session):
         dag_id = 
"SchedulerJobTest.test_enqueue_task_instances_with_queued_state"
         task_id_1 = "dummy"
@@ -3537,6 +3586,57 @@ class TestSchedulerJob:
         assert any("Backfilled dag_version_id" in rec.message for rec in 
caplog.records)
         mock_executor.send_callback.assert_called_once()
 
+    @pytest.mark.parametrize(
+        ("multi_team", "expected_tags"),
+        [
+            pytest.param(
+                "true",
+                {"dag_id": "heartbeat_dag", "task_id": "task", "team_name": 
"hb_team"},
+                id="with_team",
+            ),
+            pytest.param(
+                "false",
+                {"dag_id": "heartbeat_dag", "task_id": "task"},
+                id="without_team",
+            ),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats.incr")
+    def test_purge_heartbeat_killed_metric_team_name(
+        self, mock_incr, multi_team, expected_tags, dag_maker, session
+    ):
+        clear_db_teams()
+        team = Team(name="hb_team")
+        session.add(team)
+        session.flush()
+
+        clear_db_dag_bundles()
+        bundle = DagBundleModel(name="hb_bundle")
+        bundle.teams.append(team)
+        session.add(bundle)
+        session.flush()
+
+        with dag_maker("heartbeat_dag", bundle_name="hb_bundle", 
session=session):
+            EmptyOperator(task_id="task")
+
+        dag_run = dag_maker.create_dagrun(run_id="test_run", 
state=DagRunState.RUNNING)
+
+        mock_executor = MagicMock()
+        scheduler_job = Job()
+
+        ti = dag_run.get_task_instance(task_id="task", session=session)
+        ti.state = TaskInstanceState.RUNNING
+        ti.queued_by_job_id = scheduler_job.id
+        ti.last_heartbeat_at = timezone.utcnow() - timedelta(hours=1)
+        session.merge(ti)
+        session.commit()
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            self.job_runner = SchedulerJobRunner(scheduler_job, 
executors=[mock_executor])
+            self.job_runner._purge_task_instances_without_heartbeats([ti], 
session=session)
+
+        mock_incr.assert_any_call("task_instances_without_heartbeats_killed", 
tags=expected_tags)
+
     @staticmethod
     def mock_failure_callback(context):
         pass

Reply via email to