This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c82c945e430 Apply consumer team filtering (#68025)
c82c945e430 is described below

commit c82c945e430a54be87ab4926b98ec5d93fa8045f
Author: Vincent <[email protected]>
AuthorDate: Thu Jun 4 13:00:08 2026 -0700

    Apply consumer team filtering (#68025)
---
 airflow-core/src/airflow/assets/manager.py     |  79 ++++++++++++++-----
 airflow-core/tests/unit/assets/test_manager.py | 104 +++++++++++++++++++++++++
 2 files changed, 165 insertions(+), 18 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py 
b/airflow-core/src/airflow/assets/manager.py
index 7c12c31f979..2df2e06e82f 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -40,6 +40,7 @@ from airflow.models.asset import (
     DagScheduleAssetReference,
     DagScheduleAssetUriReference,
     PartitionedAssetKeyLog,
+    TaskOutletAssetReference,
 )
 from airflow.models.log import Log
 from airflow.utils.helpers import is_container
@@ -181,15 +182,24 @@ class AssetManager(LoggingMixin):
         source_is_api: bool,
         *,
         session: Session,
+        allow_consumer_teams: list[str] | None = None,
+        allow_global_consumers: bool = True,
     ) -> set[DagModel]:
         """
         Filter consuming DAGs based on team membership when multi_team is 
enabled.
 
+        Both producer-team filtering (consumer decides which producers it 
accepts) and
+        consumer-team filtering (producer decides which consumers may receive 
its events)
+        must pass for a DAG to be queued (logical AND).
+
         :param dags_to_queue: set of DagModel instances to potentially queue.
         :param source_teams: set of team names the source belongs to. Empty 
set means teamless.
         :param asset_model: the AssetModel whose scheduled_dags carry 
allow_producer_teams.
         :param source_is_api: True if the event was triggered via the REST API 
(not a DAG task).
         :param session: SQLAlchemy session.
+        :param allow_consumer_teams: list of team names allowed to consume. 
Empty/None means no filtering.
+        :param allow_global_consumers: whether teamless consumers are allowed. 
Only applies when
+            allow_consumer_teams is non-empty.
         """
         if not conf.getboolean("core", "multi_team"):
             return dags_to_queue
@@ -212,35 +222,46 @@ class AssetManager(LoggingMixin):
             ref.dag_id: ref.allow_global_producers for ref in 
asset_model.scheduled_dags
         }
 
+        has_consumer_team_filter = bool(allow_consumer_teams)
+
         filtered = set()
         for dag in dags_to_queue:
             consumer_team = dag_id_to_team.get(dag.dag_id)
 
+            # --- Producer-team filtering (consumer-side control) ---
+            producer_pass = False
             if consumer_team is None:
                 # Teamless consumer accepts events from any source
-                filtered.add(dag)
-                continue
-
-            if is_teamless_source:
+                producer_pass = True
+            elif is_teamless_source:
                 if source_is_api:
                     # Teamless API user can only trigger teamless consumers
-                    continue
-                # Teamless DAG producer — check allow_global_producers
-                if dag_id_to_allow_global.get(dag.dag_id, True):
-                    filtered.add(dag)
-                continue
-
-            if consumer_team in source_teams:
+                    producer_pass = False
+                else:
+                    # Teamless DAG producer — check allow_global_producers
+                    producer_pass = dag_id_to_allow_global.get(dag.dag_id, 
True)
+            elif consumer_team in source_teams:
                 # Same team
-                filtered.add(dag)
-                continue
+                producer_pass = True
+            else:
+                allow_producer_teams = dag_id_to_allow_teams.get(dag.dag_id, 
[])
+                if source_teams & set(allow_producer_teams):
+                    # Cross-team via allow_producer_teams
+                    producer_pass = True
 
-            allow_producer_teams = dag_id_to_allow_teams.get(dag.dag_id, [])
-            if source_teams & set(allow_producer_teams):
-                # Cross-team via allow_producer_teams
-                filtered.add(dag)
+            if not producer_pass:
                 continue
 
+            # --- Consumer-team filtering (producer-side control) ---
+            if has_consumer_team_filter and allow_consumer_teams is not None:
+                if consumer_team is None:
+                    if not allow_global_consumers:
+                        continue
+                elif consumer_team not in allow_consumer_teams:
+                    continue
+
+            filtered.add(dag)
+
         return filtered
 
     @classmethod
@@ -255,6 +276,8 @@ class AssetManager(LoggingMixin):
         partition_key: str | None = None,
         source_is_api: bool = False,
         api_user_teams: set[str] | None = None,
+        api_allow_consumer_teams: list[str] | None = None,
+        api_allow_global_consumers: bool = True,
         **kwargs,
     ) -> AssetEvent | None:
         """
@@ -266,14 +289,20 @@ class AssetManager(LoggingMixin):
         When multi_team mode is enabled, team-based filtering is applied to 
determine which
         consumer DAGs should be queued:
         - For DAG-produced events (task_instance is set), source teams are 
resolved automatically
-          from the producing DAG's bundle.
+          from the producing DAG's bundle. Consumer-team filtering is resolved 
from the
+          TaskOutletAssetReference for the producing task.
         - For API-produced events (source_is_api=True), ``api_user_teams`` 
must be provided explicitly.
+          Consumer-team filtering uses ``api_allow_consumer_teams`` and 
``api_allow_global_consumers``.
 
         :param source_is_api: True if the event originates from the REST API 
rather than
             a DAG task execution.
         :param api_user_teams: Teams of the API user triggering the event. 
Only used when
             source_is_api=True. Ignored when task_instance is provided (teams 
are resolved
             from the DAG's bundle instead).
+        :param api_allow_consumer_teams: Consumer teams allowed by an 
API-triggered event.
+            Only used when source_is_api=True.
+        :param api_allow_global_consumers: Whether teamless consumers are 
allowed for an
+            API-triggered event. Only used when source_is_api=True. Defaults 
to True.
         """
         from airflow.models.dag import DagModel
 
@@ -378,14 +407,28 @@ class AssetManager(LoggingMixin):
             if task_instance:
                 team_name = DagModel.get_team_name(task_instance.dag_id, 
session=session)
                 resolved_source_teams = {team_name} if team_name else set()
+                # Resolve consumer-team filtering from the outlet reference
+                outlet_ref = session.scalar(
+                    select(TaskOutletAssetReference).where(
+                        TaskOutletAssetReference.dag_id == 
task_instance.dag_id,
+                        TaskOutletAssetReference.task_id == 
task_instance.task_id,
+                        TaskOutletAssetReference.asset_id == asset_model.id,
+                    )
+                )
+                resolved_consumer_teams = outlet_ref.allow_consumer_teams if 
outlet_ref else None
+                resolved_global_consumers = outlet_ref.allow_global_consumers 
if outlet_ref else True
             else:
                 resolved_source_teams = api_user_teams or set()
+                resolved_consumer_teams = api_allow_consumer_teams
+                resolved_global_consumers = api_allow_global_consumers
             dags_to_queue = cls._filter_dags_by_team(
                 dags_to_queue=dags_to_queue,
                 source_teams=resolved_source_teams,
                 asset_model=asset_model,
                 source_is_api=source_is_api,
                 session=session,
+                allow_consumer_teams=resolved_consumer_teams,
+                allow_global_consumers=resolved_global_consumers,
             )
 
         log.debug("asset event added", asset_event=asset_event, 
dags_to_queue=dags_to_queue)
diff --git a/airflow-core/tests/unit/assets/test_manager.py 
b/airflow-core/tests/unit/assets/test_manager.py
index 8f9d290a2b0..69439812ec0 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -581,3 +581,107 @@ class TestFilterDagsByTeam:
             )
 
         assert dag_with_team not in result
+
+    # Consumer-team filtering (producer-side control): the producer's allow-list of consumer
+    # teams must admit the consuming DAG in addition to the consumer's own producer-team check.
+    @conf_vars({("core", "multi_team"): "true"})
+    @pytest.mark.parametrize(
+        (
+            "team_mapping",
+            "source_teams",
+            "scheduled_dags",
+            "allow_consumer_teams",
+            "allow_global_consumers",
+            "expected_in",
+        ),
+        [
+            # dag1 (team_b) accepts team_a as a producer, but only team_a consumers are allowed.
+            pytest.param(
+                {"dag1": "team_b"},
+                {"team_a"},
+                {"dag1": ["team_a"]},
+                ["team_a"],
+                True,
+                False,
+                id="consumer_blocked_when_team_not_in_allow_consumer_teams",
+            ),
+            # Same producer side; team_b is now in the consumer allow-list.
+            pytest.param(
+                {"dag1": "team_b"},
+                {"team_a"},
+                {"dag1": ["team_a"]},
+                ["team_a", "team_b"],
+                True,
+                True,
+                id="consumer_allowed_when_team_in_allow_consumer_teams",
+            ),
+            # dag1 has no team mapping (teamless); allow_global_consumers=True admits it
+            # even though a consumer allow-list is set.
+            pytest.param(
+                {},
+                {"team_a"},
+                {},
+                ["team_b"],
+                True,
+                True,
+                id="teamless_consumer_passes_when_allow_global_consumers_true",
+            ),
+            # Same teamless consumer, but allow_global_consumers=False rejects it.
+            pytest.param(
+                {},
+                {"team_a"},
+                {},
+                ["team_b"],
+                False,
+                False,
+                
id="teamless_consumer_blocked_when_allow_global_consumers_false",
+            ),
+            # The consumer side would pass (team_b is allowed), but dag1 allows no producer
+            # teams, so the producer-side check rejects it: both checks are ANDed.
+            pytest.param(
+                {"dag1": "team_b"},
+                {"team_a"},
+                {"dag1": []},
+                ["team_b"],
+                True,
+                False,
+                id="both_filters_must_pass_and_logic",
+            ),
+            # An empty consumer allow-list disables consumer-side filtering entirely.
+            pytest.param(
+                {"dag1": "team_b"},
+                {"team_a"},
+                {"dag1": ["team_a"]},
+                [],
+                True,
+                True,
+                id="empty_allow_consumer_teams_means_no_consumer_filtering",
+            ),
+            # None likewise disables consumer-side filtering.
+            pytest.param(
+                {"dag1": "team_b"},
+                {"team_a"},
+                {"dag1": ["team_a"]},
+                None,
+                True,
+                True,
+                id="none_allow_consumer_teams_means_no_consumer_filtering",
+            ),
+        ],
+    )
+    @mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping")
+    def test_consumer_team_filtering(
+        self,
+        mock_mapping,
+        team_mapping,
+        source_teams,
+        scheduled_dags,
+        allow_consumer_teams,
+        allow_global_consumers,
+        expected_in,
+    ):
+        """
+        Check the consumer-team filtering applied by ``AssetManager._filter_dags_by_team``.
+
+        Setup:
+        - A single DAG, ``dag1``, is offered for queueing for a DAG-produced event
+          (``source_is_api=False``).
+        - ``team_mapping`` is what the patched dag-id-to-team lookup returns.
+        - ``scheduled_dags`` (dag_id -> team list) is passed to ``_make_asset_model``.
+          Presumably it becomes each DAG's ``allow_producer_teams``; confirm against the helper.
+        - ``expected_in`` states whether ``dag1`` should survive both the producer-side
+          and consumer-side checks.
+
+        NOTE(review): only the ``source_is_api=False`` path is exercised here. The
+        teamless-API-source path, and the outlet-reference lookup that resolves
+        ``allow_consumer_teams`` for task-produced events, are not covered by this test.
+        """
+        dag = _make_dag("dag1")
+        # The patched lookup supplies team membership; the session below is a bare Mock.
+        mock_mapping.return_value = team_mapping
+
+        result = AssetManager._filter_dags_by_team(
+            dags_to_queue={dag},
+            source_teams=source_teams,
+            # An empty ``scheduled_dags`` dict falls back to the helper's default asset model.
+            asset_model=_make_asset_model(scheduled_dags=scheduled_dags)
+            if scheduled_dags
+            else _make_asset_model(),
+            source_is_api=False,
+            session=mock.Mock(),
+            allow_consumer_teams=allow_consumer_teams,
+            allow_global_consumers=allow_global_consumers,
+        )
+
+        assert (dag in result) == expected_in

Reply via email to