This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7cc1d693b7e Add team_name tag to remaining multi-team metrics (#68601)
7cc1d693b7e is described below
commit 7cc1d693b7e94c57f5e7ee4cf47474001ccb1ba9
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Jun 17 22:31:58 2026 -0700
Add team_name tag to remaining multi-team metrics (#68601)
---
.../src/airflow/jobs/triggerer_job_runner.py | 10 +--
airflow-core/src/airflow/models/callback.py | 12 +++-
airflow-core/src/airflow/models/taskinstance.py | 2 +-
airflow-core/tests/unit/jobs/test_triggerer_job.py | 77 ++++++++++++++++++++++
airflow-core/tests/unit/models/test_callback.py | 45 +++++++++++++
.../tests/unit/models/test_taskinstance.py | 38 +++++++++++
.../src/airflow/sdk/bases/resumablejobmixin.py | 18 +++--
task-sdk/src/airflow/sdk/types.py | 3 +
.../tests/task_sdk/bases/test_resumablejobmixin.py | 34 ++++++++++
9 files changed, 225 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 97fd6692044..3612feb011f 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -110,7 +110,7 @@ from airflow.sdk.execution_time.task_runner import
RuntimeTaskInstance
from airflow.serialization.serialized_objects import DagSerialization
from airflow.triggers.base import BaseEventTrigger, BaseTrigger,
DiscrimatedTriggerEvent, TriggerEvent
from airflow.triggers.shared_stream import SharedStreamManager
-from airflow.utils.helpers import log_filename_template_renderer
+from airflow.utils.helpers import log_filename_template_renderer, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session
@@ -679,7 +679,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
perform_heartbeat(self.job,
heartbeat_callback=self.heartbeat_callback, only_if_necessary=True)
def heartbeat_callback(self, session: Session | None = None) -> None:
- stats.incr("triggerer_heartbeat", 1, 1)
+ stats.incr("triggerer_heartbeat", 1, 1, tags=prune_dict({"team_name":
self.team_name}))
def load_triggers(self) -> None:
"""Assign triggers to this triggerer and update the runner with the
IDs it should run."""
@@ -710,7 +710,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
if entry.persist_seq is not None:
self.persisted_event_seqs.append(entry.persist_seq)
# Emit stat event
- stats.incr("triggers.succeeded")
+ stats.incr("triggers.succeeded", tags=prune_dict({"team_name":
self.team_name}))
def on_trigger_event(self, trigger_id: int, event: TriggerEvent) -> None:
"""Record that a trigger fired an event."""
@@ -731,7 +731,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
trigger_id, exc = self.failed_triggers.popleft()
self.on_trigger_failure(trigger_id=trigger_id, exc=exc)
# Emit stat event
- stats.incr("triggers.failed")
+ stats.incr("triggers.failed", tags=prune_dict({"team_name":
self.team_name}))
def on_trigger_failure(self, trigger_id: int, exc: list[str] | None) ->
None:
"""Record that a trigger failed."""
@@ -753,7 +753,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
"TriggerRunnerSupervisor.metric_tags() requires a Job with a
hostname; "
"subclasses without a metadata-DB Job must override this
method."
)
- return {"hostname": hostname}
+ return prune_dict({"hostname": hostname, "team_name": self.team_name})
def emit_metrics(self):
tags = self.metric_tags()
diff --git a/airflow-core/src/airflow/models/callback.py
b/airflow-core/src/airflow/models/callback.py
index 78247be9ec0..1e8e758450d 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -39,6 +39,7 @@ from airflow.executors.workloads.callback import
CallbackFetchMethod
from airflow.models import Base
from airflow.models.base import StringID
from airflow.models.dagbundle import DagBundleModel
+from airflow.utils.helpers import prune_dict
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import CallbackState
@@ -156,7 +157,7 @@ class Callback(Base, BaseWorkload):
def queue(self, *, session: Session) -> None:
self.state = CallbackState.QUEUED
- def get_metric_info(self, status: CallbackState, result: Any) -> dict:
+ def get_metric_info(self, status: CallbackState, result: Any, team_name:
str | None = None) -> dict:
tags = {"result": result, **self.data}
tags.pop("prefix", None)
@@ -177,6 +178,10 @@ class Callback(Base, BaseWorkload):
for k, v in tags.items()
}
+ # team_name is omitted entirely when not in a multi-team deployment or
when the
+ # callback's bundle is not mapped to a team.
+ tags = prune_dict({**tags, "team_name": team_name})
+
prefix = self.data.get("prefix", "")
name = f"{prefix}.callback_{status}" if prefix else
f"callback_{status}"
@@ -250,9 +255,12 @@ class TriggererCallback(Callback):
if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in
(ACTIVE_STATES | TERMINAL_STATES):
self.state = status
if status in TERMINAL_STATES:
+ team_name: str | None = None
+ if self.bundle_name and conf.getboolean("core", "multi_team"):
+ team_name = DagBundleModel.get_team_name(self.bundle_name,
session=session)
self.trigger = None
self.output = event.payload.get(PAYLOAD_BODY_KEY)
- stats.incr(**self.get_metric_info(status, self.output))
+ stats.incr(**self.get_metric_info(status, self.output,
team_name=team_name))
session.add(self)
else:
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 740596f9d69..bbf4b0db923 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1505,7 +1505,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
stats.timing(
f"task.{metric_name}",
timing,
- tags={"task_id": self.task_id, "dag_id": self.dag_id, "queue":
self.queue},
+ tags={**self.stats_tags, "queue": self.queue},
)
def clear_next_method_args(self) -> None:
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 923779df003..2a875fbfbf6 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -2460,6 +2460,83 @@ def
test_handle_events_does_not_confirm_seq_when_persist_fails(jobless_superviso
assert list(jobless_supervisor.persisted_event_seqs) == []
[email protected](
+ ("team_name", "expected_tags"),
+ [
+ pytest.param("team_alpha", {"team_name": "team_alpha"},
id="with_team"),
+ pytest.param(None, {}, id="without_team"),
+ ],
+)
+def test_handle_events_emits_team_name(jobless_supervisor, team_name,
expected_tags):
+ """triggers.succeeded carries the triggerer's team_name (omitted when the
triggerer has none)."""
+ jobless_supervisor.team_name = team_name
+ jobless_supervisor.events.append(TriggerEventEntry(1, TriggerEvent(True),
7))
+
+ with (
+ mock.patch.object(TriggerRunnerSupervisor, "on_trigger_event",
autospec=True),
+ mock.patch("airflow.jobs.triggerer_job_runner.stats.incr") as
mock_incr,
+ ):
+ jobless_supervisor.handle_events()
+
+ mock_incr.assert_called_once_with("triggers.succeeded", tags=expected_tags)
+
+
[email protected](
+ ("team_name", "expected_tags"),
+ [
+ pytest.param("team_alpha", {"team_name": "team_alpha"},
id="with_team"),
+ pytest.param(None, {}, id="without_team"),
+ ],
+)
+def test_handle_failed_triggers_emits_team_name(jobless_supervisor, team_name,
expected_tags):
+ """triggers.failed carries the triggerer's team_name (omitted when the
triggerer has none)."""
+ jobless_supervisor.team_name = team_name
+ jobless_supervisor.failed_triggers.append((1, None))
+
+ with (
+ mock.patch.object(TriggerRunnerSupervisor, "on_trigger_failure",
autospec=True),
+ mock.patch("airflow.jobs.triggerer_job_runner.stats.incr") as
mock_incr,
+ ):
+ jobless_supervisor.handle_failed_triggers()
+
+ mock_incr.assert_called_once_with("triggers.failed", tags=expected_tags)
+
+
[email protected](
+ ("team_name", "expected_tags"),
+ [
+ pytest.param("team_alpha", {"team_name": "team_alpha"},
id="with_team"),
+ pytest.param(None, {}, id="without_team"),
+ ],
+)
+def test_heartbeat_callback_emits_team_name(jobless_supervisor, team_name,
expected_tags):
+ jobless_supervisor.team_name = team_name
+
+ with mock.patch("airflow.jobs.triggerer_job_runner.stats.incr") as
mock_incr:
+ jobless_supervisor.heartbeat_callback()
+
+ mock_incr.assert_called_once_with("triggerer_heartbeat", 1, 1,
tags=expected_tags)
+
+
[email protected](
+ ("team_name", "expected_extra"),
+ [
+ pytest.param("team_alpha", {"team_name": "team_alpha"},
id="with_team"),
+ pytest.param(None, {}, id="without_team"),
+ ],
+)
+def test_emit_metrics_includes_team_name(supervisor_builder, mocker,
team_name, expected_extra):
+ supervisor = supervisor_builder()
+ supervisor.team_name = team_name
+ gauge = mocker.patch("airflow.jobs.triggerer_job_runner.stats.gauge")
+
+ supervisor.emit_metrics()
+
+ expected_tags = {"hostname": supervisor.job.hostname, **expected_extra}
+ gauge.assert_any_call("triggers.running", mock.ANY, tags=expected_tags)
+ gauge.assert_any_call("triggerer.capacity_left", mock.ANY,
tags=expected_tags)
+
+
def
test_state_sync_carries_and_drains_persist_confirmations(jobless_supervisor):
"""The state-sync response carries pending confirmations once, then None
when there are none."""
jobless_supervisor.persisted_event_seqs.extend([3, 9])
diff --git a/airflow-core/tests/unit/models/test_callback.py
b/airflow-core/tests/unit/models/test_callback.py
index b5296979ed4..b2b13582710 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -143,6 +143,21 @@ class TestCallback:
# insertion order collapse to one metric series (no needless
cardinality split).
assert metric_info["tags"]["result"] == '{"code": 0, "output": [1, 2]}'
+ @pytest.mark.parametrize(
+ ("team_name", "expect_tag"),
+ [
+ pytest.param("team_alpha", True, id="with_team"),
+ pytest.param(None, False, id="without_team"),
+ ],
+ )
+ def test_get_metric_info_includes_team_name(self, team_name, expect_tag):
+ callback = TriggererCallback(TEST_ASYNC_CALLBACK,
prefix="deadline_alerts", dag_id=TEST_DAG_ID)
+ metric_info = callback.get_metric_info(CallbackState.SUCCESS, "0",
team_name=team_name)
+ if expect_tag:
+ assert metric_info["tags"]["team_name"] == team_name
+ else:
+ assert "team_name" not in metric_info["tags"]
+
class TestTriggererCallback:
def test_polymorphic_serde(self, session):
@@ -249,6 +264,36 @@ class TestTriggererCallback:
assert callback.trigger is None
assert callback.output == event.payload[PAYLOAD_BODY_KEY]
+ @pytest.mark.parametrize(
+ ("multi_team", "team_name", "expect_tag"),
+ [
+ pytest.param("true", "team_alpha", True, id="with_team"),
+ pytest.param("false", None, False, id="without_team"),
+ ],
+ )
+ @patch("airflow.models.callback.stats.incr")
+ def test_handle_event_emits_team_name(self, mock_incr, multi_team,
team_name, expect_tag, session):
+ """On a terminal event, callback_{status} carries team_name resolved
from the bundle."""
+ callback = TriggererCallback(TEST_ASYNC_CALLBACK, dag_id=TEST_DAG_ID)
+ callback.bundle_name = "test_bundle"
+ event = TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.SUCCESS,
PAYLOAD_BODY_KEY: "0"})
+
+ with (
+ conf_vars({("core", "multi_team"): multi_team}),
+ patch(
+ "airflow.models.callback.DagBundleModel.get_team_name",
return_value=team_name
+ ) as mock_get_team_name,
+ ):
+ callback.handle_event(event, session)
+
+ mock_incr.assert_called_once()
+ _, kwargs = mock_incr.call_args
+ if expect_tag:
+ assert kwargs["tags"]["team_name"] == team_name
+ else:
+ mock_get_team_name.assert_not_called()
+ assert "team_name" not in kwargs["tags"]
+
class TestExecutorCallback:
def test_polymorphic_serde(self, session):
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index b9d8c85f613..4e3fc709028 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -4126,3 +4126,41 @@ class TestTaskInstanceStatsTagsTeamName:
ti._team_name = None
tags = ti.stats_tags
assert "team_name" not in tags
+
+ @pytest.mark.parametrize(
+ ("team_name", "expected_tags"),
+ [
+ pytest.param(
+ "my_team",
+ {"dag_id": "test_dag", "task_id": "my_task", "team_name":
"my_team", "queue": "default"},
+ id="with_team",
+ ),
+ pytest.param(
+ None,
+ {"dag_id": "test_dag", "task_id": "my_task", "queue":
"default"},
+ id="without_team",
+ ),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats.timing")
+ def test_emit_state_change_metric_includes_team_name(
+ self, mock_timing, team_name, expected_tags, dag_maker, session
+ ):
+ with dag_maker("test_dag"):
+ EmptyOperator(task_id="my_task")
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instance("my_task", session=session)
+ ti.state = TaskInstanceState.SCHEDULED
+ ti.scheduled_dttm = timezone.utcnow()
+ if team_name:
+ ti._team_name = team_name
+ session.merge(ti)
+ session.flush()
+
+ ti.emit_state_change_metric(TaskInstanceState.QUEUED)
+
+ mock_timing.assert_called_once_with(
+ "task.scheduled_duration",
+ mock.ANY,
+ tags=expected_tags,
+ )
diff --git a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
index 8bfda28a664..7066d10cced 100644
--- a/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
+++ b/task-sdk/src/airflow/sdk/bases/resumablejobmixin.py
@@ -107,7 +107,13 @@ class ResumableJobMixin:
Closing this window would require atomic "submit + persist", which is
not possible across
an external system boundary.
"""
- operator_tag = {"operator": type(self).__name__}
+ stats_tags = {"operator": type(self).__name__}
+ # The task is team-scoped in multi-team deployments; surface team_name
on the
+ # resumable_job metrics via the running task instance's stats tags
(omitted when
+ # not multi-team or the task has no team).
+ ti = context.get("ti")
+ if ti is not None and (team_name := ti.stats_tags.get("team_name")):
+ stats_tags["team_name"] = team_name
reconnect_to: Any = None
already_succeeded_id: Any = None
@@ -125,7 +131,7 @@ class ResumableJobMixin:
else:
external_id = task_state_store.get(self.external_id_key)
if external_id:
- stats.incr("resumable_job.reconnect_attempt",
tags=operator_tag)
+ stats.incr("resumable_job.reconnect_attempt",
tags=stats_tags)
status = self.get_job_status(external_id, context)
@@ -135,7 +141,7 @@ class ResumableJobMixin:
if self.is_job_active(status):
# Job is still running, skip submission and reconnect
to it.
span.set_attribute("resumable.decision", "reconnect")
- stats.incr("resumable_job.reconnect_success",
tags=operator_tag)
+ stats.incr("resumable_job.reconnect_success",
tags=stats_tags)
self.log.info(
"Reconnecting to existing job",
external_id_key=self.external_id_key,
@@ -146,7 +152,7 @@ class ResumableJobMixin:
elif self.is_job_succeeded(status):
# Job already finished successfully, skip polling and
return result directly.
span.set_attribute("resumable.decision",
"already_succeeded")
- stats.incr("resumable_job.already_succeeded",
tags=operator_tag)
+ stats.incr("resumable_job.already_succeeded",
tags=stats_tags)
self.log.info(
"Job already completed successfully, skipping
resubmission",
external_id_key=self.external_id_key,
@@ -156,7 +162,7 @@ class ResumableJobMixin:
else:
# Job is in a terminal failed state, fall through and
submit a new job.
span.set_attribute("resumable.decision",
"terminal_resubmit")
- stats.incr("resumable_job.terminal_resubmit",
tags=operator_tag)
+ stats.incr("resumable_job.terminal_resubmit",
tags=stats_tags)
self.log.warning(
"Prior job in terminal state, resubmitting fresh",
external_id_key=self.external_id_key,
@@ -165,7 +171,7 @@ class ResumableJobMixin:
)
else:
span.set_attribute("resumable.decision", "fresh_submit")
- stats.incr("resumable_job.fresh_submit", tags=operator_tag)
+ stats.incr("resumable_job.fresh_submit", tags=stats_tags)
self.log.debug(
"No stored external ID found; submitting fresh job",
external_id_key=self.external_id_key,
diff --git a/task-sdk/src/airflow/sdk/types.py
b/task-sdk/src/airflow/sdk/types.py
index 711dbe71ca4..029af34496f 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -151,6 +151,9 @@ class RuntimeTaskInstanceProtocol(Protocol):
@property
def mark_success_url(self) -> str: ...
+ @property
+ def stats_tags(self) -> dict[str, str]: ...
+
def xcom_pull(
self,
task_ids: str | Iterable[str] | None = None,
diff --git a/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
b/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
index 11f97616cdf..e186cc19a36 100644
--- a/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
+++ b/task-sdk/tests/task_sdk/bases/test_resumablejobmixin.py
@@ -85,6 +85,20 @@ def make_context(task_store: FakeTaskState | None = None) ->
dict:
return ctx
+class FakeTI:
+ """Minimal stand-in for RuntimeTaskInstance exposing stats_tags with an
optional team_name."""
+
+ def __init__(self, team_name: str | None = None):
+ self._team_name = team_name
+
+ @property
+ def stats_tags(self) -> dict[str, str]:
+ tags = {"dag_id": "d", "task_id": "t"}
+ if self._team_name:
+ tags["team_name"] = self._team_name
+ return tags
+
+
class TestFirstSubmission:
def test_submits_and_polls_when_no_prior_state(self):
op = ConcreteResumableOperator(task_id="test_task")
@@ -253,6 +267,26 @@ class TestMetrics:
assert "resumable_job.reconnect_success" not in called_names
assert "resumable_job.fresh_submit" not in called_names
+ @pytest.mark.parametrize(
+ ("team_name", "expected_tag"),
+ [
+ pytest.param(
+ "team_alpha",
+ {"operator": "ConcreteResumableOperator", "team_name":
"team_alpha"},
+ id="with_team",
+ ),
+ pytest.param(None, {"operator": "ConcreteResumableOperator"},
id="without_team"),
+ ],
+ )
+ def test_team_name_added_to_metric_tags(self, team_name, expected_tag):
+ op = ConcreteResumableOperator(task_id="test_task")
+ ctx = make_context(FakeTaskState())
+ ctx["ti"] = FakeTI(team_name)
+ mock_incr = MagicMock()
+ with patch(self._PATCH, mock_incr):
+ op.execute_resumable(ctx)
+ mock_incr.assert_called_once_with("resumable_job.fresh_submit",
tags=expected_tag)
+
class TestTracing:
_MODULE_TRACER = "airflow.sdk.bases.resumablejobmixin.tracer"