This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bbb043cde9f Fix team consumer asset filtering (#68242)
bbb043cde9f is described below

commit bbb043cde9fc65ce58242d8db16586ca3364af7f
Author: Vincent <[email protected]>
AuthorDate: Mon Jun 8 17:36:26 2026 -0400

    Fix team consumer asset filtering (#68242)
---
 .../api_fastapi/core_api/routes/public/assets.py   |  2 +-
 airflow-core/src/airflow/assets/manager.py         | 14 +++++++-------
 .../src/airflow/dag_processing/collection.py       |  2 +-
 airflow-core/src/airflow/serialization/encoders.py |  8 +++++---
 .../core_api/routes/public/test_assets.py          |  7 +++++++
 airflow-core/tests/unit/assets/test_manager.py     | 22 ++++++++++++++++++++--
 .../tests/unit/dag_processing/test_collection.py   |  4 ++--
 .../unit/serialization/test_serialized_objects.py  |  1 -
 .../sdk/definitions/asset/access_control.py        |  6 ++++--
 .../definitions/test_asset_access_control.py       |  2 +-
 10 files changed, 48 insertions(+), 20 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 9525b1f796d..15af36445dd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -380,7 +380,7 @@ def create_asset_event(
     if conf.getboolean("core", "multi_team"):
         api_user_teams = get_auth_manager().get_authorized_teams(user=user)
         if body.access_control:
-            api_allow_consumer_teams = body.access_control.consumer_teams or 
None
+            api_allow_consumer_teams = body.access_control.consumer_teams
             api_allow_global_consumers = body.access_control.allow_global
 
     assets_event = asset_manager.register_asset_change(
diff --git a/airflow-core/src/airflow/assets/manager.py 
b/airflow-core/src/airflow/assets/manager.py
index b11698c49c0..5f9e1b598ab 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -197,9 +197,8 @@ class AssetManager(LoggingMixin):
         :param asset_model: the AssetModel whose scheduled_dags carry 
allow_producer_teams.
         :param source_is_api: True if the event was triggered via the REST API 
(not a DAG task).
         :param session: SQLAlchemy session.
-        :param allow_consumer_teams: list of team names allowed to consume. 
Empty/None means no filtering.
-        :param allow_global_consumers: whether teamless consumers are allowed. 
Only applies when
-            allow_consumer_teams is non-empty.
+        :param allow_consumer_teams: list of team names allowed to consume. 
None means no team-based filtering. Empty list means allowing only the team the 
Dag is in (if any).
+        :param allow_global_consumers: whether teamless consumers are allowed.
         """
         if not conf.getboolean("core", "multi_team"):
             return dags_to_queue
@@ -222,7 +221,7 @@ class AssetManager(LoggingMixin):
             ref.dag_id: ref.allow_global_producers for ref in 
asset_model.scheduled_dags
         }
 
-        has_consumer_team_filter = bool(allow_consumer_teams)
+        has_consumer_team_filter = allow_consumer_teams is not None or not 
allow_global_consumers
 
         filtered = set()
         for dag in dags_to_queue:
@@ -253,12 +252,13 @@ class AssetManager(LoggingMixin):
                 continue
 
             # --- Consumer-team filtering (producer-side control) ---
-            if has_consumer_team_filter and allow_consumer_teams is not None:
+            if has_consumer_team_filter:
                 if consumer_team is None:
                     if not allow_global_consumers:
                         continue
-                elif consumer_team not in allow_consumer_teams:
-                    continue
+                elif consumer_team not in source_teams:
+                    if allow_consumer_teams is not None and consumer_team not 
in allow_consumer_teams:
+                        continue
 
             filtered.add(dag)
 
diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index 29d48266434..16ca8a0ef24 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -1029,7 +1029,7 @@ class AssetModelOperation(NamedTuple):
                 continue
             referenced_outlets = {
                 (task_id, assets[d.name, d.uri]): (
-                    d.access_control.get("consumer_teams", []),
+                    d.access_control.get("consumer_teams"),
                     d.access_control.get("allow_global", True),
                 )
                 for task_id, d in references
diff --git a/airflow-core/src/airflow/serialization/encoders.py 
b/airflow-core/src/airflow/serialization/encoders.py
index 6301891b3c0..d82b9cd359e 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -210,12 +210,14 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase) 
-> dict[str, Any]:
                     d["access_control"] = ac
             else:
                 # Asset stores access_control as an AssetAccessControl 
instance.
-                if ac.producer_teams or ac.consumer_teams or not 
ac.allow_global:
-                    d["access_control"] = {
+                if ac.producer_teams or ac.consumer_teams is not None or not 
ac.allow_global:
+                    ac_dict: dict[str, Any] = {
                         "producer_teams": ac.producer_teams,
-                        "consumer_teams": ac.consumer_teams,
                         "allow_global": ac.allow_global,
                     }
+                    if ac.consumer_teams is not None:
+                        ac_dict["consumer_teams"] = ac.consumer_teams
+                    d["access_control"] = ac_dict
             return d
         case AssetAlias() | SerializedAssetAlias():
             return {"__type": DAT.ASSET_ALIAS, "name": a.name, "group": 
a.group}
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 72224441031..89957e179c2 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1459,6 +1459,13 @@ class TestPostAssetEventsTeamResolution(TestAssets):
                 True,
                 id="multi_team_enabled_no_access_control",
             ),
+            pytest.param(
+                "True",
+                {"consumer_teams": []},
+                [],
+                True,
+                id="multi_team_enabled_empty_consumer_teams",
+            ),
             pytest.param(
                 "False",
                 {"consumer_teams": ["team_ml"], "allow_global": False},
diff --git a/airflow-core/tests/unit/assets/test_manager.py 
b/airflow-core/tests/unit/assets/test_manager.py
index e25a1f0c204..47da4d0db2e 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -807,8 +807,8 @@ class TestFilterDagsByTeam:
                 {"dag1": ["team_a"]},
                 [],
                 True,
-                True,
-                id="empty_allow_consumer_teams_means_no_consumer_filtering",
+                False,
+                id="empty_allow_consumer_teams_blocks_all_teams",
             ),
             pytest.param(
                 {"dag1": "team_b"},
@@ -819,6 +819,24 @@ class TestFilterDagsByTeam:
                 True,
                 id="none_allow_consumer_teams_means_no_consumer_filtering",
             ),
+            pytest.param(
+                {},
+                {"team_a"},
+                {},
+                None,
+                False,
+                False,
+                id="teamless_consumer_blocked_when_only_allow_global_false",
+            ),
+            pytest.param(
+                {"dag1": "team_a"},
+                {"team_a"},
+                {"dag1": ["team_a"]},
+                [],
+                False,
+                True,
+                id="same_team_as_producer_always_allowed",
+            ),
         ],
     )
     @mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping")
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py 
b/airflow-core/tests/unit/dag_processing/test_collection.py
index eb99dc65f3c..fe15045598f 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -283,7 +283,7 @@ class TestAssetModelOperation:
 
     @pytest.mark.usefixtures("testing_dag_bundle")
     def 
test_add_task_outlet_asset_references_defaults_when_no_access_control(self, 
dag_maker, session):
-        """Outlet references default to empty consumer_teams and 
allow_global_consumers=True."""
+        """Outlet references default to None consumer_teams and 
allow_global_consumers=True."""
         from airflow.models.asset import TaskOutletAssetReference
 
         asset = Asset("plain_asset")
@@ -303,7 +303,7 @@ class TestAssetModelOperation:
             
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id == 
"plain_producer_dag")
         )
         assert ref is not None
-        assert ref.allow_consumer_teams == []
+        assert ref.allow_consumer_teams is None
         assert ref.allow_global_consumers is True
 
     @pytest.mark.parametrize(
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 4801761b74b..1e60848f2d3 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -772,7 +772,6 @@ def test_encode_asset_with_access_control():
     encoded = encode_asset_like(asset)
     assert encoded["access_control"] == {
         "producer_teams": ["team_a"],
-        "consumer_teams": [],
         "allow_global": False,
     }
 
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py 
b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
index e642c14f0c5..f6a40bb543b 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
@@ -20,6 +20,8 @@ import attrs
 
 
 def _validate_teams(instance, attribute, value):
+    if value is None:
+        return value
     for entry in value:
         if not isinstance(entry, str) or not entry or entry.isspace():
             raise ValueError(f"Each entry in {attribute.name} must be a 
non-empty string")
@@ -34,8 +36,8 @@ class AssetAccessControl:
         factory=list,
         validator=[_validate_teams],
     )
-    consumer_teams: list[str] = attrs.field(
-        factory=list,
+    consumer_teams: list[str] | None = attrs.field(
+        default=None,
         validator=[_validate_teams],
     )
     allow_global: bool = attrs.field(default=True, 
validator=[attrs.validators.instance_of(bool)])
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py 
b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
index 86e4f916aa6..1abe295306c 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
@@ -25,7 +25,7 @@ class TestAssetAccessControl:
     def test_defaults(self):
         ac = AssetAccessControl()
         assert ac.producer_teams == []
-        assert ac.consumer_teams == []
+        assert ac.consumer_teams is None
         assert ac.allow_global is True
 
     def test_explicit_values(self):

Reply via email to