This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cc1d693b7e Add team_name tag to remaining multi-team metrics (#68601)
7cc1d693b7e is described below

commit 7cc1d693b7e94c57f5e7ee4cf47474001ccb1ba9
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Jun 17 22:31:58 2026 -0700

    Add team_name tag to remaining multi-team metrics (#68601)
---
 .../src/airflow/jobs/triggerer_job_runner.py       | 10 +--
 airflow-core/src/airflow/models/callback.py        | 12 +++-
 airflow-core/src/airflow/models/taskinstance.py    |  2 +-
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 77 ++++++++++++++++++++++
 airflow-core/tests/unit/models/test_callback.py    | 45 +++++++++++++
 .../tests/unit/models/test_taskinstance.py         | 38 +++++++++++
 .../src/airflow/sdk/bases/resumablejobmixin.py     | 18 +++--
 task-sdk/src/airflow/sdk/types.py                  |  3 +
 .../tests/task_sdk/bases/test_resumablejobmixin.py | 34 ++++++++++
 9 files changed, 225 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 97fd6692044..3612feb011f 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -110,7 +110,7 @@ from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
 from airflow.serialization.serialized_objects import DagSerialization
 from airflow.triggers.base import BaseEventTrigger, BaseTrigger, DiscrimatedTriggerEvent, TriggerEvent
 from airflow.triggers.shared_stream import SharedStreamManager
-from airflow.utils.helpers import log_filename_template_renderer
+from airflow.utils.helpers import log_filename_template_renderer, prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import create_session, provide_session
 
@@ -679,7 +679,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
         perform_heartbeat(self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True)
 
     def heartbeat_callback(self, session: Session | None = None) -> None:
-        stats.incr("triggerer_heartbeat", 1, 1)
+        stats.incr("triggerer_heartbeat", 1, 1, tags=prune_dict({"team_name": self.team_name}))
 
     def load_triggers(self) -> None:
         """Assign triggers to this triggerer and update the runner with the IDs it should run."""
@@ -710,7 +710,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
             if entry.persist_seq is not None:
                 self.persisted_event_seqs.append(entry.persist_seq)
             # Emit stat event
-            stats.incr("triggers.succeeded")
+            stats.incr("triggers.succeeded", tags=prune_dict({"team_name": self.team_name}))
 
     def on_trigger_event(self, trigger_id: int, event: TriggerEvent) -> None:
         """Record that a trigger fired an event."""
@@ -731,7 +731,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
             trigger_id, exc = self.failed_triggers.popleft()
             self.on_trigger_failure(trigger_id=trigger_id, exc=exc)
             # Emit stat event
-            stats.incr("triggers.failed")
+            stats.incr("triggers.failed", tags=prune_dict({"team_name": self.team_name}))
 
     def on_trigger_failure(self, trigger_id: int, exc: list[str] | None) -> None:
         """Record that a trigger failed."""
@@ -753,7 +753,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
                 "TriggerRunnerSupervisor.metric_tags() requires a Job with a hostname; "
                 "subclasses without a metadata-DB Job must override this method."
             )
-        return {"hostname": hostname}
+        return prune_dict({"hostname": hostname, "team_name": self.team_name})
 
     def emit_metrics(self):
         tags = self.metric_tags()
diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py
index 78247be9ec0..1e8e758450d 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -39,6 +39,7 @@ from airflow.executors.workloads.callback import CallbackFetchMethod
 from airflow.models import Base
 from airflow.models.base import StringID
 from airflow.models.dagbundle import DagBundleModel
+from airflow.utils.helpers import prune_dict
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import CallbackState
 
@@ -156,7 +157,7 @@ class Callback(Base, BaseWorkload):
     def queue(self, *, session: Session) -> None:
         self.state = CallbackState.QUEUED
 
-    def get_metric_info(self, status: CallbackState, result: Any) -> dict:
+    def get_metric_info(self, status: CallbackState, result: Any, team_name: str | None = None) -> dict:
         tags = {"result": result, **self.data}
         tags.pop("prefix", None)
 
@@ -177,6 +178,10 @@ class Callback(Base, BaseWorkload):
             for k, v in tags.items()
         }
 
+        # team_name is omitted entirely when not in a multi-team deployment or when the
+        # callback's bundle is not mapped to a team.
+        tags = prune_dict({**tags, "team_name": team_name})
+
         prefix = self.data.get("prefix", "")
         name = f"{prefix}.callback_{status}" if prefix else f"callback_{status}"
 
@@ -250,9 +255,12 @@ class TriggererCallback(Callback):
         if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in (ACTIVE_STATES | TERMINAL_STATES):
             self.state = status
             if status in TERMINAL_STATES:
+                team_name: str | None = None
+                if self.bundle_name and conf.getboolean("core", "multi_team"):
+                    team_name = DagBundleModel.get_team_name(self.bundle_name, session=session)
                 self.trigger = None
                 self.output = event.payload.get(PAYLOAD_BODY_KEY)
-                stats.incr(**self.get_metric_info(status, self.output))
+                stats.incr(**self.get_metric_info(status, self.output, team_name=team_name))
 
             session.add(self)
         else:
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 740596f9d69..bbf4b0db923 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1505,7 +1505,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
         stats.timing(
             f"task.{metric_name}",
             timing,
-            tags={"task_id": self.task_id, "dag_id": self.dag_id, "queue": self.queue},
+            tags={**self.stats_tags, "queue": self.queue},
         )
 
     def clear_next_method_args(self) -> None:
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 923779df003..2a875fbfbf6 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -2460,6 +2460,83 @@ def test_handle_events_does_not_confirm_seq_when_persist_fails(jobless_superviso
     assert list(jobless_supervisor.persisted_event_seqs) == []
 
 
+@pytest.mark.parametrize(
+    ("team_name", "expected_tags"),
+    [
+        pytest.param("team_alpha", {"team_name": "team_alpha"}, id="with_team"),
+        pytest.param(None, {}, id="without_team"),
+    ],
+)
+def test_handle_events_emits_team_name(jobless_supervisor, team_name, expected_tags):
+    """triggers.succeeded carries the triggerer's team_name (omitted when the triggerer has none)."""
+    jobless_supervisor.team_name = team_name
+    jobless_supervisor.events.append(TriggerEventEntry(1, TriggerEvent(True), 7))
+
+    with (
+        mock.patch.object(TriggerRunnerSupervisor, "on_trigger_event", autospec=True),
+        mock.patch("airflow.jobs.triggerer_job_runner.stats.incr") as mock_incr,
+    ):
+        jobless_supervisor.handle_events()
+
+    mock_incr.assert_called_once_with("triggers.succeeded", tags=expected_tags)
+
+
+@pytest.mark.parametrize(
+    ("team_name", "expected_tags"),
+    [
+        pytest.param("team_alpha", {"team_name": "team_alpha"}, id="with_team"),
+        pytest.param(None, {}, id="without_team"),
+    ],
+)
+def test_handle_failed_triggers_emits_team_name(jobless_supervisor, team_name, expected_tags):
+    """triggers.failed carries the triggerer's team_name (omitted when the triggerer has none)."""
+    jobless_supervisor.team_name = team_name
+    jobless_supervisor.failed_triggers.append((1, None))
+
+    with (
+        mock.patch.object(TriggerRunnerSupervisor, "on_trigger_failure", autospec=True),
+        mock.patch("airflow.jobs.triggerer_job_runner.stats.incr") as mock_incr,
+    ):
+        jobless_supervisor.handle_failed_triggers()
+
+    mock_incr.assert_called_once_with("triggers.failed", tags=expected_tags)
+
+
+@pytest.mark.parametrize(
+    ("team_name", "expected_tags"),
+    [
+        pytest.param("team_alpha", {"team_name": "team_alpha"}, id="with_team"),
+        pytest.param(None, {}, id="without_team"),
+    ],
+)
+def test_heartbeat_callback_emits_team_name(jobless_supervisor, team_name, expected_tags):
+    jobless_supervisor.team_name = team_name
+
+    with mock.patch("airflow.jobs.triggerer_job_runner.stats.incr") as mock_incr:
+        jobless_supervisor.heartbeat_callback()
+
+    mock_incr.assert_called_once_with("triggerer_heartbeat", 1, 1, tags=expected_tags)
+
+
+@pytest.mark.parametrize(
+    ("team_name", "expected_extra"),
+    [
+        pytest.param("team_alpha", {"team_name": "team_alpha"}, id="with_team"),
+        pytest.param(None, {}, id="without_team"),
+    ],
+)
+def test_emit_metrics_includes_team_name(supervisor_builder, mocker, team_name, expected_extra):
+    supervisor = supervisor_builder()
+    supervisor.team_name = team_name
+    gauge = mocker.patch("airflow.jobs.triggerer_job_runner.stats.gauge")
+
+    supervisor.emit_metrics()
+
+    expected_tags = {"hostname": supervisor.job.hostname, **expected_extra}
+    gauge.assert_any_call("triggers.running", mock.ANY, tags=expected_tags)
+    gauge.assert_any_call("triggerer.capacity_left", mock.ANY, tags=expected_tags)
+
+
 def test_state_sync_carries_and_drains_persist_confirmations(jobless_supervisor):
     """The state-sync response carries pending confirmations once, then None when there are none."""
     jobless_supervisor.persisted_event_seqs.extend([3, 9])
diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py
index b5296979ed4..b2b13582710 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -143,6 +143,21 @@ class TestCallback:
         # insertion order collapse to one metric series (no needless cardinality split).
         assert metric_info["tags"]["result"] == '{"code": 0, "output": [1, 2]}'
 
+    @pytest.mark.parametrize(
+        ("team_name", "expect_tag"),
+        [
+            pytest.param("team_alpha", True, id="with_team"),
+            pytest.param(None, False, id="without_team"),
+        ],
+    )
+    def test_get_metric_info_includes_team_name(self, team_name, expect_tag):
+        callback = TriggererCallback(TEST_ASYNC_CALLBACK, prefix="deadline_alerts", dag_id=TEST_DAG_ID)
+        metric_info = callback.get_metric_info(CallbackState.SUCCESS, "0", team_name=team_name)
+        if expect_tag:
+            assert metric_info["tags"]["team_name"] == team_name
+        else:
+            assert "team_name" not in metric_info["tags"]
+
 
 class TestTriggererCallback:
     def test_polymorphic_serde(self, session):
@@ -249,6 +264,36 @@ class TestTriggererCallback:
             assert callback.trigger is None
             assert callback.output == event.payload[PAYLOAD_BODY_KEY]
 
+    @pytest.mark.parametrize(
+        ("multi_team", "team_name", "expect_tag"),
+        [
+            pytest.param("true", "team_alpha", True, id="with_team"),
+            pytest.param("false", None, False, id="without_team"),
+        ],
+    )
+    @patch("airflow.models.callback.stats.incr")
+    def test_handle_event_emits_team_name(self, mock_incr, multi_team, team_name, expect_tag, session):
+        """On a terminal event, callback_{status} carries team_name resolved from the bundle."""
+        callback = TriggererCallback(TEST_ASYNC_CALLBACK, dag_id=TEST_DAG_ID)
+        callback.bundle_name = "test_bundle"
+        event = TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.SUCCESS, PAYLOAD_BODY_KEY: "0"})
+
+        with (
+            conf_vars({("core", "multi_team"): multi_team}),
+            patch(
+                "airflow.models.callback.DagBundleModel.get_team_name", return_value=team_name
+            ) as mock_get_team_name,
+        ):
+            callback.handle_event(event, session)
+
+        mock_incr.assert_called_once()
+        _, kwargs = mock_incr.call_args
+        if expect_tag:
+            assert kwargs["tags"]["team_name"] == team_name
+        else:
+            mock_get_team_name.assert_not_called()
+            assert "team_name" not in kwargs["tags"]
+
 
 class TestExecutorCallback:
     def test_polymorphic_serde(self, session):
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index b9d8c85f613..4e3fc709028 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -4126,3 +4126,41 @@ class TestTaskInstanceStatsTagsTeamName:
         ti._team_name = None
         tags = ti.stats_tags
         assert "team_name" not in tags
+
+    @pytest.mark.parametrize(
+        ("team_name", "expected_tags"),
+        [
+            pytest.param(
+                "my_team",
+                {"dag_id": "test_dag", "task_id": "my_task", "team_name": "my_team", "queue": "default"},
+                id="with_team",
+            ),
+            pytest.param(
+                None,
+                {"dag_id": "test_dag", "task_id": "my_task", "queue": "default"},
+                id="without_team",
+            ),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats.timing")
+    def test_emit_state_change_metric_includes_team_name(
+        self, mock_timing, team_name, expected_tags, dag_maker, session
+    ):
+        with dag_maker("test_dag"):
+            EmptyOperator(task_id="my_task")
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance("my_task", session=session)
+        ti.state = TaskInstanceState.SCHEDULED
+        ti.scheduled_dttm = timezone.utcnow()
+        if team_name:
+            ti._team_name = team_name
+        session.merge(ti)
+        session.flush()
+
+        ti.emit_state_change_metric(TaskInstanceState.QUEUED)
+
+        mock_timing.assert_called_once_with(
+            "task.scheduled_duration",
+            mock.ANY,
+            tags=expected_tags,
+        )
diff --git a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
index 8bfda28a664..7066d10cced 100644
--- a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
+++ b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
@@ -107,7 +107,13 @@ class ResumableJobMixin:
         Closing this window would require atomic "submit + persist", which is not possible across
         an external system boundary.
         """
-        operator_tag = {"operator": type(self).__name__}
+        stats_tags = {"operator": type(self).__name__}
+        # The task is team-scoped in multi-team deployments; surface team_name on the
+        # resumable_job metrics via the running task instance's stats tags (omitted when
+        # not multi-team or the task has no team).
+        ti = context.get("ti")
+        if ti is not None and (team_name := ti.stats_tags.get("team_name")):
+            stats_tags["team_name"] = team_name
         reconnect_to: Any = None
         already_succeeded_id: Any = None
 
@@ -125,7 +131,7 @@ class ResumableJobMixin:
             else:
                 external_id = task_state_store.get(self.external_id_key)
                 if external_id:
-                    stats.incr("resumable_job.reconnect_attempt", tags=operator_tag)
+                    stats.incr("resumable_job.reconnect_attempt", tags=stats_tags)
 
                     status = self.get_job_status(external_id, context)
 
@@ -135,7 +141,7 @@ class ResumableJobMixin:
                     if self.is_job_active(status):
                         # Job is still running, skip submission and reconnect to it.
                         span.set_attribute("resumable.decision", "reconnect")
-                        stats.incr("resumable_job.reconnect_success", tags=operator_tag)
+                        stats.incr("resumable_job.reconnect_success", tags=stats_tags)
                         self.log.info(
                             "Reconnecting to existing job",
                             external_id_key=self.external_id_key,
@@ -146,7 +152,7 @@ class ResumableJobMixin:
                     elif self.is_job_succeeded(status):
                         # Job already finished successfully, skip polling and return result directly.
                         span.set_attribute("resumable.decision", "already_succeeded")
-                        stats.incr("resumable_job.already_succeeded", tags=operator_tag)
+                        stats.incr("resumable_job.already_succeeded", tags=stats_tags)
                         self.log.info(
                             "Job already completed successfully, skipping resubmission",
                             external_id_key=self.external_id_key,
@@ -156,7 +162,7 @@ class ResumableJobMixin:
                     else:
                         # Job is in a terminal failed state, fall through and submit a new job.
                         span.set_attribute("resumable.decision", "terminal_resubmit")
-                        stats.incr("resumable_job.terminal_resubmit", tags=operator_tag)
+                        stats.incr("resumable_job.terminal_resubmit", tags=stats_tags)
                         self.log.warning(
                             "Prior job in terminal state, resubmitting fresh",
                             external_id_key=self.external_id_key,
@@ -165,7 +171,7 @@ class ResumableJobMixin:
                         )
                 else:
                     span.set_attribute("resumable.decision", "fresh_submit")
-                    stats.incr("resumable_job.fresh_submit", tags=operator_tag)
+                    stats.incr("resumable_job.fresh_submit", tags=stats_tags)
                     self.log.debug(
                         "No stored external ID found; submitting fresh job",
                         external_id_key=self.external_id_key,
diff --git a/task-sdk/src/airflow/sdk/types.py b/task-sdk/src/airflow/sdk/types.py
index 711dbe71ca4..029af34496f 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -151,6 +151,9 @@ class RuntimeTaskInstanceProtocol(Protocol):
     @property
     def mark_success_url(self) -> str: ...
 
+    @property
+    def stats_tags(self) -> dict[str, str]: ...
+
     def xcom_pull(
         self,
         task_ids: str | Iterable[str] | None = None,
diff --git a/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py b/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
index 11f97616cdf..e186cc19a36 100644
--- a/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
+++ b/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
@@ -85,6 +85,20 @@ def make_context(task_store: FakeTaskState | None = None) -> dict:
     return ctx
 
 
+class FakeTI:
+    """Minimal stand-in for RuntimeTaskInstance exposing stats_tags with an optional team_name."""
+
+    def __init__(self, team_name: str | None = None):
+        self._team_name = team_name
+
+    @property
+    def stats_tags(self) -> dict[str, str]:
+        tags = {"dag_id": "d", "task_id": "t"}
+        if self._team_name:
+            tags["team_name"] = self._team_name
+        return tags
+
+
 class TestFirstSubmission:
     def test_submits_and_polls_when_no_prior_state(self):
         op = ConcreteResumableOperator(task_id="test_task")
@@ -253,6 +267,26 @@ class TestMetrics:
         assert "resumable_job.reconnect_success" not in called_names
         assert "resumable_job.fresh_submit" not in called_names
 
+    @pytest.mark.parametrize(
+        ("team_name", "expected_tag"),
+        [
+            pytest.param(
+                "team_alpha",
+                {"operator": "ConcreteResumableOperator", "team_name": "team_alpha"},
+                id="with_team",
+            ),
+            pytest.param(None, {"operator": "ConcreteResumableOperator"}, id="without_team"),
+        ],
+    )
+    def test_team_name_added_to_metric_tags(self, team_name, expected_tag):
+        op = ConcreteResumableOperator(task_id="test_task")
+        ctx = make_context(FakeTaskState())
+        ctx["ti"] = FakeTI(team_name)
+        mock_incr = MagicMock()
+        with patch(self._PATCH, mock_incr):
+            op.execute_resumable(ctx)
+        mock_incr.assert_called_once_with("resumable_job.fresh_submit", tags=expected_tag)
+
 
 class TestTracing:
     _MODULE_TRACER = "airflow.sdk.bases.resumablejobmixin.tracer"

Reply via email to