This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b191f539330 Add team_name tag to deadline metrics for multi-team deployments (#68589)
b191f539330 is described below

commit b191f5393306566e8ff1c9f4d55ff5e4824dbe09
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 16 15:58:12 2026 -0700

    Add team_name tag to deadline metrics for multi-team deployments (#68589)
---
 airflow-core/src/airflow/models/deadline.py        |  23 +++-
 .../src/airflow/serialization/definitions/dag.py   |  11 +-
 airflow-core/tests/unit/models/test_deadline.py    | 120 +++++++++++++++++++++
 3 files changed, 151 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index d82b9e7b805..f596c663014 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -31,12 +31,14 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
 
 from airflow._shared.observability.metrics import stats
 from airflow._shared.timezones import timezone
+from airflow.configuration import conf
 from airflow.models.base import Base
 from airflow.models.callback import (
     Callback,
     ExecutorCallback,
     TriggererCallback,
 )
+from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name
@@ -173,6 +175,7 @@ class Deadline(Base):
         :param session: Session to use.
         """
         from airflow.models import DagRun  # Avoids circular import
+        from airflow.models.dag import DagModel
 
         # Assemble the filter conditions.
         filter_conditions = [column == value for column, value in conditions.items()]
@@ -199,9 +202,16 @@ class Deadline(Base):
             if dagrun.end_date is not None and dagrun.end_date <= deadline.deadline_time:
                 # If the DagRun finished before the Deadline:
                 session.delete(deadline)
+                team_name = (
+                    DagModel.get_team_name(dagrun.dag_id, session=session)
+                    if conf.getboolean("core", "multi_team")
+                    else None
+                )
                 stats.incr(
                     "deadline_alerts.deadline_not_missed",
-                    tags={"dag_id": dagrun.dag_id, "dagrun_id": dagrun.run_id},
+                    tags=prune_dict(
+                        {"dag_id": dagrun.dag_id, "dagrun_id": dagrun.run_id, "team_name": team_name}
+                    ),
                 )
                 deleted_count += 1
                 dagruns_to_refresh.add(dagrun)
@@ -217,6 +227,7 @@ class Deadline(Base):
 
     def handle_miss(self, session: Session):
         """Handle a missed deadline by queueing the callback."""
+        from airflow.models.dag import DagModel  # Avoids circular import
 
         def get_simple_context():
             from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
@@ -265,9 +276,17 @@ class Deadline(Base):
 
         self.missed = True
         session.add(self)
+
+        team_name = (
+            DagModel.get_team_name(self.dagrun.dag_id, session=session)
+            if conf.getboolean("core", "multi_team")
+            else None
+        )
         stats.incr(
             "deadline_alerts.deadline_missed",
-            tags={"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id},
+            tags=prune_dict(
+                {"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id, "team_name": team_name}
+            ),
         )
 
 
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py
index 77f440987f5..61205832ac5 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -55,6 +55,7 @@ from airflow.serialization.definitions.deadline import DeadlineAlertFields, Seri
 from airflow.serialization.definitions.param import SerializedParamsDict
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
+from airflow.utils.helpers import prune_dict
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.types import DagRunType
@@ -749,7 +750,15 @@ class SerializedDAG:
                             bundle_name=orm_dagrun.dag_model.bundle_name,
                         )
                     )
-                    stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id})
+                    team_name = (
+                        DagModel.get_team_name(self.dag_id, session=session)
+                        if airflow_conf.getboolean("core", "multi_team")
+                        else None
+                    )
+                    stats.incr(
+                        "deadline_alerts.deadline_created",
+                        tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
+                    )
 
     @provide_session
     def set_task_instance_state(
diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py
index c14ea44375e..3635bcadcbe 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -82,6 +82,8 @@ def _clean_db():
     db.clear_db_dags()
     db.clear_db_runs()
     db.clear_db_deadline()
+    db.clear_db_dag_bundles()
+    db.clear_db_teams()
 
 
 def assert_correct_timing(reference, expected_timing):
@@ -790,3 +792,121 @@ class TestDeadlineReferenceDecorator:
                 return timezone.datetime(DEFAULT_DATE)
 
         mock_register.assert_called_once_with(DecoratedCustomRef, timing)
+
+
+@pytest.mark.db_test
+class TestDeadlineMetricsTeamName:
+    """Verify team_name tag is included/excluded on deadline metrics based on multi_team config."""
+
+    @staticmethod
+    def setup_method():
+        _clean_db()
+
+    @staticmethod
+    def teardown_method():
+        _clean_db()
+
+    @pytest.mark.parametrize(
+        ("multi_team", "expected_tags"),
+        [
+            pytest.param(
+                "true", {"dag_id": "dl_dag", "dagrun_id": mock.ANY, "team_name": "dl_team"}, id="with_team"
+            ),
+            pytest.param("false", {"dag_id": "dl_dag", "dagrun_id": mock.ANY}, id="without_team"),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+    def test_deadline_not_missed_respects_team_name(
+        self, mock_get_backend, multi_team, expected_tags, session, dag_maker
+    ):
+        from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
+        from airflow.models.dagbundle import DagBundleModel
+        from airflow.models.team import Team
+
+        from tests_common.test_utils.config import conf_vars
+
+        mock_stats = mock.MagicMock(spec=StatsLogger)
+        mock_get_backend.return_value = mock_stats
+
+        team = Team(name="dl_team")
+        session.add(team)
+        session.flush()
+
+        bundle = DagBundleModel(name="dl_bundle")
+        bundle.teams.append(team)
+        session.add(bundle)
+        session.flush()
+
+        with dag_maker(dag_id="dl_dag", bundle_name="dl_bundle", session=session):
+            EmptyOperator(task_id="task1")
+
+        dr = dag_maker.create_dagrun(state=DagRunState.SUCCESS, logical_date=DEFAULT_DATE)
+        dr.end_date = DEFAULT_DATE
+        session.flush()
+
+        deadline = Deadline(
+            deadline_time=DEFAULT_DATE + timedelta(hours=1),
+            callback=AsyncCallback(TEST_CALLBACK_PATH),
+            dagrun_id=dr.id,
+            dag_id=dr.dag_id,
+            deadline_alert_id=None,
+        )
+        session.add(deadline)
+        session.flush()
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            Deadline.prune_deadlines(conditions={Deadline.dagrun_id: dr.id}, session=session)
+
+        mock_stats.incr.assert_any_call("deadline_alerts.deadline_not_missed", tags=expected_tags)
+
+    @pytest.mark.parametrize(
+        ("multi_team", "expected_tags"),
+        [
+            pytest.param(
+                "true", {"dag_id": "dl_dag", "dagrun_id": mock.ANY, "team_name": "dl_team"}, id="with_team"
+            ),
+            pytest.param("false", {"dag_id": "dl_dag", "dagrun_id": mock.ANY}, id="without_team"),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+    def test_deadline_missed_respects_team_name(
+        self, mock_get_backend, multi_team, expected_tags, session, dag_maker
+    ):
+        from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
+        from airflow.models.dagbundle import DagBundleModel
+        from airflow.models.team import Team
+
+        from tests_common.test_utils.config import conf_vars
+
+        mock_stats = mock.MagicMock(spec=StatsLogger)
+        mock_get_backend.return_value = mock_stats
+
+        team = Team(name="dl_team")
+        session.add(team)
+        session.flush()
+
+        bundle = DagBundleModel(name="dl_bundle")
+        bundle.teams.append(team)
+        session.add(bundle)
+        session.flush()
+
+        with dag_maker(dag_id="dl_dag", bundle_name="dl_bundle", session=session):
+            EmptyOperator(task_id="task1")
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING, logical_date=DEFAULT_DATE)
+
+        deadline = Deadline(
+            deadline_time=DEFAULT_DATE,
+            callback=AsyncCallback(TEST_CALLBACK_PATH),
+            dagrun_id=dr.id,
+            dag_id=dr.dag_id,
+            deadline_alert_id=None,
+        )
+        session.add(deadline)
+        session.flush()
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            with mock.patch.object(deadline.callback, "queue"):
+                deadline.handle_miss(session)
+
+        mock_stats.incr.assert_any_call("deadline_alerts.deadline_missed", tags=expected_tags)

Reply via email to