This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b191f539330 Add team_name tag to deadline metrics for multi-team deployments (#68589)
b191f539330 is described below
commit b191f5393306566e8ff1c9f4d55ff5e4824dbe09
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 16 15:58:12 2026 -0700
Add team_name tag to deadline metrics for multi-team deployments (#68589)
---
airflow-core/src/airflow/models/deadline.py | 23 +++-
.../src/airflow/serialization/definitions/dag.py | 11 +-
airflow-core/tests/unit/models/test_deadline.py | 120 +++++++++++++++++++++
3 files changed, 151 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index d82b9e7b805..f596c663014 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -31,12 +31,14 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from airflow._shared.observability.metrics import stats
from airflow._shared.timezones import timezone
+from airflow.configuration import conf
from airflow.models.base import Base
from airflow.models.callback import (
Callback,
ExecutorCallback,
TriggererCallback,
)
+from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name
@@ -173,6 +175,7 @@ class Deadline(Base):
:param session: Session to use.
"""
from airflow.models import DagRun # Avoids circular import
+ from airflow.models.dag import DagModel
# Assemble the filter conditions.
filter_conditions = [column == value for column, value in conditions.items()]
@@ -199,9 +202,16 @@ class Deadline(Base):
if dagrun.end_date is not None and dagrun.end_date <= deadline.deadline_time:
# If the DagRun finished before the Deadline:
session.delete(deadline)
+ team_name = (
+ DagModel.get_team_name(dagrun.dag_id, session=session)
+ if conf.getboolean("core", "multi_team")
+ else None
+ )
stats.incr(
"deadline_alerts.deadline_not_missed",
- tags={"dag_id": dagrun.dag_id, "dagrun_id": dagrun.run_id},
+ tags=prune_dict(
+ {"dag_id": dagrun.dag_id, "dagrun_id": dagrun.run_id, "team_name": team_name}
+ ),
)
deleted_count += 1
dagruns_to_refresh.add(dagrun)
@@ -217,6 +227,7 @@ class Deadline(Base):
def handle_miss(self, session: Session):
"""Handle a missed deadline by queueing the callback."""
+ from airflow.models.dag import DagModel # Avoids circular import
def get_simple_context():
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
@@ -265,9 +276,17 @@ class Deadline(Base):
self.missed = True
session.add(self)
+
+ team_name = (
+ DagModel.get_team_name(self.dagrun.dag_id, session=session)
+ if conf.getboolean("core", "multi_team")
+ else None
+ )
stats.incr(
"deadline_alerts.deadline_missed",
- tags={"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id},
+ tags=prune_dict(
+ {"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id, "team_name": team_name}
+ ),
)
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py
index 77f440987f5..61205832ac5 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -55,6 +55,7 @@ from airflow.serialization.definitions.deadline import DeadlineAlertFields, Seri
from airflow.serialization.definitions.param import SerializedParamsDict
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
+from airflow.utils.helpers import prune_dict
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -749,7 +750,15 @@ class SerializedDAG:
bundle_name=orm_dagrun.dag_model.bundle_name,
)
)
- stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id})
+ team_name = (
+ DagModel.get_team_name(self.dag_id, session=session)
+ if airflow_conf.getboolean("core", "multi_team")
+ else None
+ )
+ stats.incr(
+ "deadline_alerts.deadline_created",
+ tags=prune_dict({"dag_id": self.dag_id, "team_name": team_name}),
+ )
@provide_session
def set_task_instance_state(
diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py
index c14ea44375e..3635bcadcbe 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -82,6 +82,8 @@ def _clean_db():
db.clear_db_dags()
db.clear_db_runs()
db.clear_db_deadline()
+ db.clear_db_dag_bundles()
+ db.clear_db_teams()
def assert_correct_timing(reference, expected_timing):
@@ -790,3 +792,121 @@ class TestDeadlineReferenceDecorator:
return timezone.datetime(DEFAULT_DATE)
mock_register.assert_called_once_with(DecoratedCustomRef, timing)
+
+
+@pytest.mark.db_test
+class TestDeadlineMetricsTeamName:
+ """Verify team_name tag is included/excluded on deadline metrics based on multi_team config."""
+
+ @staticmethod
+ def setup_method():
+ _clean_db()
+
+ @staticmethod
+ def teardown_method():
+ _clean_db()
+
+ @pytest.mark.parametrize(
+ ("multi_team", "expected_tags"),
+ [
+ pytest.param(
+ "true", {"dag_id": "dl_dag", "dagrun_id": mock.ANY, "team_name": "dl_team"}, id="with_team"
+ ),
+ pytest.param("false", {"dag_id": "dl_dag", "dagrun_id": mock.ANY}, id="without_team"),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+ def test_deadline_not_missed_respects_team_name(
+ self, mock_get_backend, multi_team, expected_tags, session, dag_maker
+ ):
+ from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
+ from airflow.models.dagbundle import DagBundleModel
+ from airflow.models.team import Team
+
+ from tests_common.test_utils.config import conf_vars
+
+ mock_stats = mock.MagicMock(spec=StatsLogger)
+ mock_get_backend.return_value = mock_stats
+
+ team = Team(name="dl_team")
+ session.add(team)
+ session.flush()
+
+ bundle = DagBundleModel(name="dl_bundle")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ with dag_maker(dag_id="dl_dag", bundle_name="dl_bundle", session=session):
+ EmptyOperator(task_id="task1")
+
+ dr = dag_maker.create_dagrun(state=DagRunState.SUCCESS, logical_date=DEFAULT_DATE)
+ dr.end_date = DEFAULT_DATE
+ session.flush()
+
+ deadline = Deadline(
+ deadline_time=DEFAULT_DATE + timedelta(hours=1),
+ callback=AsyncCallback(TEST_CALLBACK_PATH),
+ dagrun_id=dr.id,
+ dag_id=dr.dag_id,
+ deadline_alert_id=None,
+ )
+ session.add(deadline)
+ session.flush()
+
+ with conf_vars({("core", "multi_team"): multi_team}):
+ Deadline.prune_deadlines(conditions={Deadline.dagrun_id: dr.id}, session=session)
+
+ mock_stats.incr.assert_any_call("deadline_alerts.deadline_not_missed", tags=expected_tags)
+
+ @pytest.mark.parametrize(
+ ("multi_team", "expected_tags"),
+ [
+ pytest.param(
+ "true", {"dag_id": "dl_dag", "dagrun_id": mock.ANY, "team_name": "dl_team"}, id="with_team"
+ ),
+ pytest.param("false", {"dag_id": "dl_dag", "dagrun_id": mock.ANY}, id="without_team"),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+ def test_deadline_missed_respects_team_name(
+ self, mock_get_backend, multi_team, expected_tags, session, dag_maker
+ ):
+ from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
+ from airflow.models.dagbundle import DagBundleModel
+ from airflow.models.team import Team
+
+ from tests_common.test_utils.config import conf_vars
+
+ mock_stats = mock.MagicMock(spec=StatsLogger)
+ mock_get_backend.return_value = mock_stats
+
+ team = Team(name="dl_team")
+ session.add(team)
+ session.flush()
+
+ bundle = DagBundleModel(name="dl_bundle")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ with dag_maker(dag_id="dl_dag", bundle_name="dl_bundle", session=session):
+ EmptyOperator(task_id="task1")
+
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING, logical_date=DEFAULT_DATE)
+
+ deadline = Deadline(
+ deadline_time=DEFAULT_DATE,
+ callback=AsyncCallback(TEST_CALLBACK_PATH),
+ dagrun_id=dr.id,
+ dag_id=dr.dag_id,
+ deadline_alert_id=None,
+ )
+ session.add(deadline)
+ session.flush()
+
+ with conf_vars({("core", "multi_team"): multi_team}):
+ with mock.patch.object(deadline.callback, "queue"):
+ deadline.handle_miss(session)
+
+ mock_stats.incr.assert_any_call("deadline_alerts.deadline_missed", tags=expected_tags)