This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 19163e949a2 Add team_name tag to executor metrics for multi-team 
deployments (#68593)
19163e949a2 is described below

commit 19163e949a2238aa9a4e0ff789293c02097bd5c1
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Jun 15 17:37:22 2026 -0700

    Add team_name tag to executor metrics for multi-team deployments (#68593)
---
 .../src/airflow/executors/base_executor.py         |  7 ++--
 .../tests/unit/executors/test_base_executor.py     | 42 +++++++++++-----------
 2 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/airflow-core/src/airflow/executors/base_executor.py 
b/airflow-core/src/airflow/executors/base_executor.py
index 31c307b163f..ef577233110 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -40,6 +40,7 @@ from airflow.executors.workloads.types import 
state_class_for_key
 from airflow.models import Log
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.observability.metrics import stats_utils
+from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -411,17 +412,17 @@ class BaseExecutor(LoggingMixin):
         stats.gauge(
             open_slots_metric_name,
             value=open_slots,
-            tags={"status": "open", "executor_class_name": name},
+            tags=prune_dict({"status": "open", "executor_class_name": name, 
"team_name": self.team_name}),
         )
         stats.gauge(
             queued_tasks_metric_name,
             value=num_queued_tasks,
-            tags={"status": "queued", "executor_class_name": name},
+            tags=prune_dict({"status": "queued", "executor_class_name": name, 
"team_name": self.team_name}),
         )
         stats.gauge(
             running_tasks_metric_name,
             value=num_running_tasks,
-            tags={"status": "running", "executor_class_name": name},
+            tags=prune_dict({"status": "running", "executor_class_name": name, 
"team_name": self.team_name}),
         )
 
     def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, 
workloads.ExecuteTask]]:
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py 
b/airflow-core/tests/unit/executors/test_base_executor.py
index f74d1f1e084..cb646f7ce8d 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -174,30 +174,32 @@ def test_fail_and_success():
     assert len(executor.get_event_buffer()) == 3
 
 
[email protected](
+    ("team_name", "expected_tags"),
+    [
+        pytest.param(None, {"status": "open", "executor_class_name": 
"BaseExecutor"}, id="without_team"),
+        pytest.param(
+            "team_a",
+            {"status": "open", "executor_class_name": "BaseExecutor", 
"team_name": "team_a"},
+            id="with_team",
+        ),
+    ],
+)
 @mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
 @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
 @mock.patch("airflow.executors.base_executor.stats.gauge")
-def test_gauge_executor_metrics_single_executor(mock_stats_gauge, 
mock_trigger_tasks, mock_sync):
-    executor = BaseExecutor()
+def test_gauge_executor_metrics_single_executor(
+    mock_stats_gauge, mock_trigger_tasks, mock_sync, team_name, expected_tags
+):
+    executor = BaseExecutor(team_name=team_name)
     executor.heartbeat()
-    calls = [
-        mock.call(
-            "executor.open_slots",
-            value=mock.ANY,
-            tags={"status": "open", "executor_class_name": "BaseExecutor"},
-        ),
-        mock.call(
-            "executor.queued_tasks",
-            value=mock.ANY,
-            tags={"status": "queued", "executor_class_name": "BaseExecutor"},
-        ),
-        mock.call(
-            "executor.running_tasks",
-            value=mock.ANY,
-            tags={"status": "running", "executor_class_name": "BaseExecutor"},
-        ),
-    ]
-    mock_stats_gauge.assert_has_calls(calls)
+    # Verify all three gauges use the expected tag structure
+    for metric, status in [
+        ("executor.open_slots", "open"),
+        ("executor.queued_tasks", "queued"),
+        ("executor.running_tasks", "running"),
+    ]:
+        mock_stats_gauge.assert_any_call(metric, value=mock.ANY, 
tags={**expected_tags, "status": status})
 
 
 @pytest.mark.parametrize(

Reply via email to