This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a13fd1841c3 Updated `create_asset_event` endpoint to pass the user teams (#66367)
a13fd1841c3 is described below

commit a13fd1841c33d189bb238005adfef2ea1da8437b
Author: Vincent <[email protected]>
AuthorDate: Tue May 5 15:48:09 2026 -0400

    Updated `create_asset_event` endpoint to pass the user teams (#66367)
---
 .../api_fastapi/core_api/routes/public/assets.py   |  8 +++
 airflow-core/src/airflow/assets/manager.py         |  4 +-
 .../core_api/routes/public/test_assets.py          | 58 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index e5d9d4bf228..db306e189e3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -72,6 +72,7 @@ from airflow.api_fastapi.core_api.security import (
 )
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.assets.manager import asset_manager
+from airflow.configuration import conf
 from airflow.models.asset import (
     AssetAliasModel,
     AssetDagRunQueue,
@@ -364,6 +365,7 @@ def get_asset_events(
 def create_asset_event(
     body: CreateAssetEventsBody,
     session: SessionDep,
+    user: GetUserDep,
 ) -> AssetEventResponse:
     """Create asset events."""
     asset_model = session.scalar(select(AssetModel).where(AssetModel.id == body.asset_id).limit(1))
@@ -371,11 +373,17 @@ def create_asset_event(
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found")
     timestamp = timezone.utcnow()
 
+    api_user_teams: set[str] = set()
+    if conf.getboolean("core", "multi_team"):
+        api_user_teams = get_auth_manager().get_authorized_teams(user=user)
+
     assets_event = asset_manager.register_asset_change(
         asset=asset_model,
         timestamp=timestamp,
         extra=body.extra,
         partition_key=body.partition_key,
+        source_is_api=True,
+        api_user_teams=api_user_teams,
         session=session,
     )
 
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index f1730fb2e81..bcfd7cfd061 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from collections.abc import Collection, Iterable, Set as AbstractSet
+from collections.abc import Collection, Iterable
 from contextlib import contextmanager
 from typing import TYPE_CHECKING
 
@@ -176,7 +176,7 @@ class AssetManager(LoggingMixin):
     def _filter_dags_by_team(
         cls,
         dags_to_queue: set[DagModel],
-        source_teams: AbstractSet[str],
+        source_teams: set[str],
         asset_model: AssetModel,
         source_is_api: bool,
         *,
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 26fe24888ac..45ae5cbf61a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1389,6 +1389,64 @@ class TestPostAssetEvents(TestAssets):
                 assert asset["last_asset_event"]["timestamp"] is None
 
 
+class TestPostAssetEventsTeamResolution(TestAssets):
+    """Tests for team-based filtering in create_asset_event."""
+
+    _ROUTE = "airflow.api_fastapi.core_api.routes.public.assets"
+
+    def _make_mock_event(self, asset):
+        m = mock.MagicMock(
+            spec=AssetEvent,
+            id=1,
+            asset_id=asset.id,
+            uri=asset.uri,
+            group=asset.group,
+            extra={"from_rest_api": True},
+            source_map_index=-1,
+            timestamp=DEFAULT_DATE,
+            source_task_id=None,
+            source_dag_id=None,
+            source_run_id=None,
+            partition_key=None,
+            created_dagruns=[],
+        )
+        # MagicMock uses 'name' internally for repr, so it must be set separately.
+        m.name = asset.name
+        return m
+
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.parametrize(
+        ("multi_team", "expected_teams"),
+        [
+            pytest.param(True, {"team_a", "team_b"}, id="enabled"),
+            pytest.param(False, set(), id="disabled"),
+        ],
+    )
+    def test_team_resolution(self, test_client, session, multi_team, expected_teams):
+        (asset,) = self.create_assets(session, num=1)
+        mock_auth_mgr = mock.MagicMock()
+        mock_auth_mgr.get_authorized_teams.return_value = {"team_a", "team_b"}
+
+        with (
+            mock.patch(
+                f"{self._ROUTE}.conf.getboolean",
+                side_effect=lambda s, k, **kw: multi_team if k == "multi_team" else kw.get("fallback"),
+            ),
+            mock.patch(f"{self._ROUTE}.get_auth_manager", return_value=mock_auth_mgr),
+            mock.patch(
+                f"{self._ROUTE}.asset_manager.register_asset_change",
+                spec=True,
+                return_value=self._make_mock_event(asset),
+            ) as mock_register,
+        ):
+            response = test_client.post("/assets/events", json={"asset_id": asset.id, "extra": {}})
+
+        assert response.status_code == 200
+        call_kwargs = mock_register.call_args.kwargs
+        assert call_kwargs["source_is_api"] is True
+        assert call_kwargs["api_user_teams"] == expected_teams
+
+
 @pytest.mark.need_serialized_dag
 class TestPostAssetMaterialize(TestAssets):
     DAG_ASSET1_ID = "test_dag_1"

Reply via email to