This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 678d1dd0ce9 Replace `allow_producer_teams` with `access_control` on 
Asset (#66954)
678d1dd0ce9 is described below

commit 678d1dd0ce9c566a8ada8f2d299bdbfd91142bd2
Author: Vincent <[email protected]>
AuthorDate: Wed May 20 12:19:24 2026 -0400

    Replace `allow_producer_teams` with `access_control` on Asset (#66954)
    
    Replace the flat `allow_producer_teams: list[str]` attribute on the Asset
    class with `access_control: AssetAccessControl`, which bundles producer
    team restrictions and a global-producer toggle into a single object.
---
 .../src/airflow/dag_processing/collection.py       |  4 +-
 .../example_dags/example_asset_allow_teams.py      | 22 ++++----
 airflow-core/src/airflow/serialization/decoders.py |  2 +-
 .../airflow/serialization/definitions/assets.py    |  2 +-
 airflow-core/src/airflow/serialization/encoders.py | 14 ++++-
 .../tests/unit/dag_processing/test_collection.py   | 14 +++--
 .../unit/serialization/test_serialized_objects.py  | 55 ++++++++++++++++++
 task-sdk/docs/api.rst                              |  4 +-
 task-sdk/src/airflow/sdk/__init__.py               |  3 +
 task-sdk/src/airflow/sdk/__init__.pyi              |  2 +
 .../src/airflow/sdk/definitions/asset/__init__.py  | 26 ++++-----
 .../sdk/definitions/asset/access_control.py        | 37 ++++++++++++
 task-sdk/tests/task_sdk/definitions/test_asset.py  | 56 +++++-------------
 .../definitions/test_asset_access_control.py       | 66 ++++++++++++++++++++++
 14 files changed, 228 insertions(+), 79 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index 361f09871c8..94fb4f84530 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -895,7 +895,9 @@ class AssetModelOperation(NamedTuple):
             if not references:
                 dags[dag_id].schedule_asset_references = []
                 continue
-            referenced_assets = {assets[r.name, r.uri]: r.allow_producer_teams 
for r in references}
+            referenced_assets = {
+                assets[r.name, r.uri]: r.access_control.get("producer_teams", 
[]) for r in references
+            }
             referenced_asset_ids = {a.id for a in referenced_assets}
             orm_refs = {r.asset_id: r for r in 
dags[dag_id].schedule_asset_references}
             for asset_id, ref in orm_refs.items():
diff --git a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py 
b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
index 0dc71db8d20..f547057ce73 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example DAG demonstrating cross-team asset triggering with 
``allow_producer_teams``.
+Example DAG demonstrating cross-team asset triggering with 
``AssetAccessControl``.
 
 When Multi-Team mode is enabled (``[core] multi_team = True``), asset events 
are filtered by team
 membership. By default, a consuming DAG only receives events from DAGs within 
the same team.
@@ -23,9 +23,9 @@ membership. By default, a consuming DAG only receives events 
from DAGs within th
 Usage:
     - ``team_analytics_producer`` (belonging to ``team_analytics``) produces 
events on ``shared_data``.
     - ``team_ml_consumer`` (belonging to ``team_ml``) consumes ``shared_data``.
-    - Because ``shared_data`` has ``allow_producer_teams=["team_analytics"]``, 
events from ``team_analytics``
-      are accepted by ``team_ml_consumer``.
-    - Without ``allow_producer_teams``, the cross-team event would be blocked.
+    - Because ``shared_data`` has 
``access_control=AssetAccessControl(producer_teams=["team_analytics"])``,
+      events from ``team_analytics`` are accepted by ``team_ml_consumer``.
+    - Without ``access_control``, the cross-team event would be blocked.
 """
 
 from __future__ import annotations
@@ -33,14 +33,16 @@ from __future__ import annotations
 import pendulum
 
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import DAG, Asset
+from airflow.sdk import DAG, Asset, AssetAccessControl
 
-# [START asset_allow_producer_teams]
-# Define an asset that accepts events from team_analytics in addition to the 
consumer's own team.
+# [START asset_access_control]
+# Define an asset that accepts events from team_analytics.
 shared_data = Asset(
     name="shared_data",
     uri="s3://data-lake/shared/output.csv",
-    allow_producer_teams=["team_analytics"],
+    access_control=AssetAccessControl(
+        producer_teams=["team_analytics"],
+    ),
 )
 
 # Producer DAG — belongs to team_analytics (via its DAG bundle configuration).
@@ -60,7 +62,7 @@ with DAG(
 
 # Consumer DAG — belongs to team_ml (via its DAG bundle configuration).
 # This DAG is triggered when shared_data is updated. Because shared_data has
-# allow_producer_teams=["team_analytics"], events from team_analytics are 
accepted.
+# access_control with producer_teams=["team_analytics"], events from 
team_analytics are accepted.
 with DAG(
     dag_id="team_ml_consumer",
     start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
@@ -72,4 +74,4 @@ with DAG(
         task_id="consume_shared_data",
         bash_command="echo 'Consuming shared data from team_analytics'",
     )
-# [END asset_allow_producer_teams]
+# [END asset_access_control]
diff --git a/airflow-core/src/airflow/serialization/decoders.py 
b/airflow-core/src/airflow/serialization/decoders.py
index 683efdc87e6..22683ef5d61 100644
--- a/airflow-core/src/airflow/serialization/decoders.py
+++ b/airflow-core/src/airflow/serialization/decoders.py
@@ -106,7 +106,7 @@ def _decode_asset(var: dict[str, Any]):
             )
             for watcher in watchers
         ],
-        allow_producer_teams=var.get("allow_producer_teams", []),
+        access_control=var.get("access_control", {}),
     )
 
 
diff --git a/airflow-core/src/airflow/serialization/definitions/assets.py 
b/airflow-core/src/airflow/serialization/definitions/assets.py
index a741fabcdec..f848ce0e1c9 100644
--- a/airflow-core/src/airflow/serialization/definitions/assets.py
+++ b/airflow-core/src/airflow/serialization/definitions/assets.py
@@ -120,7 +120,7 @@ class SerializedAsset(SerializedAssetBase):
     group: str
     extra: dict[str, Any]
     watchers: MutableSequence[SerializedAssetWatcher]
-    allow_producer_teams: list[str] = attrs.field(factory=list)
+    access_control: dict[str, Any] = attrs.field(factory=dict)
 
     def as_expression(self) -> Any:
         """
diff --git a/airflow-core/src/airflow/serialization/encoders.py 
b/airflow-core/src/airflow/serialization/encoders.py
index 22384711d1b..741347943ca 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -191,8 +191,18 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase) 
-> dict[str, Any]:
             d = {"__type": DAT.ASSET, "name": a.name, "uri": a.uri, "group": 
a.group, "extra": a.extra}
             if a.watchers:
                 d["watchers"] = [{"name": w.name, "trigger": 
encode_trigger(w.trigger)} for w in a.watchers]
-            if a.allow_producer_teams:
-                d["allow_producer_teams"] = a.allow_producer_teams
+            ac = a.access_control
+            if isinstance(ac, dict):
+                # SerializedAsset stores access_control as a plain dict.
+                if ac:
+                    d["access_control"] = ac
+            else:
+                # Asset stores access_control as an AssetAccessControl 
instance.
+                if ac.producer_teams or not ac.allow_global:
+                    d["access_control"] = {
+                        "producer_teams": ac.producer_teams,
+                        "allow_global": ac.allow_global,
+                    }
             return d
         case AssetAlias() | SerializedAssetAlias():
             return {"__type": DAT.ASSET_ALIAS, "name": a.name, "group": 
a.group}
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py 
b/airflow-core/tests/unit/dag_processing/test_collection.py
index b0a65f3cd9c..eb6efc76152 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -145,12 +145,16 @@ class TestAssetModelOperation:
 
     @pytest.mark.usefixtures("testing_dag_bundle")
     def 
test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, 
dag_maker, session):
-        """When a producer bundle (without allow_producer_teams) is synced 
after a consumer bundle
-        (with allow_producer_teams), the stored allow_producer_teams must not 
be wiped out."""
+        """When a producer bundle (without access_control) is synced after a 
consumer bundle
+        (with access_control), the stored allow_producer_teams must not be 
wiped out."""
         from airflow.models.asset import DagScheduleAssetReference
+        from airflow.sdk import AssetAccessControl
 
-        # First sync: consumer bundle sets allow_producer_teams on the asset.
-        consumer_asset = Asset("shared_asset", allow_producer_teams=["team1", 
"team2"])
+        # First sync: consumer bundle sets access_control on the asset.
+        consumer_asset = Asset(
+            "shared_asset",
+            access_control=AssetAccessControl(producer_teams=["team1", 
"team2"]),
+        )
         with dag_maker(dag_id="consumer_dag", schedule=[consumer_asset]) as 
consumer_dag:
             EmptyOperator(task_id="mytask")
 
@@ -167,7 +171,7 @@ class TestAssetModelOperation:
         )
         assert ref.allow_producer_teams == ["team1", "team2"]
 
-        # Second sync: producer bundle references the same asset WITHOUT 
allow_producer_teams.
+        # Second sync: producer bundle references the same asset WITHOUT 
access_control.
         producer_asset = Asset("shared_asset")
         with dag_maker(dag_id="producer_dag", schedule="@once") as 
producer_dag:
             EmptyOperator(task_id="produce", outlets=[producer_asset])
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index e672e93cd23..05463439e1f 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -744,6 +744,61 @@ def test_serde_decode_asset_condition_unknown_type():
         decode_asset_like({"__type": "UNKNOWN_TYPE"})
 
 
+def test_encode_asset_with_access_control():
+    from airflow.sdk import Asset, AssetAccessControl
+    from airflow.serialization.encoders import encode_asset_like
+
+    asset = Asset(
+        name="test",
+        uri="s3://bucket/key",
+        access_control=AssetAccessControl(producer_teams=["team_a"], 
allow_global=False),
+    )
+    encoded = encode_asset_like(asset)
+    assert encoded["access_control"] == {"producer_teams": ["team_a"], 
"allow_global": False}
+
+
+def test_encode_asset_without_access_control_omits_key():
+    from airflow.sdk import Asset
+    from airflow.serialization.encoders import encode_asset_like
+
+    asset = Asset(name="test", uri="s3://bucket/key")
+    encoded = encode_asset_like(asset)
+    assert "access_control" not in encoded
+
+
+def test_decode_asset_with_access_control():
+    from airflow.serialization.decoders import decode_asset_like
+
+    decoded = decode_asset_like(
+        {
+            "__type": "asset",
+            "name": "test",
+            "uri": "s3://bucket/key",
+            "group": "asset",
+            "extra": {},
+            "watchers": [],
+            "access_control": {"producer_teams": ["team_a"], "allow_global": 
False},
+        }
+    )
+    assert decoded.access_control == {"producer_teams": ["team_a"], 
"allow_global": False}
+
+
+def test_decode_asset_defaults_access_control_to_empty_dict():
+    from airflow.serialization.decoders import decode_asset_like
+
+    decoded = decode_asset_like(
+        {
+            "__type": "asset",
+            "name": "test",
+            "uri": "s3://bucket/key",
+            "group": "asset",
+            "extra": {},
+            "watchers": [],
+        }
+    )
+    assert decoded.access_control == {}
+
+
 def test_encode_timezone():
     from airflow.serialization.serialized_objects import encode_timezone
 
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 6e9ed0a2758..8f3f3de9539 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -181,6 +181,8 @@ Assets
 ------
 .. autoapiclass:: airflow.sdk.Asset
 
+.. autoapiclass:: airflow.sdk.AssetAccessControl
+
 .. autoapiclass:: airflow.sdk.AssetAlias
 
 .. autoapiclass:: airflow.sdk.AssetAll
@@ -281,7 +283,7 @@ Everything else
 .. autoapimodule:: airflow.sdk
   :members:
   :special-members: __version__
-  :exclude-members: BaseAsyncOperator, BaseOperator, DAG, dag, asset, Asset, 
AssetAlias, AssetAll, AssetAny, AssetWatcher, TaskGroup, TaskInstance, XComArg, 
get_current_context, get_parsing_context
+  :exclude-members: BaseAsyncOperator, BaseOperator, DAG, dag, asset, Asset, 
AssetAccessControl, AssetAlias, AssetAll, AssetAny, AssetWatcher, TaskGroup, 
TaskInstance, XComArg, get_current_context, get_parsing_context
   :undoc-members:
   :imported-members:
   :no-index:
diff --git a/task-sdk/src/airflow/sdk/__init__.py 
b/task-sdk/src/airflow/sdk/__init__.py
index d9dcf60b994..eeae86f1eb3 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -22,6 +22,7 @@ __all__ = [
     "__version__",
     "AllowedKeyMapper",
     "Asset",
+    "AssetAccessControl",
     "AssetAlias",
     "AssetAll",
     "AssetAny",
@@ -122,6 +123,7 @@ if TYPE_CHECKING:
     from airflow.sdk.configuration import AirflowSDKConfigParser
     from airflow.sdk.definitions.asset import (
         Asset,
+        AssetAccessControl,
         AssetAlias,
         AssetAll,
         AssetAny,
@@ -188,6 +190,7 @@ if TYPE_CHECKING:
 __lazy_imports: dict[str, str] = {
     "AllowedKeyMapper": ".definitions.partition_mappers.allowed_key",
     "Asset": ".definitions.asset",
+    "AssetAccessControl": ".definitions.asset",
     "AssetAlias": ".definitions.asset",
     "AssetAll": ".definitions.asset",
     "AssetAny": ".definitions.asset",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi 
b/task-sdk/src/airflow/sdk/__init__.pyi
index d0b4af5d9e5..a947e3676df 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -49,6 +49,7 @@ from airflow.sdk.definitions.asset import (
     AssetAny as AssetAny,
     AssetWatcher as AssetWatcher,
 )
+from airflow.sdk.definitions.asset.access_control import AssetAccessControl as 
AssetAccessControl
 from airflow.sdk.definitions.asset.decorators import asset as asset
 from airflow.sdk.definitions.asset.metadata import Metadata as Metadata
 from airflow.sdk.definitions.connection import Connection as Connection
@@ -112,6 +113,7 @@ __all__ = [
     "__version__",
     "AllowedKeyMapper",
     "Asset",
+    "AssetAccessControl",
     "AssetAlias",
     "AssetAll",
     "AssetAny",
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py 
b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
index 622b1d12dd9..05e1ea08108 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -48,6 +48,7 @@ else:
 
 __all__ = [
     "Asset",
+    "AssetAccessControl",
     "Dataset",
     "Model",
     "AssetAlias",
@@ -60,6 +61,7 @@ __all__ = [
 ]
 
 from airflow.sdk.configuration import conf
+from airflow.sdk.definitions.asset.access_control import AssetAccessControl
 
 log = logging.getLogger(__name__)
 
@@ -240,13 +242,6 @@ class BaseAsset:
         return AssetAll(self, other)
 
 
-def _validate_allow_producer_teams(instance, attribute, value):
-    for entry in value:
-        if not isinstance(entry, str) or not entry or entry.isspace():
-            raise ValueError("Each entry in allow_producer_teams must be a 
non-empty string")
-    return value
-
-
 def _validate_asset_watcher_trigger(instance, attribute, value):
     from airflow.triggers.base import BaseEventTrigger
 
@@ -285,9 +280,8 @@ class Asset(os.PathLike, BaseAsset):
     watchers: list[AssetWatcher] = attrs.field(
         factory=list,
     )
-    allow_producer_teams: list[str] = attrs.field(
-        factory=list,
-        validator=[_validate_allow_producer_teams],
+    access_control: AssetAccessControl = attrs.field(
+        factory=AssetAccessControl,
     )
 
     asset_type: ClassVar[str] = "asset"
@@ -302,7 +296,7 @@ class Asset(os.PathLike, BaseAsset):
         group: str = ...,
         extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher] = ...,
-        allow_producer_teams: list[str] = ...,
+        access_control: AssetAccessControl = ...,
     ) -> None:
         """Canonical; both name and uri are provided."""
 
@@ -314,7 +308,7 @@ class Asset(os.PathLike, BaseAsset):
         group: str = ...,
         extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher] = ...,
-        allow_producer_teams: list[str] = ...,
+        access_control: AssetAccessControl = ...,
     ) -> None:
         """It's possible to only provide the name, either by keyword or as the 
only positional argument."""
 
@@ -326,7 +320,7 @@ class Asset(os.PathLike, BaseAsset):
         group: str = ...,
         extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher] = ...,
-        allow_producer_teams: list[str] = ...,
+        access_control: AssetAccessControl = ...,
     ) -> None:
         """It's possible to only provide the URI as a keyword argument."""
 
@@ -338,7 +332,7 @@ class Asset(os.PathLike, BaseAsset):
         group: str | None = None,
         extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher] | None = None,
-        allow_producer_teams: list[str] | None = None,
+        access_control: AssetAccessControl | None = None,
     ) -> None:
         if name is None and uri is None:
             raise TypeError("Asset() requires either 'name' or 'uri'")
@@ -360,8 +354,8 @@ class Asset(os.PathLike, BaseAsset):
             kwargs["extra"] = extra
         if watchers is not None:
             kwargs["watchers"] = watchers
-        if allow_producer_teams is not None:
-            kwargs["allow_producer_teams"] = allow_producer_teams
+        if access_control is not None:
+            kwargs["access_control"] = access_control
 
         self.__attrs_init__(name=name, uri=uri, **kwargs)
 
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/access_control.py 
b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
new file mode 100644
index 00000000000..04c218bfdb6
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/definitions/asset/access_control.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import attrs
+
+
+def _validate_producer_teams(instance, attribute, value):
+    for entry in value:
+        if not isinstance(entry, str) or not entry or entry.isspace():
+            raise ValueError("Each entry in producer_teams must be a non-empty 
string")
+    return value
+
+
[email protected]
+class AssetAccessControl:
+    """Access control configuration for an Asset."""
+
+    producer_teams: list[str] = attrs.field(
+        factory=list,
+        validator=[_validate_producer_teams],
+    )
+    allow_global: bool = attrs.field(default=True, 
validator=[attrs.validators.instance_of(bool)])
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset.py 
b/task-sdk/tests/task_sdk/definitions/test_asset.py
index fb6a61954c9..9b1f08b6d8b 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset.py
@@ -459,49 +459,21 @@ class TestAssetSubclasses:
         assert obj.group == group
 
 
-class TestAllowProducerTeamsValidationProperty:
-    @pytest.mark.parametrize(
-        "teams",
-        [
-            [""],
-            ["  "],
-            ["\t"],
-            ["\n"],
-            [123],
-            [None],
-            [True],
-            [{}],
-            ["team_a", "  ", "team_b"],
-        ],
-    )
-    def test_rejects_lists_with_invalid_entries(self, teams):
-        with pytest.raises(ValueError, match="allow_producer_teams"):
-            Asset(name="test_asset", allow_producer_teams=teams)
-
-    @pytest.mark.parametrize(
-        "teams",
-        [
-            [],
-            ["team_a"],
-            ["team_a", "team_b"],
-            ["team-with-dashes"],
-            ["team_with_underscores"],
-        ],
-    )
-    def test_accepts_valid_allow_producer_teams(self, teams):
-        asset = Asset(name="test_asset", allow_producer_teams=teams)
-        assert asset.allow_producer_teams == teams
-
+class TestAssetAccessControl:
+    def test_sets_access_control_correctly(self):
+        from airflow.sdk.definitions.asset.access_control import 
AssetAccessControl
 
-class TestAllowProducerTeamsField:
-    def test_sets_field_correctly(self):
-        asset = Asset(name="x", allow_producer_teams=["team_a"])
-        assert asset.allow_producer_teams == ["team_a"]
+        ac = AssetAccessControl(producer_teams=["team_a"], allow_global=False)
+        asset = Asset(name="x", access_control=ac)
+        assert asset.access_control.producer_teams == ["team_a"]
+        assert asset.access_control.allow_global is False
 
-    def test_defaults_to_empty_list(self):
+    def test_defaults_to_default_access_control(self):
         asset = Asset(name="x")
-        assert asset.allow_producer_teams == []
+        assert asset.access_control.producer_teams == []
+        assert asset.access_control.allow_global is True
 
-    def test_explicit_empty_list(self):
-        asset = Asset(name="x", allow_producer_teams=[])
-        assert asset.allow_producer_teams == []
+    def test_explicit_none_uses_default(self):
+        asset = Asset(name="x", access_control=None)
+        assert asset.access_control.producer_teams == []
+        assert asset.access_control.allow_global is True
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py 
b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
new file mode 100644
index 00000000000..8524d8b0ccb
--- /dev/null
+++ b/task-sdk/tests/task_sdk/definitions/test_asset_access_control.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.sdk.definitions.asset.access_control import AssetAccessControl
+
+
+class TestAssetAccessControl:
+    def test_defaults(self):
+        ac = AssetAccessControl()
+        assert ac.producer_teams == []
+        assert ac.allow_global is True
+
+    def test_explicit_values(self):
+        ac = AssetAccessControl(producer_teams=["team_a", "team_b"], 
allow_global=False)
+        assert ac.producer_teams == ["team_a", "team_b"]
+        assert ac.allow_global is False
+
+    @pytest.mark.parametrize(
+        "teams",
+        [
+            [""],
+            [123],
+            [None],
+            [True],
+            [{}],
+            ["team_a", "  ", "team_b"],
+        ],
+    )
+    def test_rejects_invalid_producer_teams(self, teams):
+        with pytest.raises(ValueError, match="producer_teams"):
+            AssetAccessControl(producer_teams=teams)
+
+    @pytest.mark.parametrize(
+        "teams",
+        [
+            [],
+            ["team_a"],
+            ["team_a", "team_b"],
+            ["team-with-dashes"],
+            ["team_with_underscores"],
+        ],
+    )
+    def test_accepts_valid_producer_teams(self, teams):
+        ac = AssetAccessControl(producer_teams=teams)
+        assert ac.producer_teams == teams
+
+    def test_allow_global_must_be_bool(self):
+        with pytest.raises(TypeError):
+            AssetAccessControl(allow_global="yes")

Reply via email to