This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa0609f0867 Add team_name tag to dag processor metrics for multi-team 
deployments (#68599)
aa0609f0867 is described below

commit aa0609f08672c3438d451d48fe0832ae1c9cd925
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Jun 18 09:29:57 2026 -0700

    Add team_name tag to dag processor metrics for multi-team deployments 
(#68599)
---
 airflow-core/src/airflow/dag_processing/manager.py | 111 +++++++-
 .../src/airflow/dag_processing/processor.py        |  16 +-
 airflow-core/src/airflow/models/dagrun.py          |  15 +-
 .../tests/unit/dag_processing/test_manager.py      | 292 +++++++++++++++++++++
 .../tests/unit/dag_processing/test_processor.py    |  77 ++++++
 airflow-core/tests/unit/models/test_dagrun.py      |  31 +++
 6 files changed, 527 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 92ca10c67fa..e43920f2618 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -69,6 +69,7 @@ from airflow.sdk import SecretCache
 from airflow.sdk.log import init_log_file, logging_processors
 from airflow.typing_compat import assert_never
 from airflow.utils.file import list_py_file_paths, might_contain_dag
+from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.process_utils import (
@@ -719,7 +720,12 @@ class DagFileProcessorManager(LoggingMixin):
         )
         self._callback_to_execute[file_info].append(request)
         self._add_files_to_queue([file_info], mode="front")
-        stats.incr("dag_processing.other_callback_count")
+        team_name = (
+            DagBundleModel.get_team_name(file_info.bundle_name)
+            if conf.getboolean("core", "multi_team")
+            else None
+        )
+        stats.incr("dag_processing.other_callback_count", 
tags=prune_dict({"team_name": team_name}))
 
     @provide_session
     def get_bundle_state(self, bundle_name: str, *, session: Session = 
NEW_SESSION) -> BundleState | None:
@@ -1020,6 +1026,14 @@ class DagFileProcessorManager(LoggingMixin):
         utcnow = timezone.utcnow()
         now = time.monotonic()
 
+        if conf.getboolean("core", "multi_team"):
+            bundle_names = {bundle_name for bundle_name in known_files}
+            bundle_to_team = {
+                bundle_name: DagBundleModel.get_team_name(bundle_name) for 
bundle_name in bundle_names
+            }
+        else:
+            bundle_to_team = {}
+
         for files in known_files.values():
             for file in files:
                 stat = self._file_stats[file]
@@ -1040,11 +1054,14 @@ class DagFileProcessorManager(LoggingMixin):
                     stats.gauge(
                         "dag_processing.last_run.seconds_ago",
                         seconds_ago,
-                        tags={
-                            "file_path": file.normalized_file_path_for_stats,
-                            "bundle_name": 
normalize_name_for_stats(file.bundle_name),
-                            "file_name": file_name,
-                        },
+                        tags=prune_dict(
+                            {
+                                "file_path": 
file.normalized_file_path_for_stats,
+                                "bundle_name": 
normalize_name_for_stats(file.bundle_name),
+                                "file_name": file_name,
+                                "team_name": 
bundle_to_team.get(file.bundle_name),
+                            }
+                        ),
                     )
 
                 rows.append(
@@ -1136,6 +1153,15 @@ class DagFileProcessorManager(LoggingMixin):
     def terminate_orphan_processes(self, present: set[DagFileInfo]):
         """Stop processors that are working on deleted files."""
         present_keys = {file.presence_key for file in present}
+
+        if conf.getboolean("core", "multi_team"):
+            bundle_names = {file.bundle_name for file in self._processors}
+            bundle_to_team = {
+                bundle_name: DagBundleModel.get_team_name(bundle_name) for 
bundle_name in bundle_names
+            }
+        else:
+            bundle_to_team = {}
+
         for file in list(self._processors.keys()):
             if file.presence_key not in present_keys:
                 processor = self._processors.pop(file, None)
@@ -1145,7 +1171,13 @@ class DagFileProcessorManager(LoggingMixin):
                 self.log.warning("Stopping processor for %s", file_name)
                 stats.decr(
                     "dag_processing.processes",
-                    tags={"file_path": file.normalized_file_path_for_stats, 
"action": "stop"},
+                    tags=prune_dict(
+                        {
+                            "file_path": file.normalized_file_path_for_stats,
+                            "action": "stop",
+                            "team_name": bundle_to_team.get(file.bundle_name),
+                        }
+                    ),
                 )
                 processor.kill(signal.SIGKILL)
                 processor.logger_filehandle.close()
@@ -1183,6 +1215,9 @@ class DagFileProcessorManager(LoggingMixin):
 
         run_duration = time.monotonic() - proc.start_time
         finish_time = timezone.utcnow()
+        team_name = (
+            DagBundleModel.get_team_name(file.bundle_name) if 
conf.getboolean("core", "multi_team") else None
+        )
         next_stat = process_parse_results(
             run_duration=run_duration,
             finish_time=finish_time,
@@ -1191,6 +1226,7 @@ class DagFileProcessorManager(LoggingMixin):
             parsing_result=proc.parsing_result,
             is_callback_only=is_callback_only,
             relative_fileloc=str(file.rel_path),
+            team_name=team_name,
         )
 
         if proc.parsing_result is not None:
@@ -1358,6 +1394,14 @@ class DagFileProcessorManager(LoggingMixin):
 
     def _start_new_processes(self):
         """Start more processors if we have enough slots and files to 
process."""
+        if conf.getboolean("core", "multi_team"):
+            bundle_names = {file.bundle_name for file in self._file_queue}
+            bundle_to_team = {
+                bundle_name: DagBundleModel.get_team_name(bundle_name) for 
bundle_name in bundle_names
+            }
+        else:
+            bundle_to_team = {}
+
         while self._parallelism > len(self._processors) and self._file_queue:
             file, _ = self._file_queue.popitem(last=False)
             # Stop creating duplicate processor i.e. processor with the same 
filepath
@@ -1367,7 +1411,13 @@ class DagFileProcessorManager(LoggingMixin):
             processor = self._create_process(file)
             stats.incr(
                 "dag_processing.processes",
-                tags={"file_path": file.normalized_file_path_for_stats, 
"action": "start"},
+                tags=prune_dict(
+                    {
+                        "file_path": file.normalized_file_path_for_stats,
+                        "action": "start",
+                        "team_name": bundle_to_team.get(file.bundle_name),
+                    }
+                ),
             )
 
             self._processors[file] = processor
@@ -1533,6 +1583,15 @@ class DagFileProcessorManager(LoggingMixin):
         """Kill any file processors that timeout to defend against process 
hangs."""
         now = time.monotonic()
         processors_to_remove = []
+
+        if conf.getboolean("core", "multi_team"):
+            bundle_names = {file.bundle_name for file in self._processors}
+            bundle_to_team = {
+                bundle_name: DagBundleModel.get_team_name(bundle_name) for 
bundle_name in bundle_names
+            }
+        else:
+            bundle_to_team = {}
+
         for file, processor in self._processors.items():
             duration = now - processor.start_time
             if duration > self.processor_timeout:
@@ -1544,8 +1603,17 @@ class DagFileProcessorManager(LoggingMixin):
                     self.processor_timeout,
                 )
                 file_path_tag = file.normalized_file_path_for_stats
-                stats.decr("dag_processing.processes", tags={"file_path": 
file_path_tag, "action": "timeout"})
-                stats.incr("dag_processing.processor_timeouts", 
tags={"file_path": file_path_tag})
+                team_name = bundle_to_team.get(file.bundle_name)
+                stats.decr(
+                    "dag_processing.processes",
+                    tags=prune_dict(
+                        {"file_path": file_path_tag, "action": "timeout", 
"team_name": team_name}
+                    ),
+                )
+                stats.incr(
+                    "dag_processing.processor_timeouts",
+                    tags=prune_dict({"file_path": file_path_tag, "team_name": 
team_name}),
+                )
                 processor.kill(signal.SIGKILL)
 
                 processors_to_remove.append(file)
@@ -1606,10 +1674,24 @@ class DagFileProcessorManager(LoggingMixin):
 
     def terminate(self):
         """Stop all running processors."""
+        if conf.getboolean("core", "multi_team"):
+            bundle_names = {file.bundle_name for file in self._processors}
+            bundle_to_team = {
+                bundle_name: DagBundleModel.get_team_name(bundle_name) for 
bundle_name in bundle_names
+            }
+        else:
+            bundle_to_team = {}
+
         for file, processor in self._processors.items():
             stats.decr(
                 "dag_processing.processes",
-                tags={"file_path": file.normalized_file_path_for_stats, 
"action": "terminate"},
+                tags=prune_dict(
+                    {
+                        "file_path": file.normalized_file_path_for_stats,
+                        "action": "terminate",
+                        "team_name": bundle_to_team.get(file.bundle_name),
+                    }
+                ),
             )
             # SIGTERM, wait 5s, SIGKILL if still alive
             processor.kill(signal.SIGTERM, escalation_delay=5.0)
@@ -1642,6 +1724,7 @@ def process_parse_results(
     *,
     is_callback_only: bool = False,
     relative_fileloc: str | None = None,
+    team_name: str | None = None,
 ) -> DagFileStat:
     """
     Create a DagFileStat from parsing results and emit metrics.
@@ -1655,7 +1738,7 @@ def process_parse_results(
             last_duration=run_duration,
             run_count=run_count,  # Don't increment for callback-only 
processing
         )
-        stats.incr("dag_processing.callback_only_count")
+        stats.incr("dag_processing.callback_only_count", 
tags=prune_dict({"team_name": team_name}))
     else:
         # Actual DAG parsing or import error
         stat = DagFileStat(
@@ -1673,7 +1756,9 @@ def process_parse_results(
         stats.timing(
             "dag_processing.last_duration",
             stat.last_duration,
-            tags={"bundle_name": normalized_bundle, "file_name": file_name},
+            tags=prune_dict(
+                {"bundle_name": normalized_bundle, "file_name": file_name, 
"team_name": team_name}
+            ),
         )
 
     if parsing_result is None:
diff --git a/airflow-core/src/airflow/dag_processing/processor.py 
b/airflow-core/src/airflow/dag_processing/processor.py
index f7b3affd008..251b7b62974 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -38,6 +38,7 @@ from airflow.callbacks.callback_requests import (
 from airflow.configuration import conf
 from airflow.dag_processing.bundles.base import BundleVersionLock
 from airflow.dag_processing.dagbag import BundleDagBag, DagBag
+from airflow.models.dag import DagModel
 from airflow.sdk.exceptions import TaskNotFound
 from airflow.sdk.execution_time.comms import (
     ConnectionResult,
@@ -90,6 +91,7 @@ from airflow.sdk.log import mask_secret
 from airflow.serialization.serialized_objects import DagSerialization, 
LazyDeserializedDAG
 from airflow.utils.dag_version_inflation_checker import 
check_dag_file_stability
 from airflow.utils.file import iter_airflow_imports
+from airflow.utils.helpers import prune_dict
 from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
@@ -386,7 +388,19 @@ def _execute_dag_callbacks(dagbag: DagBag, request: 
DagCallbackRequest, log: Fil
             callback(context)
         except Exception:
             log.exception("Callback failed", dag_id=request.dag_id)
-            stats.incr("dag.callback_exceptions", tags={"dag_id": 
request.dag_id})
+            stats.incr(
+                "dag.callback_exceptions",
+                tags=prune_dict(
+                    {
+                        "dag_id": request.dag_id,
+                        "team_name": (
+                            DagModel.get_team_name(request.dag_id)
+                            if conf.getboolean("core", "multi_team")
+                            else None
+                        ),
+                    }
+                ),
+            )
 
 
 def _execute_task_callbacks(dagbag: DagBag, request: TaskCallbackRequest, log: 
FilteringBoundLogger) -> None:
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 80e279f1499..ab024142918 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1420,6 +1420,7 @@ class DagRun(Base, LoggingMixin):
             TaskInstance as TIDataModel,
             TIRunContext,
         )
+        from airflow.models.dag import DagModel
         from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
 
         if relevant_ti:
@@ -1463,7 +1464,19 @@ class DagRun(Base, LoggingMixin):
                 callback(context)
             except Exception:
                 self.log.exception("Callback failed for %s", dag.dag_id)
-                stats.incr("dag.callback_exceptions", tags={"dag_id": 
dag.dag_id})
+                stats.incr(
+                    "dag.callback_exceptions",
+                    tags=prune_dict(
+                        {
+                            "dag_id": dag.dag_id,
+                            "team_name": (
+                                DagModel.get_team_name(dag.dag_id)
+                                if airflow_conf.getboolean("core", 
"multi_team")
+                                else None
+                            ),
+                        }
+                    ),
+                )
 
     def _get_ready_tis(
         self,
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 0f6c061fc43..6d10e5c1a06 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -2922,3 +2922,295 @@ class TestDagFileProcessorManager:
 
         assert manager._bundle_versions["mock_bundle"] == "newhash"
         assert manager._bundle_version_data["mock_bundle"] == test_data
+
+
+class TestMultiTeamMetrics:
+    """Tests for team_name tag on dag processing metrics in multi-team mode."""
+
+    def mock_processor(self, start_time: float | None = None) -> 
DagFileProcessorProcess:
+        proc = MagicMock()
+        proc.wait.return_value = 0
+        read_end, write_end = socketpair()
+        ret = DagFileProcessorProcess(
+            process_log=MagicMock(),
+            id=uuid7(),
+            pid=1234,
+            process=proc,
+            stdin=write_end,
+            logger_filehandle=MagicMock(),
+            client=MagicMock(),
+            bundle_name="testing",
+            dag_file_rel_path="test_dag.py",
+        )
+        if start_time:
+            ret.start_time = start_time
+        ret._open_sockets.clear()
+        return ret
+
+    @conf_vars({("core", "multi_team"): "true"})
+    @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value="team_alpha")
+    @mock.patch("airflow.dag_processing.manager.stats.gauge")
+    def test_log_file_processing_stats_includes_team_name(self, mock_gauge, 
mock_get_team_name):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing",
+            rel_path=Path("dag_file.py"),
+            bundle_path=TEST_DAGS_FOLDER,
+        )
+        manager._file_stats[dag_file] = DagFileStat(
+            last_finish_time=timezone.utcnow() - timedelta(seconds=5),
+        )
+
+        manager._log_file_processing_stats({"testing": {dag_file}})
+
+        mock_gauge.assert_any_call(
+            "dag_processing.last_run.seconds_ago",
+            mock.ANY,
+            tags={
+                "file_path": "dag_file.py",
+                "bundle_name": "testing",
+                "file_name": "dag_file",
+                "team_name": "team_alpha",
+            },
+        )
+
+    @conf_vars({("core", "multi_team"): "false"})
+    @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name")
+    @mock.patch("airflow.dag_processing.manager.stats.gauge")
+    def test_log_file_processing_stats_omits_team_name_when_not_multi_team(
+        self, mock_gauge, mock_get_team_name
+    ):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing",
+            rel_path=Path("dag_file.py"),
+            bundle_path=TEST_DAGS_FOLDER,
+        )
+        manager._file_stats[dag_file] = DagFileStat(
+            last_finish_time=timezone.utcnow() - timedelta(seconds=5),
+        )
+
+        manager._log_file_processing_stats({"testing": {dag_file}})
+
+        mock_get_team_name.assert_not_called()
+        mock_gauge.assert_any_call(
+            "dag_processing.last_run.seconds_ago",
+            mock.ANY,
+            tags={
+                "file_path": "dag_file.py",
+                "bundle_name": "testing",
+                "file_name": "dag_file",
+            },
+        )
+
+    @conf_vars({("core", "multi_team"): "true"})
+    @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value="team_alpha")
+    @mock.patch("airflow.dag_processing.manager.stats.incr")
+    def test_start_new_processes_includes_team_name(self, mock_incr, 
mock_get_team_name):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing",
+            rel_path=Path("dag_file.py"),
+            bundle_path=TEST_DAGS_FOLDER,
+        )
+        manager._file_queue[dag_file] = None
+        manager._create_process = MagicMock(return_value=self.mock_processor())
+
+        manager._start_new_processes()
+
+        mock_incr.assert_any_call(
+            "dag_processing.processes",
+            tags={"file_path": "dag_file.py", "action": "start", "team_name": 
"team_alpha"},
+        )
+
+    @conf_vars({("core", "multi_team"): "true"})
+    @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value="team_alpha")
+    @mock.patch("airflow.dag_processing.manager.stats.incr")
+    @mock.patch("airflow.dag_processing.manager.stats.decr")
+    def test_kill_timed_out_processors_includes_team_name(self, mock_decr, 
mock_incr, mock_get_team_name):
+        manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
+        start_time = time.monotonic() - manager.processor_timeout - 1
+        processor = self.mock_processor(start_time=start_time)
+        dag_file = DagFileInfo(
+            bundle_name="testing", rel_path=Path("dag_file.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
+        manager._processors = {dag_file: processor}
+
+        with mock.patch.object(type(processor), "kill"):
+            manager._kill_timed_out_processors()
+
+        mock_decr.assert_called_once_with(
+            "dag_processing.processes",
+            tags={"file_path": "dag_file.py", "action": "timeout", 
"team_name": "team_alpha"},
+        )
+        mock_incr.assert_any_call(
+            "dag_processing.processor_timeouts",
+            tags={"file_path": "dag_file.py", "team_name": "team_alpha"},
+        )
+
+    @conf_vars({("core", "multi_team"): "true"})
+    @mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value="team_alpha")
+    @mock.patch("airflow.dag_processing.manager.stats.timing")
+    def test_process_parse_results_includes_team_name(self, mock_timing, 
mock_get_team_name):
+        from airflow.dag_processing.manager import process_parse_results
+
+        result = DagFileParsingResult(fileloc="/tmp/dag.py", 
serialized_dags=[])
+        process_parse_results(
+            run_duration=1.5,
+            finish_time=timezone.utcnow(),
+            run_count=0,
+            bundle_name="testing",
+            parsing_result=result,
+            relative_fileloc="dag.py",
+            team_name="team_alpha",
+        )
+
+        mock_timing.assert_called_once_with(
+            "dag_processing.last_duration",
+            1.5,
+            tags={"bundle_name": "testing", "file_name": "dag", "team_name": 
"team_alpha"},
+        )
+
+    @conf_vars({("core", "multi_team"): "false"})
+    @mock.patch("airflow.dag_processing.manager.stats.timing")
+    def test_process_parse_results_omits_team_name_when_none(self, 
mock_timing):
+        from airflow.dag_processing.manager import process_parse_results
+
+        result = DagFileParsingResult(fileloc="/tmp/dag.py", 
serialized_dags=[])
+        process_parse_results(
+            run_duration=1.5,
+            finish_time=timezone.utcnow(),
+            run_count=0,
+            bundle_name="testing",
+            parsing_result=result,
+            relative_fileloc="dag.py",
+            team_name=None,
+        )
+
+        mock_timing.assert_called_once_with(
+            "dag_processing.last_duration",
+            1.5,
+            tags={"bundle_name": "testing", "file_name": "dag"},
+        )
+
+    @pytest.mark.parametrize(
+        ("multi_team", "team_name", "expected_tags"),
+        [
+            pytest.param(
+                "true",
+                "team_alpha",
+                {"file_path": "dag_file.py", "action": "stop", "team_name": 
"team_alpha"},
+                id="with_team",
+            ),
+            pytest.param(
+                "false",
+                None,
+                {"file_path": "dag_file.py", "action": "stop"},
+                id="without_team",
+            ),
+        ],
+    )
+    @mock.patch("airflow.dag_processing.manager.stats.decr")
+    def test_terminate_orphan_processes_includes_team_name(
+        self, mock_decr, multi_team, team_name, expected_tags
+    ):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing", rel_path=Path("dag_file.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
+        processor = self.mock_processor()
+        manager._processors = {dag_file: processor}
+
+        with (
+            conf_vars({("core", "multi_team"): multi_team}),
+            
mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value=team_name),
+            mock.patch.object(type(processor), "kill"),
+        ):
+            # Empty "present" set means the file is orphaned, so its processor 
is stopped.
+            manager.terminate_orphan_processes(present=set())
+
+        mock_decr.assert_called_once_with("dag_processing.processes", 
tags=expected_tags)
+
+    @pytest.mark.parametrize(
+        ("multi_team", "team_name", "expected_tags"),
+        [
+            pytest.param(
+                "true",
+                "team_alpha",
+                {"file_path": "dag_file.py", "action": "terminate", 
"team_name": "team_alpha"},
+                id="with_team",
+            ),
+            pytest.param(
+                "false",
+                None,
+                {"file_path": "dag_file.py", "action": "terminate"},
+                id="without_team",
+            ),
+        ],
+    )
+    @mock.patch("airflow.dag_processing.manager.stats.decr")
+    def test_terminate_includes_team_name(self, mock_decr, multi_team, 
team_name, expected_tags):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing", rel_path=Path("dag_file.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
+        processor = self.mock_processor()
+        manager._processors = {dag_file: processor}
+
+        with (
+            conf_vars({("core", "multi_team"): multi_team}),
+            
mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value=team_name),
+            mock.patch.object(type(processor), "kill"),
+        ):
+            manager.terminate()
+
+        mock_decr.assert_called_once_with("dag_processing.processes", 
tags=expected_tags)
+
+    @pytest.mark.parametrize(
+        ("team_name", "expected_tags"),
+        [
+            pytest.param("team_alpha", {"team_name": "team_alpha"}, 
id="with_team"),
+            pytest.param(None, {}, id="without_team"),
+        ],
+    )
+    @mock.patch("airflow.dag_processing.manager.stats.incr")
+    def test_process_parse_results_callback_only_count_includes_team_name(
+        self, mock_incr, team_name, expected_tags
+    ):
+        from airflow.dag_processing.manager import process_parse_results
+
+        process_parse_results(
+            run_duration=1.5,
+            finish_time=timezone.utcnow(),
+            run_count=0,
+            bundle_name="testing",
+            parsing_result=None,
+            is_callback_only=True,
+            relative_fileloc="dag.py",
+            team_name=team_name,
+        )
+
+        
mock_incr.assert_called_once_with("dag_processing.callback_only_count", 
tags=expected_tags)
+
+    @pytest.mark.parametrize(
+        ("multi_team", "team_name", "expected_tags"),
+        [
+            pytest.param("true", "team_alpha", {"team_name": "team_alpha"}, 
id="with_team"),
+            pytest.param("false", None, {}, id="without_team"),
+        ],
+    )
+    @mock.patch("airflow.dag_processing.manager.stats.incr")
+    def test_add_callback_to_queue_includes_team_name(self, mock_incr, 
multi_team, team_name, expected_tags):
+        manager = DagFileProcessorManager(max_runs=1)
+        request = MagicMock(filepath="test_dag.py", bundle_name="testing", 
bundle_version=None)
+        bundle = MagicMock(path=TEST_DAGS_FOLDER)
+
+        with (
+            conf_vars({("core", "multi_team"): multi_team}),
+            mock.patch.object(manager, "prepare_callback_bundle", 
return_value=bundle),
+            mock.patch.object(manager, "_add_files_to_queue"),
+            
mock.patch("airflow.dag_processing.manager.DagBundleModel.get_team_name", 
return_value=team_name),
+        ):
+            manager._add_callback_to_queue(request)
+
+        
mock_incr.assert_called_once_with("dag_processing.other_callback_count", 
tags=expected_tags)
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 2e3d6940cc7..599c36d47f6 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -2149,3 +2149,80 @@ class TestDagFileProcessorProcess:
             "value": "super-secret-value",
             "type": "VariableResult",
         }
+
+
+class TestMultiTeamCallbackMetrics:
+    """Tests for team_name tag on dag.callback_exceptions in multi-team 
mode."""
+
+    @conf_vars({("core", "multi_team"): "true"})
+    @patch("airflow.dag_processing.processor.DagModel.get_team_name", 
return_value="team_alpha")
+    @patch("airflow.dag_processing.processor.stats.incr")
+    def test_callback_exception_includes_team_name(self, mock_incr, 
mock_get_team_name, spy_agency):
+        def failing_callback(context):
+            raise RuntimeError("boom")
+
+        with DAG(dag_id="test_dag", on_failure_callback=failing_callback) as 
dag:
+            BaseOperator(task_id="test_task")
+
+        @spy_agency.spy_for(DagBag.collect_dags, owner=DagBag)
+        def fake_collect_dags(self, *args, **kwargs):
+            self.dags[dag.dag_id] = dag
+
+        dagbag = DagBag()
+        dagbag.collect_dags()
+
+        request = DagCallbackRequest(
+            filepath="test.py",
+            dag_id="test_dag",
+            run_id="test_run",
+            bundle_name="testing",
+            bundle_version=None,
+            is_failure_callback=True,
+            msg="Test failure",
+        )
+
+        log = structlog.get_logger()
+        _execute_dag_callbacks(dagbag, request, log)
+
+        mock_incr.assert_called_once_with(
+            "dag.callback_exceptions",
+            tags={"dag_id": "test_dag", "team_name": "team_alpha"},
+        )
+
+    @conf_vars({("core", "multi_team"): "false"})
+    @patch("airflow.dag_processing.processor.DagModel.get_team_name")
+    @patch("airflow.dag_processing.processor.stats.incr")
+    def test_callback_exception_omits_team_name_when_not_multi_team(
+        self, mock_incr, mock_get_team_name, spy_agency
+    ):
+        def failing_callback(context):
+            raise RuntimeError("boom")
+
+        with DAG(dag_id="test_dag", on_failure_callback=failing_callback) as 
dag:
+            BaseOperator(task_id="test_task")
+
+        @spy_agency.spy_for(DagBag.collect_dags, owner=DagBag)
+        def fake_collect_dags(self, *args, **kwargs):
+            self.dags[dag.dag_id] = dag
+
+        dagbag = DagBag()
+        dagbag.collect_dags()
+
+        request = DagCallbackRequest(
+            filepath="test.py",
+            dag_id="test_dag",
+            run_id="test_run",
+            bundle_name="testing",
+            bundle_version=None,
+            is_failure_callback=True,
+            msg="Test failure",
+        )
+
+        log = structlog.get_logger()
+        _execute_dag_callbacks(dagbag, request, log)
+
+        mock_get_team_name.assert_not_called()
+        mock_incr.assert_called_once_with(
+            "dag.callback_exceptions",
+            tags={"dag_id": "test_dag"},
+        )
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index a8d384d9480..a55b93685d9 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -4118,6 +4118,37 @@ class TestDagRunHandleDagCallback:
         assert "ti" not in context_received
         assert context_received["run_id"] == dr.run_id
 
+    @pytest.mark.parametrize(
+        ("multi_team", "team_name", "expected_tags"),
+        [
+            pytest.param(
+                "true", "team_alpha", {"dag_id": "test_dag", "team_name": 
"team_alpha"}, id="with_team"
+            ),
+            pytest.param("false", None, {"dag_id": "test_dag"}, 
id="without_team"),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats.incr")
+    def test_callback_exception_team_name_tag(
+        self, mock_incr, multi_team, team_name, expected_tags, dag_maker, 
session
+    ):
+        def failing_callback(context):
+            raise RuntimeError("boom")
+
+        with dag_maker("test_dag", session=session, 
on_failure_callback=failing_callback) as dag:
+            BashOperator(task_id="test_task", bash_command="echo 1")
+
+        dr = dag_maker.create_dagrun()
+        dag.on_failure_callback = failing_callback
+        dag.has_on_failure_callback = True
+
+        with (
+            conf_vars({("core", "multi_team"): multi_team}),
+            mock.patch("airflow.models.dag.DagModel.get_team_name", 
return_value=team_name),
+        ):
+            dr.execute_dag_callbacks(dag, success=False)
+
+        mock_incr.assert_any_call("dag.callback_exceptions", 
tags=expected_tags)
+
 
 class TestDagRunTracing:
     """Tests for DagRun OpenTelemetry span behavior."""

Reply via email to