This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa0609f0867 Add team_name tag to dag processor metrics for multi-team deployments (#68599)
aa0609f0867 is described below
commit aa0609f08672c3438d451d48fe0832ae1c9cd925
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Jun 18 09:29:57 2026 -0700
Add team_name tag to dag processor metrics for multi-team deployments (#68599)
---
airflow-core/src/airflow/dag_processing/manager.py | 111 +++++++-
.../src/airflow/dag_processing/processor.py | 16 +-
airflow-core/src/airflow/models/dagrun.py | 15 +-
.../tests/unit/dag_processing/test_manager.py | 292 +++++++++++++++++++++
.../tests/unit/dag_processing/test_processor.py | 77 ++++++
airflow-core/tests/unit/models/test_dagrun.py | 31 +++
6 files changed, 527 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 92ca10c67fa..e43920f2618 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -69,6 +69,7 @@ from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.typing_compat import assert_never
from airflow.utils.file import list_py_file_paths, might_contain_dag
+from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import (
@@ -719,7 +720,12 @@ class DagFileProcessorManager(LoggingMixin):
)
self._callback_to_execute[file_info].append(request)
self._add_files_to_queue([file_info], mode="front")
- stats.incr("dag_processing.other_callback_count")
+ team_name = (
+ DagBundleModel.get_team_name(file_info.bundle_name)
+ if conf.getboolean("core", "multi_team")
+ else None
+ )
+ stats.incr("dag_processing.other_callback_count",
tags=prune_dict({"team_name": team_name}))
@provide_session
def get_bundle_state(self, bundle_name: str, *, session: Session =
NEW_SESSION) -> BundleState | None:
@@ -1020,6 +1026,14 @@ class DagFileProcessorManager(LoggingMixin):
utcnow = timezone.utcnow()
now = time.monotonic()
+ if conf.getboolean("core", "multi_team"):
+ bundle_names = {bundle_name for bundle_name in known_files}
+ bundle_to_team = {
+ bundle_name: DagBundleModel.get_team_name(bundle_name) for
bundle_name in bundle_names
+ }
+ else:
+ bundle_to_team = {}
+
for files in known_files.values():
for file in files:
stat = self._file_stats[file]
@@ -1040,11 +1054,14 @@ class DagFileProcessorManager(LoggingMixin):
stats.gauge(
"dag_processing.last_run.seconds_ago",
seconds_ago,
- tags={
- "file_path": file.normalized_file_path_for_stats,
- "bundle_name":
normalize_name_for_stats(file.bundle_name),
- "file_name": file_name,
- },
+ tags=prune_dict(
+ {
+ "file_path":
file.normalized_file_path_for_stats,
+ "bundle_name":
normalize_name_for_stats(file.bundle_name),
+ "file_name": file_name,
+ "team_name":
bundle_to_team.get(file.bundle_name),
+ }
+ ),
)
rows.append(
@@ -1136,6 +1153,15 @@ class DagFileProcessorManager(LoggingMixin):
def terminate_orphan_processes(self, present: set[DagFileInfo]):
"""Stop processors that are working on deleted files."""
present_keys = {file.presence_key for file in present}
+
+ if conf.getboolean("core", "multi_team"):
+ bundle_names = {file.bundle_name for file in self._processors}
+ bundle_to_team = {
+ bundle_name: DagBundleModel.get_team_name(bundle_name) for
bundle_name in bundle_names
+ }
+ else:
+ bundle_to_team = {}
+
for file in list(self._processors.keys()):
if file.presence_key not in present_keys:
processor = self._processors.pop(file, None)
@@ -1145,7 +1171,13 @@ class DagFileProcessorManager(LoggingMixin):
self.log.warning("Stopping processor for %s", file_name)
stats.decr(
"dag_processing.processes",
- tags={"file_path": file.normalized_file_path_for_stats,
"action": "stop"},
+ tags=prune_dict(
+ {
+ "file_path": file.normalized_file_path_for_stats,
+ "action": "stop",
+ "team_name": bundle_to_team.get(file.bundle_name),
+ }
+ ),
)
processor.kill(signal.SIGKILL)
processor.logger_filehandle.close()
@@ -1183,6 +1215,9 @@ class DagFileProcessorManager(LoggingMixin):
run_duration = time.monotonic() - proc.start_time
finish_time = timezone.utcnow()
+ team_name = (
+ DagBundleModel.get_team_name(file.bundle_name) if
conf.getboolean("core", "multi_team") else None
+ )
next_stat = process_parse_results(
run_duration=run_duration,
finish_time=finish_time,
@@ -1191,6 +1226,7 @@ class DagFileProcessorManager(LoggingMixin):
parsing_result=proc.parsing_result,
is_callback_only=is_callback_only,
relative_fileloc=str(file.rel_path),
+ team_name=team_name,
)
if proc.parsing_result is not None:
@@ -1358,6 +1394,14 @@ class DagFileProcessorManager(LoggingMixin):
def _start_new_processes(self):
"""Start more processors if we have enough slots and files to
process."""
+ if conf.getboolean("core", "multi_team"):
+ bundle_names = {file.bundle_name for file in self._file_queue}
+ bundle_to_team = {
+ bundle_name: DagBundleModel.get_team_name(bundle_name) for
bundle_name in bundle_names
+ }
+ else:
+ bundle_to_team = {}
+
while self._parallelism > len(self._processors) and self._file_queue:
file, _ = self._file_queue.popitem(last=False)
# Stop creating duplicate processor i.e. processor with the same
filepath
@@ -1367,7 +1411,13 @@ class DagFileProcessorManager(LoggingMixin):
processor = self._create_process(file)
stats.incr(
"dag_processing.processes",
- tags={"file_path": file.normalized_file_path_for_stats,
"action": "start"},
+ tags=prune_dict(
+ {
+ "file_path": file.normalized_file_path_for_stats,
+ "action": "start",
+ "team_name": bundle_to_team.get(file.bundle_name),
+ }
+ ),
)
self._processors[file] = processor
@@ -1533,6 +1583,15 @@ class DagFileProcessorManager(LoggingMixin):
"""Kill any file processors that timeout to defend against process
hangs."""
now = time.monotonic()
processors_to_remove = []
+
+ if conf.getboolean("core", "multi_team"):
+ bundle_names = {file.bundle_name for file in self._processors}
+ bundle_to_team = {
+ bundle_name: DagBundleModel.get_team_name(bundle_name) for
bundle_name in bundle_names
+ }
+ else:
+ bundle_to_team = {}
+
for file, processor in self._processors.items():
duration = now - processor.start_time
if duration > self.processor_timeout:
@@ -1544,8 +1603,17 @@ class DagFileProcessorManager(LoggingMixin):
self.processor_timeout,
)
file_path_tag = file.normalized_file_path_for_stats
- stats.decr("dag_processing.processes", tags={"file_path":
file_path_tag, "action": "timeout"})
- stats.incr("dag_processing.processor_timeouts",
tags={"file_path": file_path_tag})
+ team_name = bundle_to_team.get(file.bundle_name)
+ stats.decr(
+ "dag_processing.processes",
+ tags=prune_dict(
+ {"file_path": file_path_tag, "action": "timeout",
"team_name": team_name}
+ ),
+ )
+ stats.incr(
+ "dag_processing.processor_timeouts",
+ tags=prune_dict({"file_path": file_path_tag, "team_name":
team_name}),
+ )
processor.kill(signal.SIGKILL)
processors_to_remove.append(file)
@@ -1606,10 +1674,24 @@ class DagFileProcessorManager(LoggingMixin):
def terminate(self):
"""Stop all running processors."""
+ if conf.getboolean("core", "multi_team"):
+ bundle_names = {file.bundle_name for file in self._processors}
+ bundle_to_team = {
+ bundle_name: DagBundleModel.get_team_name(bundle_name) for
bundle_name in bundle_names
+ }
+ else:
+ bundle_to_team = {}
+
for file, processor in self._processors.items():
stats.decr(
"dag_processing.processes",
- tags={"file_path": file.normalized_file_path_for_stats,
"action": "terminate"},
+ tags=prune_dict(
+ {
+ "file_path": file.normalized_file_path_for_stats,
+ "action": "terminate",
+ "team_name": bundle_to_team.get(file.bundle_name),
+ }
+ ),
)
# SIGTERM, wait 5s, SIGKILL if still alive
processor.kill(signal.SIGTERM, escalation_delay=5.0)
@@ -1642,6 +1724,7 @@ def process_parse_results(
*,
is_callback_only: bool = False,
relative_fileloc: str | None = None,
+ team_name: str | None = None,
) -> DagFileStat:
"""
Create a DagFileStat from parsing results and emit metrics.
@@ -1655,7 +1738,7 @@ def process_parse_results(
last_duration=run_duration,
run_count=run_count, # Don't increment for callback-only
processing
)
- stats.incr("dag_processing.callback_only_count")
+ stats.incr("dag_processing.callback_only_count",
tags=prune_dict({"team_name": team_name}))
else:
# Actual DAG parsing or import error
stat = DagFileStat(
@@ -1673,7 +1756,9 @@ def process_parse_results(
stats.timing(
"dag_processing.last_duration",
stat.last_duration,
- tags={"bundle_name": normalized_bundle, "file_name": file_name},
+ tags=prune_dict(
+ {"bundle_name": normalized_bundle, "file_name": file_name,
"team_name": team_name}
+ ),
)
if parsing_result is None:
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index f7b3affd008..251b7b62974 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -38,6 +38,7 @@ from airflow.callbacks.callback_requests import (
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleVersionLock
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
+from airflow.models.dag import DagModel
from airflow.sdk.exceptions import TaskNotFound
from airflow.sdk.execution_time.comms import (
ConnectionResult,
@@ -90,6 +91,7 @@ from airflow.sdk.log import mask_secret
from airflow.serialization.serialized_objects import DagSerialization,
LazyDeserializedDAG
from airflow.utils.dag_version_inflation_checker import
check_dag_file_stability
from airflow.utils.file import iter_airflow_imports
+from airflow.utils.helpers import prune_dict
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
@@ -386,7 +388,19 @@ def _execute_dag_callbacks(dagbag: DagBag, request:
DagCallbackRequest, log: Fil
callback(context)
except Exception:
log.exception("Callback failed", dag_id=request.dag_id)
- stats.incr("dag.callback_exceptions", tags={"dag_id":
request.dag_id})
+ stats.incr(
+ "dag.callback_exceptions",
+ tags=prune_dict(
+ {
+ "dag_id": request.dag_id,
+ "team_name": (
+ DagModel.get_team_name(request.dag_id)
+ if conf.getboolean("core", "multi_team")
+ else None
+ ),
+ }
+ ),
+ )
def _execute_task_callbacks(dagbag: DagBag, request: TaskCallbackRequest, log:
FilteringBoundLogger) -> None:
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 80e279f1499..ab024142918 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1420,6 +1420,7 @@ class DagRun(Base, LoggingMixin):
TaskInstance as TIDataModel,
TIRunContext,
)
+ from airflow.models.dag import DagModel
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
if relevant_ti:
@@ -1463,7 +1464,19 @@ class DagRun(Base, LoggingMixin):
callback(context)
except Exception:
self.log.exception("Callback failed for %s", dag.dag_id)
- stats.incr("dag.callback_exceptions", tags={"dag_id":
dag.dag_id})
+ stats.incr(
+ "dag.callback_exceptions",
+ tags=prune_dict(
+ {
+ "dag_id": dag.dag_id,
+ "team_name": (
+ DagModel.get_team_name(dag.dag_id)
+ if airflow_conf.getboolean("core",
"multi_team")
+ else None
+ ),
+ }
+ ),
+ )
def _get_ready_tis(
self,
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 0f6c061fc43..6d10e5c1a06 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -2922,3 +2922,295 @@ class TestDagFileProcessorManager:
assert manager._bundle_versions["mock_bundle"] == "newhash"
assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestMultiTeamMetrics:
+ """Tests for team_name tag on dag processing metrics in multi-team mode."""
+
+ def mock_processor(self, start_time: float | None = None) ->
DagFileProcessorProcess:
+ proc = MagicMock()
+ proc.wait.return_value = 0
+ read_end, write_end = socketpair()
+ ret = DagFileProcessorProcess(
+ process_log=MagicMock(),
+ id=uuid7(),
+ pid=1234,
+ process=proc,
+ stdin=write_end,
+ logger_filehandle=MagicMock(),
+ client=MagicMock(),
+ bundle_name="testing",
+ dag_file_rel_path="test_dag.py",
+ )
+ if start_time:
+ ret.start_time = start_time
+ ret._open_sockets.clear()
+ return ret
+
+ @conf_vars({("core", "multi_team"): "true"})
+ @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value="team_alpha")
+ @mock.patch("airflow.dag_processing.manager.stats.gauge")
+ def test_log_file_processing_stats_includes_team_name(self, mock_gauge,
mock_get_team_name):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("dag_file.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ )
+ manager._file_stats[dag_file] = DagFileStat(
+ last_finish_time=timezone.utcnow() - timedelta(seconds=5),
+ )
+
+ manager._log_file_processing_stats({"testing": {dag_file}})
+
+ mock_gauge.assert_any_call(
+ "dag_processing.last_run.seconds_ago",
+ mock.ANY,
+ tags={
+ "file_path": "dag_file.py",
+ "bundle_name": "testing",
+ "file_name": "dag_file",
+ "team_name": "team_alpha",
+ },
+ )
+
+ @conf_vars({("core", "multi_team"): "false"})
+ @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name")
+ @mock.patch("airflow.dag_processing.manager.stats.gauge")
+ def test_log_file_processing_stats_omits_team_name_when_not_multi_team(
+ self, mock_gauge, mock_get_team_name
+ ):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("dag_file.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ )
+ manager._file_stats[dag_file] = DagFileStat(
+ last_finish_time=timezone.utcnow() - timedelta(seconds=5),
+ )
+
+ manager._log_file_processing_stats({"testing": {dag_file}})
+
+ mock_get_team_name.assert_not_called()
+ mock_gauge.assert_any_call(
+ "dag_processing.last_run.seconds_ago",
+ mock.ANY,
+ tags={
+ "file_path": "dag_file.py",
+ "bundle_name": "testing",
+ "file_name": "dag_file",
+ },
+ )
+
+ @conf_vars({("core", "multi_team"): "true"})
+ @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value="team_alpha")
+ @mock.patch("airflow.dag_processing.manager.stats.incr")
+ def test_start_new_processes_includes_team_name(self, mock_incr,
mock_get_team_name):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("dag_file.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ )
+ manager._file_queue[dag_file] = None
+ manager._create_process = MagicMock(return_value=self.mock_processor())
+
+ manager._start_new_processes()
+
+ mock_incr.assert_any_call(
+ "dag_processing.processes",
+ tags={"file_path": "dag_file.py", "action": "start", "team_name":
"team_alpha"},
+ )
+
+ @conf_vars({("core", "multi_team"): "true"})
+ @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value="team_alpha")
+ @mock.patch("airflow.dag_processing.manager.stats.incr")
+ @mock.patch("airflow.dag_processing.manager.stats.decr")
+ def test_kill_timed_out_processors_includes_team_name(self, mock_decr,
mock_incr, mock_get_team_name):
+ manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
+ start_time = time.monotonic() - manager.processor_timeout - 1
+ processor = self.mock_processor(start_time=start_time)
+ dag_file = DagFileInfo(
+ bundle_name="testing", rel_path=Path("dag_file.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+ manager._processors = {dag_file: processor}
+
+ with mock.patch.object(type(processor), "kill"):
+ manager._kill_timed_out_processors()
+
+ mock_decr.assert_called_once_with(
+ "dag_processing.processes",
+ tags={"file_path": "dag_file.py", "action": "timeout",
"team_name": "team_alpha"},
+ )
+ mock_incr.assert_any_call(
+ "dag_processing.processor_timeouts",
+ tags={"file_path": "dag_file.py", "team_name": "team_alpha"},
+ )
+
+ @conf_vars({("core", "multi_team"): "true"})
+ @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value="team_alpha")
+ @mock.patch("airflow.dag_processing.manager.stats.timing")
+ def test_process_parse_results_includes_team_name(self, mock_timing,
mock_get_team_name):
+ from airflow.dag_processing.manager import process_parse_results
+
+ result = DagFileParsingResult(fileloc="/tmp/dag.py",
serialized_dags=[])
+ process_parse_results(
+ run_duration=1.5,
+ finish_time=timezone.utcnow(),
+ run_count=0,
+ bundle_name="testing",
+ parsing_result=result,
+ relative_fileloc="dag.py",
+ team_name="team_alpha",
+ )
+
+ mock_timing.assert_called_once_with(
+ "dag_processing.last_duration",
+ 1.5,
+ tags={"bundle_name": "testing", "file_name": "dag", "team_name":
"team_alpha"},
+ )
+
+ @conf_vars({("core", "multi_team"): "false"})
+ @mock.patch("airflow.dag_processing.manager.stats.timing")
+ def test_process_parse_results_omits_team_name_when_none(self,
mock_timing):
+ from airflow.dag_processing.manager import process_parse_results
+
+ result = DagFileParsingResult(fileloc="/tmp/dag.py",
serialized_dags=[])
+ process_parse_results(
+ run_duration=1.5,
+ finish_time=timezone.utcnow(),
+ run_count=0,
+ bundle_name="testing",
+ parsing_result=result,
+ relative_fileloc="dag.py",
+ team_name=None,
+ )
+
+ mock_timing.assert_called_once_with(
+ "dag_processing.last_duration",
+ 1.5,
+ tags={"bundle_name": "testing", "file_name": "dag"},
+ )
+
+ @pytest.mark.parametrize(
+ ("multi_team", "team_name", "expected_tags"),
+ [
+ pytest.param(
+ "true",
+ "team_alpha",
+ {"file_path": "dag_file.py", "action": "stop", "team_name":
"team_alpha"},
+ id="with_team",
+ ),
+ pytest.param(
+ "false",
+ None,
+ {"file_path": "dag_file.py", "action": "stop"},
+ id="without_team",
+ ),
+ ],
+ )
+ @mock.patch("airflow.dag_processing.manager.stats.decr")
+ def test_terminate_orphan_processes_includes_team_name(
+ self, mock_decr, multi_team, team_name, expected_tags
+ ):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing", rel_path=Path("dag_file.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+ processor = self.mock_processor()
+ manager._processors = {dag_file: processor}
+
+ with (
+ conf_vars({("core", "multi_team"): multi_team}),
+
mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value=team_name),
+ mock.patch.object(type(processor), "kill"),
+ ):
+ # Empty "present" set means the file is orphaned, so its processor
is stopped.
+ manager.terminate_orphan_processes(present=set())
+
+ mock_decr.assert_called_once_with("dag_processing.processes",
tags=expected_tags)
+
+ @pytest.mark.parametrize(
+ ("multi_team", "team_name", "expected_tags"),
+ [
+ pytest.param(
+ "true",
+ "team_alpha",
+ {"file_path": "dag_file.py", "action": "terminate",
"team_name": "team_alpha"},
+ id="with_team",
+ ),
+ pytest.param(
+ "false",
+ None,
+ {"file_path": "dag_file.py", "action": "terminate"},
+ id="without_team",
+ ),
+ ],
+ )
+ @mock.patch("airflow.dag_processing.manager.stats.decr")
+ def test_terminate_includes_team_name(self, mock_decr, multi_team,
team_name, expected_tags):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing", rel_path=Path("dag_file.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+ processor = self.mock_processor()
+ manager._processors = {dag_file: processor}
+
+ with (
+ conf_vars({("core", "multi_team"): multi_team}),
+
mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value=team_name),
+ mock.patch.object(type(processor), "kill"),
+ ):
+ manager.terminate()
+
+ mock_decr.assert_called_once_with("dag_processing.processes",
tags=expected_tags)
+
+ @pytest.mark.parametrize(
+ ("team_name", "expected_tags"),
+ [
+ pytest.param("team_alpha", {"team_name": "team_alpha"},
id="with_team"),
+ pytest.param(None, {}, id="without_team"),
+ ],
+ )
+ @mock.patch("airflow.dag_processing.manager.stats.incr")
+ def test_process_parse_results_callback_only_count_includes_team_name(
+ self, mock_incr, team_name, expected_tags
+ ):
+ from airflow.dag_processing.manager import process_parse_results
+
+ process_parse_results(
+ run_duration=1.5,
+ finish_time=timezone.utcnow(),
+ run_count=0,
+ bundle_name="testing",
+ parsing_result=None,
+ is_callback_only=True,
+ relative_fileloc="dag.py",
+ team_name=team_name,
+ )
+
+
mock_incr.assert_called_once_with("dag_processing.callback_only_count",
tags=expected_tags)
+
+ @pytest.mark.parametrize(
+ ("multi_team", "team_name", "expected_tags"),
+ [
+ pytest.param("true", "team_alpha", {"team_name": "team_alpha"},
id="with_team"),
+ pytest.param("false", None, {}, id="without_team"),
+ ],
+ )
+ @mock.patch("airflow.dag_processing.manager.stats.incr")
+ def test_add_callback_to_queue_includes_team_name(self, mock_incr,
multi_team, team_name, expected_tags):
+ manager = DagFileProcessorManager(max_runs=1)
+ request = MagicMock(filepath="test_dag.py", bundle_name="testing",
bundle_version=None)
+ bundle = MagicMock(path=TEST_DAGS_FOLDER)
+
+ with (
+ conf_vars({("core", "multi_team"): multi_team}),
+ mock.patch.object(manager, "prepare_callback_bundle",
return_value=bundle),
+ mock.patch.object(manager, "_add_files_to_queue"),
+
mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name",
return_value=team_name),
+ ):
+ manager._add_callback_to_queue(request)
+
+
mock_incr.assert_called_once_with("dag_processing.other_callback_count",
tags=expected_tags)
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 2e3d6940cc7..599c36d47f6 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -2149,3 +2149,80 @@ class TestDagFileProcessorProcess:
"value": "super-secret-value",
"type": "VariableResult",
}
+
+
+class TestMultiTeamCallbackMetrics:
+ """Tests for team_name tag on dag.callback_exceptions in multi-team
mode."""
+
+ @conf_vars({("core", "multi_team"): "true"})
+ @patch("airflow.dag_processing.processor.DagModel.get_team_name",
return_value="team_alpha")
+ @patch("airflow.dag_processing.processor.stats.incr")
+ def test_callback_exception_includes_team_name(self, mock_incr,
mock_get_team_name, spy_agency):
+ def failing_callback(context):
+ raise RuntimeError("boom")
+
+ with DAG(dag_id="test_dag", on_failure_callback=failing_callback) as
dag:
+ BaseOperator(task_id="test_task")
+
+ @spy_agency.spy_for(DagBag.collect_dags, owner=DagBag)
+ def fake_collect_dags(self, *args, **kwargs):
+ self.dags[dag.dag_id] = dag
+
+ dagbag = DagBag()
+ dagbag.collect_dags()
+
+ request = DagCallbackRequest(
+ filepath="test.py",
+ dag_id="test_dag",
+ run_id="test_run",
+ bundle_name="testing",
+ bundle_version=None,
+ is_failure_callback=True,
+ msg="Test failure",
+ )
+
+ log = structlog.get_logger()
+ _execute_dag_callbacks(dagbag, request, log)
+
+ mock_incr.assert_called_once_with(
+ "dag.callback_exceptions",
+ tags={"dag_id": "test_dag", "team_name": "team_alpha"},
+ )
+
+ @conf_vars({("core", "multi_team"): "false"})
+ @patch("airflow.dag_processing.processor.DagModel.get_team_name")
+ @patch("airflow.dag_processing.processor.stats.incr")
+ def test_callback_exception_omits_team_name_when_not_multi_team(
+ self, mock_incr, mock_get_team_name, spy_agency
+ ):
+ def failing_callback(context):
+ raise RuntimeError("boom")
+
+ with DAG(dag_id="test_dag", on_failure_callback=failing_callback) as
dag:
+ BaseOperator(task_id="test_task")
+
+ @spy_agency.spy_for(DagBag.collect_dags, owner=DagBag)
+ def fake_collect_dags(self, *args, **kwargs):
+ self.dags[dag.dag_id] = dag
+
+ dagbag = DagBag()
+ dagbag.collect_dags()
+
+ request = DagCallbackRequest(
+ filepath="test.py",
+ dag_id="test_dag",
+ run_id="test_run",
+ bundle_name="testing",
+ bundle_version=None,
+ is_failure_callback=True,
+ msg="Test failure",
+ )
+
+ log = structlog.get_logger()
+ _execute_dag_callbacks(dagbag, request, log)
+
+ mock_get_team_name.assert_not_called()
+ mock_incr.assert_called_once_with(
+ "dag.callback_exceptions",
+ tags={"dag_id": "test_dag"},
+ )
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index a8d384d9480..a55b93685d9 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -4118,6 +4118,37 @@ class TestDagRunHandleDagCallback:
assert "ti" not in context_received
assert context_received["run_id"] == dr.run_id
+ @pytest.mark.parametrize(
+ ("multi_team", "team_name", "expected_tags"),
+ [
+ pytest.param(
+ "true", "team_alpha", {"dag_id": "test_dag", "team_name":
"team_alpha"}, id="with_team"
+ ),
+ pytest.param("false", None, {"dag_id": "test_dag"},
id="without_team"),
+ ],
+ )
+ @mock.patch("airflow._shared.observability.metrics.stats.incr")
+ def test_callback_exception_team_name_tag(
+ self, mock_incr, multi_team, team_name, expected_tags, dag_maker,
session
+ ):
+ def failing_callback(context):
+ raise RuntimeError("boom")
+
+ with dag_maker("test_dag", session=session,
on_failure_callback=failing_callback) as dag:
+ BashOperator(task_id="test_task", bash_command="echo 1")
+
+ dr = dag_maker.create_dagrun()
+ dag.on_failure_callback = failing_callback
+ dag.has_on_failure_callback = True
+
+ with (
+ conf_vars({("core", "multi_team"): multi_team}),
+ mock.patch("airflow.models.dag.DagModel.get_team_name",
return_value=team_name),
+ ):
+ dr.execute_dag_callbacks(dag, success=False)
+
+ mock_incr.assert_any_call("dag.callback_exceptions",
tags=expected_tags)
+
class TestDagRunTracing:
"""Tests for DagRun OpenTelemetry span behavior."""