This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b46c346ed8 Add team_name tag to asset metrics for multi-team 
deployments (#68367)
8b46c346ed8 is described below

commit 8b46c346ed875be381a49ae0e1e1e0c6ea6969a9
Author: D. Ferruzzi <[email protected]>
AuthorDate: Thu Jun 18 09:36:01 2026 -0700

    Add team_name tag to asset metrics for multi-team deployments (#68367)
---
 airflow-core/src/airflow/assets/manager.py         | 10 ++--
 .../src/airflow/jobs/scheduler_job_runner.py       |  7 ++-
 airflow-core/tests/unit/assets/test_manager.py     | 57 +++++++++++++++++++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 61 ++++++++++++++++++++++
 4 files changed, 130 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py 
b/airflow-core/src/airflow/assets/manager.py
index f72c533c5a0..c8d2edef4f1 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -44,7 +44,7 @@ from airflow.models.asset import (
 )
 from airflow.models.log import Log
 from airflow.timetables.base import compute_rollup_fingerprint
-from airflow.utils.helpers import is_container
+from airflow.utils.helpers import is_container, prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
 
@@ -397,7 +397,12 @@ class AssetManager(LoggingMixin):
             )
         )
 
-        stats.incr("asset.updates")
+        team_name = None
+        if task_instance and conf.getboolean("core", "multi_team"):
+            from airflow.models.dag import DagModel
+
+            team_name = DagModel.get_team_name(task_instance.dag_id, 
session=session)
+        stats.incr("asset.updates", tags=prune_dict({"team_name": team_name}))
 
         dags_to_queue = (
             dags_to_queue_from_asset | dags_to_queue_from_asset_alias | 
dags_to_queue_from_asset_ref
@@ -405,7 +410,6 @@ class AssetManager(LoggingMixin):
 
         if conf.getboolean("core", "multi_team"):
             if task_instance:
-                team_name = DagModel.get_team_name(task_instance.dag_id, 
session=session)
                 resolved_source_teams = {team_name} if team_name else set()
                 # Resolve consumer-team filtering from the outlet reference
                 outlet_ref = session.scalar(
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 51178847267..93b155f2257 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2611,7 +2611,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 creating_job_id=self.job.id,
                 session=session,
             )
-            stats.incr("asset.triggered_dagruns")
+            team_name = (
+                self._get_team_names_for_dag_ids([dag.dag_id], 
session).get(dag.dag_id)
+                if self._multi_team
+                else None
+            )
+            stats.incr("asset.triggered_dagruns", 
tags=prune_dict({"team_name": team_name}))
             dag_run.consumed_asset_events.extend(asset_events)
             self.log.info(
                 "Created asset-triggered DagRun for '%s': run_id=%s, consumed 
%d asset events",
diff --git a/airflow-core/tests/unit/assets/test_manager.py 
b/airflow-core/tests/unit/assets/test_manager.py
index 687f4d17847..40d9401b9ff 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -29,6 +29,7 @@ from sqlalchemy import delete, func, select
 from sqlalchemy.orm import Session
 
 from airflow import settings
+from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
 from airflow.assets.manager import AssetManager
 from airflow.models.asset import (
     AssetAliasModel,
@@ -40,7 +41,9 @@ from airflow.models.asset import (
     DagScheduleAssetReference,
 )
 from airflow.models.dag import DAG, DagModel
+from airflow.models.dagbundle import DagBundleModel
 from airflow.models.log import Log
+from airflow.models.team import Team
 from airflow.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper
 from airflow.partition_mappers.window import WeekWindow
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -48,7 +51,11 @@ from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.db import clear_db_apdr, clear_db_logs, 
clear_db_pakl
+from tests_common.test_utils.db import (
+    clear_db_apdr,
+    clear_db_logs,
+    clear_db_pakl,
+)
 from unit.listeners import asset_listener
 
 pytestmark = pytest.mark.db_test
@@ -670,6 +677,54 @@ def _make_asset_model(
     return model
 
 
+class TestAssetMetricsTeamName:
+    @pytest.mark.parametrize(
+        ("multi_team", "expect_team_tag"),
+        [
+            pytest.param("true", True, id="with_team"),
+            pytest.param("false", False, id="without_team"),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+    def test_asset_updates_respects_team_name(
+        self, mock_get_backend, multi_team, expect_team_tag, session, dag_maker
+    ):
+        mock_stats = mock.MagicMock(spec=StatsLogger)
+        mock_get_backend.return_value = mock_stats
+
+        suffix = "with_team" if expect_team_tag else "without_team"
+
+        team_name = f"team_asset_upd_{suffix}"
+        team = Team(name=team_name)
+        session.add(team)
+        session.flush()
+
+        bundle_name = f"bundle_asset_upd_{suffix}"
+        bundle = DagBundleModel(name=bundle_name)
+        bundle.teams.append(team)
+        session.add(bundle)
+        session.flush()
+
+        asset_name = f"metric_asset_{suffix}"
+        asset = Asset(uri=f"test://{asset_name}", name=asset_name, 
group="asset")
+        with dag_maker(dag_id=f"asset_dag_{suffix}", bundle_name=bundle_name, 
session=session):
+            EmptyOperator(task_id="task1", outlets=[asset])
+
+        ti = mock.MagicMock()
+        ti.dag_id = f"asset_dag_{suffix}"
+        ti.task_id = "task1"
+        ti.run_id = "run1"
+        ti.map_index = -1
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            AssetManager().register_asset_change(task_instance=ti, 
asset=asset, session=session)
+
+        if expect_team_tag:
+            mock_stats.incr.assert_any_call("asset.updates", 
tags={"team_name": team_name})
+        else:
+            mock_stats.incr.assert_any_call("asset.updates")
+
+
 class TestFilterDagsByTeam:
     @conf_vars({("core", "multi_team"): "false"})
     def test_multi_team_disabled_returns_all_dags(self):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index af1b34423da..b312f9fd1b0 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5667,6 +5667,67 @@ class TestSchedulerJob:
         assert created_run.data_interval_end is None
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.parametrize(
+        ("multi_team", "expect_team_tag"),
+        [
+            pytest.param("true", True, id="with_team"),
+            pytest.param("false", False, id="without_team"),
+        ],
+    )
+    @mock.patch("airflow._shared.observability.metrics.stats._get_backend")
+    def test_asset_triggered_dagruns_respects_team_name(
+        self, mock_get_backend, multi_team, expect_team_tag, session, dag_maker
+    ):
+        mock_stats = mock.MagicMock(spec=StatsLogger)
+        mock_get_backend.return_value = mock_stats
+
+        suffix = "with_team" if expect_team_tag else "without_team"
+
+        team_name = f"team_asset_trig_{suffix}"
+        team = Team(name=team_name)
+        session.add(team)
+        session.flush()
+
+        bundle_name = f"bundle_asset_trig_{suffix}"
+        bundle = DagBundleModel(name=bundle_name)
+        bundle.teams.append(team)
+        session.add(bundle)
+        session.commit()
+
+        asset_name = f"test_team_asset_{suffix}"
+        asset = Asset(uri=f"test://{asset_name}", name=asset_name, 
group="test_group")
+        with dag_maker(dag_id=f"producer_{suffix}", bundle_name=bundle_name, 
session=session):
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset])
+        dr = dag_maker.create_dagrun()
+
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+        event = AssetEvent(
+            asset_id=asset_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        session.add(event)
+
+        with dag_maker(
+            dag_id=f"consumer_{suffix}", schedule=[asset], 
bundle_name=bundle_name, session=session
+        ):
+            pass
+
+        session.add(AssetDagRunQueue(asset_id=asset_id, 
target_dag_id=f"consumer_{suffix}"))
+        session.flush()
+
+        with conf_vars({("core", "multi_team"): multi_team}):
+            scheduler_job = Job()
+            self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+            self.job_runner._create_dagruns_for_dags(session, session)
+
+        if expect_team_tag:
+            mock_stats.incr.assert_any_call("asset.triggered_dagruns", 
tags={"team_name": team_name})
+        else:
+            mock_stats.incr.assert_any_call("asset.triggered_dagruns")
+
     @pytest.mark.need_serialized_dag
     @pytest.mark.parametrize(
         ("disable", "enable"),

Reply via email to