This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bbb043cde9f Fix team consumer asset filtering (#68242)
bbb043cde9f is described below
commit bbb043cde9fc65ce58242d8db16586ca3364af7f
Author: Vincent <[email protected]>
AuthorDate: Mon Jun 8 17:36:26 2026 -0400
Fix team consumer asset filtering (#68242)
---
.../api_fastapi/core_api/routes/public/assets.py | 2 +-
airflow-core/src/airflow/assets/manager.py | 14 +++++++-------
.../src/airflow/dag_processing/collection.py | 2 +-
airflow-core/src/airflow/serialization/encoders.py | 8 +++++---
.../core_api/routes/public/test_assets.py | 7 +++++++
airflow-core/tests/unit/assets/test_manager.py | 22 ++++++++++++++++++++--
.../tests/unit/dag_processing/test_collection.py | 4 ++--
.../unit/serialization/test_serialized_objects.py | 1 -
.../sdk/definitions/asset/access_control.py | 6 ++++--
.../definitions/test_asset_access_control.py | 2 +-
10 files changed, 48 insertions(+), 20 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 9525b1f796d..15af36445dd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -380,7 +380,7 @@ def create_asset_event(
if conf.getboolean("core", "multi_team"):
api_user_teams = get_auth_manager().get_authorized_teams(user=user)
if body.access_control:
- api_allow_consumer_teams = body.access_control.consumer_teams or
None
+ api_allow_consumer_teams = body.access_control.consumer_teams
api_allow_global_consumers = body.access_control.allow_global
assets_event = asset_manager.register_asset_change(
diff --git a/airflow-core/src/airflow/assets/manager.py
b/airflow-core/src/airflow/assets/manager.py
index b11698c49c0..5f9e1b598ab 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -197,9 +197,8 @@ class AssetManager(LoggingMixin):
:param asset_model: the AssetModel whose scheduled_dags carry
allow_producer_teams.
:param source_is_api: True if the event was triggered via the REST API
(not a DAG task).
:param session: SQLAlchemy session.
- :param allow_consumer_teams: list of team names allowed to consume.
Empty/None means no filtering.
- :param allow_global_consumers: whether teamless consumers are allowed.
Only applies when
- allow_consumer_teams is non-empty.
+ :param allow_consumer_teams: list of team names allowed to consume.
None means no team-based filtering. Empty list means allowing only the team the
Dag is in (if any).
+ :param allow_global_consumers: whether teamless consumers are allowed.
"""
if not conf.getboolean("core", "multi_team"):
return dags_to_queue
@@ -222,7 +221,7 @@ class AssetManager(LoggingMixin):
ref.dag_id: ref.allow_global_producers for ref in
asset_model.scheduled_dags
}
- has_consumer_team_filter = bool(allow_consumer_teams)
+ has_consumer_team_filter = allow_consumer_teams is not None or not
allow_global_consumers
filtered = set()
for dag in dags_to_queue:
@@ -253,12 +252,13 @@ class AssetManager(LoggingMixin):
continue
# --- Consumer-team filtering (producer-side control) ---
- if has_consumer_team_filter and allow_consumer_teams is not None:
+ if has_consumer_team_filter:
if consumer_team is None:
if not allow_global_consumers:
continue
- elif consumer_team not in allow_consumer_teams:
- continue
+ elif consumer_team not in source_teams:
+ if allow_consumer_teams is not None and consumer_team not
in allow_consumer_teams:
+ continue
filtered.add(dag)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 29d48266434..16ca8a0ef24 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -1029,7 +1029,7 @@ class AssetModelOperation(NamedTuple):
continue
referenced_outlets = {
(task_id, assets[d.name, d.uri]): (
- d.access_control.get("consumer_teams", []),
+ d.access_control.get("consumer_teams"),
d.access_control.get("allow_global", True),
)
for task_id, d in references
diff --git a/airflow-core/src/airflow/serialization/encoders.py
b/airflow-core/src/airflow/serialization/encoders.py
index 6301891b3c0..d82b9cd359e 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -210,12 +210,14 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase)
-> dict[str, Any]:
d["access_control"] = ac
else:
# Asset stores access_control as an AssetAccessControl
instance.
- if ac.producer_teams or ac.consumer_teams or not
ac.allow_global:
- d["access_control"] = {
+ if ac.producer_teams or ac.consumer_teams is not None or not
ac.allow_global:
+ ac_dict: dict[str, Any] = {
"producer_teams": ac.producer_teams,
- "consumer_teams": ac.consumer_teams,
"allow_global": ac.allow_global,
}
+ if ac.consumer_teams is not None:
+ ac_dict["consumer_teams"] = ac.consumer_teams
+ d["access_control"] = ac_dict
return d
case AssetAlias() | SerializedAssetAlias():
return {"__type": DAT.ASSET_ALIAS, "name": a.name, "group":
a.group}
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 72224441031..89957e179c2 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1459,6 +1459,13 @@ class TestPostAssetEventsTeamResolution(TestAssets):
True,
id="multi_team_enabled_no_access_control",
),
+ pytest.param(
+ "True",
+ {"consumer_teams": []},
+ [],
+ True,
+ id="multi_team_enabled_empty_consumer_teams",
+ ),
pytest.param(
"False",
{"consumer_teams": ["team_ml"], "allow_global": False},
diff --git a/airflow-core/tests/unit/assets/test_manager.py
b/airflow-core/tests/unit/assets/test_manager.py
index e25a1f0c204..47da4d0db2e 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -807,8 +807,8 @@ class TestFilterDagsByTeam:
{"dag1": ["team_a"]},
[],
True,
- True,
- id="empty_allow_consumer_teams_means_no_consumer_filtering",
+ False,
+ id="empty_allow_consumer_teams_blocks_all_teams",
),
pytest.param(
{"dag1": "team_b"},
@@ -819,6 +819,24 @@ class TestFilterDagsByTeam:
True,
id="none_allow_consumer_teams_means_no_consumer_filtering",
),
+ pytest.param(
+ {},
+ {"team_a"},
+ {},
+ None,
+ False,
+ False,
+ id="teamless_consumer_blocked_when_only_allow_global_false",
+ ),
+ pytest.param(
+ {"dag1": "team_a"},
+ {"team_a"},
+ {"dag1": ["team_a"]},
+ [],
+ False,
+ True,
+ id="same_team_as_producer_always_allowed",
+ ),
],
)
@mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping")
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index eb99dc65f3c..fe15045598f 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -283,7 +283,7 @@ class TestAssetModelOperation:
@pytest.mark.usefixtures("testing_dag_bundle")
def
test_add_task_outlet_asset_references_defaults_when_no_access_control(self,
dag_maker, session):
- """Outlet references default to empty consumer_teams and
allow_global_consumers=True."""
+ """Outlet references default to None consumer_teams and
allow_global_consumers=True."""
from airflow.models.asset import TaskOutletAssetReference
asset = Asset("plain_asset")
@@ -303,7 +303,7 @@ class TestAssetModelOperation:
select(TaskOutletAssetReference).where(TaskOutletAssetReference.dag_id ==
"plain_producer_dag")
)
assert ref is not None
- assert ref.allow_consumer_teams == []
+ assert ref.allow_consumer_teams is None
assert ref.allow_global_consumers is True
@pytest.mark.parametrize(
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 4801761b74b..1e60848f2d3 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -772,7 +772,6 @@ def test_encode_asset_with_access_control():
encoded = encode_asset_like(asset)
assert encoded["access_control"] == {
"producer_teams": ["team_a"],
- "consumer_teams": [],
"allow_global": False,
}
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
index e642c14f0c5..f6a40bb543b 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
@@ -20,6 +20,8 @@ import attrs
def _validate_teams(instance, attribute, value):
+ if value is None:
+ return value
for entry in value:
if not isinstance(entry, str) or not entry or entry.isspace():
raise ValueError(f"Each entry in {attribute.name} must be a
non-empty string")
@@ -34,8 +36,8 @@ class AssetAccessControl:
factory=list,
validator=[_validate_teams],
)
- consumer_teams: list[str] = attrs.field(
- factory=list,
+ consumer_teams: list[str] | None = attrs.field(
+ default=None,
validator=[_validate_teams],
)
allow_global: bool = attrs.field(default=True,
validator=[attrs.validators.instance_of(bool)])
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
index 86e4f916aa6..1abe295306c 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
@@ -25,7 +25,7 @@ class TestAssetAccessControl:
def test_defaults(self):
ac = AssetAccessControl()
assert ac.producer_teams == []
- assert ac.consumer_teams == []
+ assert ac.consumer_teams is None
assert ac.allow_global is True
def test_explicit_values(self):