This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 19163e949a2 Add team_name tag to executor metrics for multi-team deployments (#68593)
19163e949a2 is described below
commit 19163e949a2238aa9a4e0ff789293c02097bd5c1
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Jun 15 17:37:22 2026 -0700
Add team_name tag to executor metrics for multi-team deployments (#68593)
---
.../src/airflow/executors/base_executor.py | 7 ++--
.../tests/unit/executors/test_base_executor.py | 42 +++++++++++-----------
2 files changed, 26 insertions(+), 23 deletions(-)
diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py
index 31c307b163f..ef577233110 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -40,6 +40,7 @@ from airflow.executors.workloads.types import state_class_for_key
from airflow.models import Log
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.observability.metrics import stats_utils
+from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -411,17 +412,17 @@ class BaseExecutor(LoggingMixin):
stats.gauge(
open_slots_metric_name,
value=open_slots,
- tags={"status": "open", "executor_class_name": name},
+ tags=prune_dict({"status": "open", "executor_class_name": name, "team_name": self.team_name}),
)
stats.gauge(
queued_tasks_metric_name,
value=num_queued_tasks,
- tags={"status": "queued", "executor_class_name": name},
+ tags=prune_dict({"status": "queued", "executor_class_name": name, "team_name": self.team_name}),
)
stats.gauge(
running_tasks_metric_name,
value=num_running_tasks,
- tags={"status": "running", "executor_class_name": name},
+ tags=prune_dict({"status": "running", "executor_class_name": name, "team_name": self.team_name}),
)
def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, workloads.ExecuteTask]]:
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py b/airflow-core/tests/unit/executors/test_base_executor.py
index f74d1f1e084..cb646f7ce8d 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -174,30 +174,32 @@ def test_fail_and_success():
assert len(executor.get_event_buffer()) == 3
+@pytest.mark.parametrize(
+ ("team_name", "expected_tags"),
+ [
+ pytest.param(None, {"status": "open", "executor_class_name": "BaseExecutor"}, id="without_team"),
+ pytest.param(
+ "team_a",
+ {"status": "open", "executor_class_name": "BaseExecutor", "team_name": "team_a"},
+ id="with_team",
+ ),
+ ],
+)
@mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.stats.gauge")
-def test_gauge_executor_metrics_single_executor(mock_stats_gauge, mock_trigger_tasks, mock_sync):
- executor = BaseExecutor()
+def test_gauge_executor_metrics_single_executor(
+ mock_stats_gauge, mock_trigger_tasks, mock_sync, team_name, expected_tags
+):
+ executor = BaseExecutor(team_name=team_name)
executor.heartbeat()
- calls = [
- mock.call(
- "executor.open_slots",
- value=mock.ANY,
- tags={"status": "open", "executor_class_name": "BaseExecutor"},
- ),
- mock.call(
- "executor.queued_tasks",
- value=mock.ANY,
- tags={"status": "queued", "executor_class_name": "BaseExecutor"},
- ),
- mock.call(
- "executor.running_tasks",
- value=mock.ANY,
- tags={"status": "running", "executor_class_name": "BaseExecutor"},
- ),
- ]
- mock_stats_gauge.assert_has_calls(calls)
+ # Verify all three gauges use the expected tag structure
+ for metric, status in [
+ ("executor.open_slots", "open"),
+ ("executor.queued_tasks", "queued"),
+ ("executor.running_tasks", "running"),
+ ]:
+ mock_stats_gauge.assert_any_call(metric, value=mock.ANY, tags={**expected_tags, "status": status})
@pytest.mark.parametrize(