This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 678d1dd0ce9 Replace `allow_producer_teams` with `access_control` on
Asset (#66954)
678d1dd0ce9 is described below
commit 678d1dd0ce9c566a8ada8f2d299bdbfd91142bd2
Author: Vincent <[email protected]>
AuthorDate: Wed May 20 12:19:24 2026 -0400
Replace `allow_producer_teams` with `access_control` on Asset (#66954)
Replace the flat `allow_producer_teams: list[str]` attribute on the Asset
class with `access_control: AssetAccessControl`, which bundles producer
team restrictions and a global-producer toggle into a single object.
---
.../src/airflow/dag_processing/collection.py | 4 +-
.../example_dags/example_asset_allow_teams.py | 22 ++++----
airflow-core/src/airflow/serialization/decoders.py | 2 +-
.../airflow/serialization/definitions/assets.py | 2 +-
airflow-core/src/airflow/serialization/encoders.py | 14 ++++-
.../tests/unit/dag_processing/test_collection.py | 14 +++--
.../unit/serialization/test_serialized_objects.py | 55 ++++++++++++++++++
task-sdk/docs/api.rst | 4 +-
task-sdk/src/airflow/sdk/__init__.py | 3 +
task-sdk/src/airflow/sdk/__init__.pyi | 2 +
.../src/airflow/sdk/definitions/asset/__init__.py | 26 ++++-----
.../sdk/definitions/asset/access_control.py | 37 ++++++++++++
task-sdk/tests/task_sdk/definitions/test_asset.py | 56 +++++-------------
.../definitions/test_asset_access_control.py | 66 ++++++++++++++++++++++
14 files changed, 228 insertions(+), 79 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 361f09871c8..94fb4f84530 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -895,7 +895,9 @@ class AssetModelOperation(NamedTuple):
if not references:
dags[dag_id].schedule_asset_references = []
continue
- referenced_assets = {assets[r.name, r.uri]: r.allow_producer_teams
for r in references}
+ referenced_assets = {
+ assets[r.name, r.uri]: r.access_control.get("producer_teams",
[]) for r in references
+ }
referenced_asset_ids = {a.id for a in referenced_assets}
orm_refs = {r.asset_id: r for r in
dags[dag_id].schedule_asset_references}
for asset_id, ref in orm_refs.items():
diff --git a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
index 0dc71db8d20..f547057ce73 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example DAG demonstrating cross-team asset triggering with
``allow_producer_teams``.
+Example DAG demonstrating cross-team asset triggering with
``AssetAccessControl``.
When Multi-Team mode is enabled (``[core] multi_team = True``), asset events
are filtered by team
membership. By default, a consuming DAG only receives events from DAGs within
the same team.
@@ -23,9 +23,9 @@ membership. By default, a consuming DAG only receives events
from DAGs within th
Usage:
- ``team_analytics_producer`` (belonging to ``team_analytics``) produces
events on ``shared_data``.
- ``team_ml_consumer`` (belonging to ``team_ml``) consumes ``shared_data``.
- - Because ``shared_data`` has ``allow_producer_teams=["team_analytics"]``,
events from ``team_analytics``
- are accepted by ``team_ml_consumer``.
- - Without ``allow_producer_teams``, the cross-team event would be blocked.
+ - Because ``shared_data`` has
``access_control=AssetAccessControl(producer_teams=["team_analytics"])``,
+ events from ``team_analytics`` are accepted by ``team_ml_consumer``.
+ - Without ``access_control``, the cross-team event would be blocked.
"""
from __future__ import annotations
@@ -33,14 +33,16 @@ from __future__ import annotations
import pendulum
from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import DAG, Asset
+from airflow.sdk import DAG, Asset, AssetAccessControl
-# [START asset_allow_producer_teams]
-# Define an asset that accepts events from team_analytics in addition to the
consumer's own team.
+# [START asset_access_control]
+# Define an asset that accepts events from team_analytics.
shared_data = Asset(
name="shared_data",
uri="s3://data-lake/shared/output.csv",
- allow_producer_teams=["team_analytics"],
+ access_control=AssetAccessControl(
+ producer_teams=["team_analytics"],
+ ),
)
# Producer DAG — belongs to team_analytics (via its DAG bundle configuration).
@@ -60,7 +62,7 @@ with DAG(
# Consumer DAG — belongs to team_ml (via its DAG bundle configuration).
# This DAG is triggered when shared_data is updated. Because shared_data has
-# allow_producer_teams=["team_analytics"], events from team_analytics are
accepted.
+# access_control with producer_teams=["team_analytics"], events from
team_analytics are accepted.
with DAG(
dag_id="team_ml_consumer",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
@@ -72,4 +74,4 @@ with DAG(
task_id="consume_shared_data",
bash_command="echo 'Consuming shared data from team_analytics'",
)
-# [END asset_allow_producer_teams]
+# [END asset_access_control]
diff --git a/airflow-core/src/airflow/serialization/decoders.py
b/airflow-core/src/airflow/serialization/decoders.py
index 683efdc87e6..22683ef5d61 100644
--- a/airflow-core/src/airflow/serialization/decoders.py
+++ b/airflow-core/src/airflow/serialization/decoders.py
@@ -106,7 +106,7 @@ def _decode_asset(var: dict[str, Any]):
)
for watcher in watchers
],
- allow_producer_teams=var.get("allow_producer_teams", []),
+ access_control=var.get("access_control", {}),
)
diff --git a/airflow-core/src/airflow/serialization/definitions/assets.py
b/airflow-core/src/airflow/serialization/definitions/assets.py
index a741fabcdec..f848ce0e1c9 100644
--- a/airflow-core/src/airflow/serialization/definitions/assets.py
+++ b/airflow-core/src/airflow/serialization/definitions/assets.py
@@ -120,7 +120,7 @@ class SerializedAsset(SerializedAssetBase):
group: str
extra: dict[str, Any]
watchers: MutableSequence[SerializedAssetWatcher]
- allow_producer_teams: list[str] = attrs.field(factory=list)
+ access_control: dict[str, Any] = attrs.field(factory=dict)
def as_expression(self) -> Any:
"""
diff --git a/airflow-core/src/airflow/serialization/encoders.py
b/airflow-core/src/airflow/serialization/encoders.py
index 22384711d1b..741347943ca 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -191,8 +191,18 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase)
-> dict[str, Any]:
d = {"__type": DAT.ASSET, "name": a.name, "uri": a.uri, "group":
a.group, "extra": a.extra}
if a.watchers:
d["watchers"] = [{"name": w.name, "trigger":
encode_trigger(w.trigger)} for w in a.watchers]
- if a.allow_producer_teams:
- d["allow_producer_teams"] = a.allow_producer_teams
+ ac = a.access_control
+ if isinstance(ac, dict):
+ # SerializedAsset stores access_control as a plain dict.
+ if ac:
+ d["access_control"] = ac
+ else:
+ # Asset stores access_control as an AssetAccessControl
instance.
+ if ac.producer_teams or not ac.allow_global:
+ d["access_control"] = {
+ "producer_teams": ac.producer_teams,
+ "allow_global": ac.allow_global,
+ }
return d
case AssetAlias() | SerializedAssetAlias():
return {"__type": DAT.ASSET_ALIAS, "name": a.name, "group":
a.group}
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index b0a65f3cd9c..eb6efc76152 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -145,12 +145,16 @@ class TestAssetModelOperation:
@pytest.mark.usefixtures("testing_dag_bundle")
def
test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self,
dag_maker, session):
- """When a producer bundle (without allow_producer_teams) is synced
after a consumer bundle
- (with allow_producer_teams), the stored allow_producer_teams must not
be wiped out."""
+ """When a producer bundle (without access_control) is synced after a
consumer bundle
+ (with access_control), the stored allow_producer_teams must not be
wiped out."""
from airflow.models.asset import DagScheduleAssetReference
+ from airflow.sdk import AssetAccessControl
- # First sync: consumer bundle sets allow_producer_teams on the asset.
- consumer_asset = Asset("shared_asset", allow_producer_teams=["team1",
"team2"])
+ # First sync: consumer bundle sets access_control on the asset.
+ consumer_asset = Asset(
+ "shared_asset",
+ access_control=AssetAccessControl(producer_teams=["team1",
"team2"]),
+ )
with dag_maker(dag_id="consumer_dag", schedule=[consumer_asset]) as
consumer_dag:
EmptyOperator(task_id="mytask")
@@ -167,7 +171,7 @@ class TestAssetModelOperation:
)
assert ref.allow_producer_teams == ["team1", "team2"]
- # Second sync: producer bundle references the same asset WITHOUT
allow_producer_teams.
+ # Second sync: producer bundle references the same asset WITHOUT
access_control.
producer_asset = Asset("shared_asset")
with dag_maker(dag_id="producer_dag", schedule="@once") as
producer_dag:
EmptyOperator(task_id="produce", outlets=[producer_asset])
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index e672e93cd23..05463439e1f 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -744,6 +744,61 @@ def test_serde_decode_asset_condition_unknown_type():
decode_asset_like({"__type": "UNKNOWN_TYPE"})
+def test_encode_asset_with_access_control():
+ from airflow.sdk import Asset, AssetAccessControl
+ from airflow.serialization.encoders import encode_asset_like
+
+ asset = Asset(
+ name="test",
+ uri="s3://bucket/key",
+ access_control=AssetAccessControl(producer_teams=["team_a"],
allow_global=False),
+ )
+ encoded = encode_asset_like(asset)
+ assert encoded["access_control"] == {"producer_teams": ["team_a"],
"allow_global": False}
+
+
+def test_encode_asset_without_access_control_omits_key():
+ from airflow.sdk import Asset
+ from airflow.serialization.encoders import encode_asset_like
+
+ asset = Asset(name="test", uri="s3://bucket/key")
+ encoded = encode_asset_like(asset)
+ assert "access_control" not in encoded
+
+
+def test_decode_asset_with_access_control():
+ from airflow.serialization.decoders import decode_asset_like
+
+ decoded = decode_asset_like(
+ {
+ "__type": "asset",
+ "name": "test",
+ "uri": "s3://bucket/key",
+ "group": "asset",
+ "extra": {},
+ "watchers": [],
+ "access_control": {"producer_teams": ["team_a"], "allow_global":
False},
+ }
+ )
+ assert decoded.access_control == {"producer_teams": ["team_a"],
"allow_global": False}
+
+
+def test_decode_asset_defaults_access_control_to_empty_dict():
+ from airflow.serialization.decoders import decode_asset_like
+
+ decoded = decode_asset_like(
+ {
+ "__type": "asset",
+ "name": "test",
+ "uri": "s3://bucket/key",
+ "group": "asset",
+ "extra": {},
+ "watchers": [],
+ }
+ )
+ assert decoded.access_control == {}
+
+
def test_encode_timezone():
from airflow.serialization.serialized_objects import encode_timezone
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 6e9ed0a2758..8f3f3de9539 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -181,6 +181,8 @@ Assets
------
.. autoapiclass:: airflow.sdk.Asset
+.. autoapiclass:: airflow.sdk.AssetAccessControl
+
.. autoapiclass:: airflow.sdk.AssetAlias
.. autoapiclass:: airflow.sdk.AssetAll
@@ -281,7 +283,7 @@ Everything else
.. autoapimodule:: airflow.sdk
:members:
:special-members: __version__
- :exclude-members: BaseAsyncOperator, BaseOperator, DAG, dag, asset, Asset,
AssetAlias, AssetAll, AssetAny, AssetWatcher, TaskGroup, TaskInstance, XComArg,
get_current_context, get_parsing_context
+ :exclude-members: BaseAsyncOperator, BaseOperator, DAG, dag, asset, Asset,
AssetAccessControl, AssetAlias, AssetAll, AssetAny, AssetWatcher, TaskGroup,
TaskInstance, XComArg, get_current_context, get_parsing_context
:undoc-members:
:imported-members:
:no-index:
diff --git a/task-sdk/src/airflow/sdk/__init__.py
b/task-sdk/src/airflow/sdk/__init__.py
index d9dcf60b994..eeae86f1eb3 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -22,6 +22,7 @@ __all__ = [
"__version__",
"AllowedKeyMapper",
"Asset",
+ "AssetAccessControl",
"AssetAlias",
"AssetAll",
"AssetAny",
@@ -122,6 +123,7 @@ if TYPE_CHECKING:
from airflow.sdk.configuration import AirflowSDKConfigParser
from airflow.sdk.definitions.asset import (
Asset,
+ AssetAccessControl,
AssetAlias,
AssetAll,
AssetAny,
@@ -188,6 +190,7 @@ if TYPE_CHECKING:
__lazy_imports: dict[str, str] = {
"AllowedKeyMapper": ".definitions.partition_mappers.allowed_key",
"Asset": ".definitions.asset",
+ "AssetAccessControl": ".definitions.asset",
"AssetAlias": ".definitions.asset",
"AssetAll": ".definitions.asset",
"AssetAny": ".definitions.asset",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi
b/task-sdk/src/airflow/sdk/__init__.pyi
index d0b4af5d9e5..a947e3676df 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -49,6 +49,7 @@ from airflow.sdk.definitions.asset import (
AssetAny as AssetAny,
AssetWatcher as AssetWatcher,
)
+from airflow.sdk.definitions.asset.access_control import AssetAccessControl as
AssetAccessControl
from airflow.sdk.definitions.asset.decorators import asset as asset
from airflow.sdk.definitions.asset.metadata import Metadata as Metadata
from airflow.sdk.definitions.connection import Connection as Connection
@@ -112,6 +113,7 @@ __all__ = [
"__version__",
"AllowedKeyMapper",
"Asset",
+ "AssetAccessControl",
"AssetAlias",
"AssetAll",
"AssetAny",
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
index 622b1d12dd9..05e1ea08108 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -48,6 +48,7 @@ else:
__all__ = [
"Asset",
+ "AssetAccessControl",
"Dataset",
"Model",
"AssetAlias",
@@ -60,6 +61,7 @@ __all__ = [
]
from airflow.sdk.configuration import conf
+from airflow.sdk.definitions.asset.access_control import AssetAccessControl
log = logging.getLogger(__name__)
@@ -240,13 +242,6 @@ class BaseAsset:
return AssetAll(self, other)
-def _validate_allow_producer_teams(instance, attribute, value):
- for entry in value:
- if not isinstance(entry, str) or not entry or entry.isspace():
- raise ValueError("Each entry in allow_producer_teams must be a
non-empty string")
- return value
-
-
def _validate_asset_watcher_trigger(instance, attribute, value):
from airflow.triggers.base import BaseEventTrigger
@@ -285,9 +280,8 @@ class Asset(os.PathLike, BaseAsset):
watchers: list[AssetWatcher] = attrs.field(
factory=list,
)
- allow_producer_teams: list[str] = attrs.field(
- factory=list,
- validator=[_validate_allow_producer_teams],
+ access_control: AssetAccessControl = attrs.field(
+ factory=AssetAccessControl,
)
asset_type: ClassVar[str] = "asset"
@@ -302,7 +296,7 @@ class Asset(os.PathLike, BaseAsset):
group: str = ...,
extra: dict[str, JsonValue] | None = None,
watchers: list[AssetWatcher] = ...,
- allow_producer_teams: list[str] = ...,
+ access_control: AssetAccessControl = ...,
) -> None:
"""Canonical; both name and uri are provided."""
@@ -314,7 +308,7 @@ class Asset(os.PathLike, BaseAsset):
group: str = ...,
extra: dict[str, JsonValue] | None = None,
watchers: list[AssetWatcher] = ...,
- allow_producer_teams: list[str] = ...,
+ access_control: AssetAccessControl = ...,
) -> None:
"""It's possible to only provide the name, either by keyword or as the
only positional argument."""
@@ -326,7 +320,7 @@ class Asset(os.PathLike, BaseAsset):
group: str = ...,
extra: dict[str, JsonValue] | None = None,
watchers: list[AssetWatcher] = ...,
- allow_producer_teams: list[str] = ...,
+ access_control: AssetAccessControl = ...,
) -> None:
"""It's possible to only provide the URI as a keyword argument."""
@@ -338,7 +332,7 @@ class Asset(os.PathLike, BaseAsset):
group: str | None = None,
extra: dict[str, JsonValue] | None = None,
watchers: list[AssetWatcher] | None = None,
- allow_producer_teams: list[str] | None = None,
+ access_control: AssetAccessControl | None = None,
) -> None:
if name is None and uri is None:
raise TypeError("Asset() requires either 'name' or 'uri'")
@@ -360,8 +354,8 @@ class Asset(os.PathLike, BaseAsset):
kwargs["extra"] = extra
if watchers is not None:
kwargs["watchers"] = watchers
- if allow_producer_teams is not None:
- kwargs["allow_producer_teams"] = allow_producer_teams
+ if access_control is not None:
+ kwargs["access_control"] = access_control
self.__attrs_init__(name=name, uri=uri, **kwargs)
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
new file mode 100644
index 00000000000..04c218bfdb6
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import attrs
+
+
+def _validate_producer_teams(instance, attribute, value):
+ for entry in value:
+ if not isinstance(entry, str) or not entry or entry.isspace():
+ raise ValueError("Each entry in producer_teams must be a non-empty
string")
+ return value
+
+
+@attrs.define
+class AssetAccessControl:
+ """Access control configuration for an Asset."""
+
+ producer_teams: list[str] = attrs.field(
+ factory=list,
+ validator=[_validate_producer_teams],
+ )
+ allow_global: bool = attrs.field(default=True,
validator=[attrs.validators.instance_of(bool)])
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset.py
b/task-sdk/tests/task_sdk/definitions/test_asset.py
index fb6a61954c9..9b1f08b6d8b 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset.py
@@ -459,49 +459,21 @@ class TestAssetSubclasses:
assert obj.group == group
-class TestAllowProducerTeamsValidationProperty:
- @pytest.mark.parametrize(
- "teams",
- [
- [""],
- [" "],
- ["\t"],
- ["\n"],
- [123],
- [None],
- [True],
- [{}],
- ["team_a", " ", "team_b"],
- ],
- )
- def test_rejects_lists_with_invalid_entries(self, teams):
- with pytest.raises(ValueError, match="allow_producer_teams"):
- Asset(name="test_asset", allow_producer_teams=teams)
-
- @pytest.mark.parametrize(
- "teams",
- [
- [],
- ["team_a"],
- ["team_a", "team_b"],
- ["team-with-dashes"],
- ["team_with_underscores"],
- ],
- )
- def test_accepts_valid_allow_producer_teams(self, teams):
- asset = Asset(name="test_asset", allow_producer_teams=teams)
- assert asset.allow_producer_teams == teams
-
+class TestAssetAccessControl:
+ def test_sets_access_control_correctly(self):
+ from airflow.sdk.definitions.asset.access_control import
AssetAccessControl
-class TestAllowProducerTeamsField:
- def test_sets_field_correctly(self):
- asset = Asset(name="x", allow_producer_teams=["team_a"])
- assert asset.allow_producer_teams == ["team_a"]
+ ac = AssetAccessControl(producer_teams=["team_a"], allow_global=False)
+ asset = Asset(name="x", access_control=ac)
+ assert asset.access_control.producer_teams == ["team_a"]
+ assert asset.access_control.allow_global is False
- def test_defaults_to_empty_list(self):
+ def test_defaults_to_default_access_control(self):
asset = Asset(name="x")
- assert asset.allow_producer_teams == []
+ assert asset.access_control.producer_teams == []
+ assert asset.access_control.allow_global is True
- def test_explicit_empty_list(self):
- asset = Asset(name="x", allow_producer_teams=[])
- assert asset.allow_producer_teams == []
+ def test_explicit_none_uses_default(self):
+ asset = Asset(name="x", access_control=None)
+ assert asset.access_control.producer_teams == []
+ assert asset.access_control.allow_global is True
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
new file mode 100644
index 00000000000..8524d8b0ccb
--- /dev/null
+++ b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.sdk.definitions.asset.access_control import AssetAccessControl
+
+
+class TestAssetAccessControl:
+ def test_defaults(self):
+ ac = AssetAccessControl()
+ assert ac.producer_teams == []
+ assert ac.allow_global is True
+
+ def test_explicit_values(self):
+ ac = AssetAccessControl(producer_teams=["team_a", "team_b"],
allow_global=False)
+ assert ac.producer_teams == ["team_a", "team_b"]
+ assert ac.allow_global is False
+
+ @pytest.mark.parametrize(
+ "teams",
+ [
+ [""],
+ [123],
+ [None],
+ [True],
+ [{}],
+ ["team_a", " ", "team_b"],
+ ],
+ )
+ def test_rejects_invalid_producer_teams(self, teams):
+ with pytest.raises(ValueError, match="producer_teams"):
+ AssetAccessControl(producer_teams=teams)
+
+ @pytest.mark.parametrize(
+ "teams",
+ [
+ [],
+ ["team_a"],
+ ["team_a", "team_b"],
+ ["team-with-dashes"],
+ ["team_with_underscores"],
+ ],
+ )
+ def test_accepts_valid_producer_teams(self, teams):
+ ac = AssetAccessControl(producer_teams=teams)
+ assert ac.producer_teams == teams
+
+ def test_allow_global_must_be_bool(self):
+ with pytest.raises(TypeError):
+ AssetAccessControl(allow_global="yes")