This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a13fd1841c3 Updated `create_asset_event` endpoint to pass the user
teams (#66367)
a13fd1841c3 is described below
commit a13fd1841c33d189bb238005adfef2ea1da8437b
Author: Vincent <[email protected]>
AuthorDate: Tue May 5 15:48:09 2026 -0400
Updated `create_asset_event` endpoint to pass the user teams (#66367)
---
.../api_fastapi/core_api/routes/public/assets.py | 8 +++
airflow-core/src/airflow/assets/manager.py | 4 +-
.../core_api/routes/public/test_assets.py | 58 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 2 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index e5d9d4bf228..db306e189e3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -72,6 +72,7 @@ from airflow.api_fastapi.core_api.security import (
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.assets.manager import asset_manager
+from airflow.configuration import conf
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
@@ -364,6 +365,7 @@ def get_asset_events(
def create_asset_event(
body: CreateAssetEventsBody,
session: SessionDep,
+ user: GetUserDep,
) -> AssetEventResponse:
"""Create asset events."""
asset_model = session.scalar(select(AssetModel).where(AssetModel.id ==
body.asset_id).limit(1))
@@ -371,11 +373,17 @@ def create_asset_event(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID:
`{body.asset_id}` was not found")
timestamp = timezone.utcnow()
+ api_user_teams: set[str] = set()
+ if conf.getboolean("core", "multi_team"):
+ api_user_teams = get_auth_manager().get_authorized_teams(user=user)
+
assets_event = asset_manager.register_asset_change(
asset=asset_model,
timestamp=timestamp,
extra=body.extra,
partition_key=body.partition_key,
+ source_is_api=True,
+ api_user_teams=api_user_teams,
session=session,
)
diff --git a/airflow-core/src/airflow/assets/manager.py
b/airflow-core/src/airflow/assets/manager.py
index f1730fb2e81..bcfd7cfd061 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from collections.abc import Collection, Iterable, Set as AbstractSet
+from collections.abc import Collection, Iterable
from contextlib import contextmanager
from typing import TYPE_CHECKING
@@ -176,7 +176,7 @@ class AssetManager(LoggingMixin):
def _filter_dags_by_team(
cls,
dags_to_queue: set[DagModel],
- source_teams: AbstractSet[str],
+ source_teams: set[str],
asset_model: AssetModel,
source_is_api: bool,
*,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 26fe24888ac..45ae5cbf61a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1389,6 +1389,64 @@ class TestPostAssetEvents(TestAssets):
assert asset["last_asset_event"]["timestamp"] is None
+class TestPostAssetEventsTeamResolution(TestAssets):
+ """Tests for team-based filtering in create_asset_event."""
+
+ _ROUTE = "airflow.api_fastapi.core_api.routes.public.assets"
+
+ def _make_mock_event(self, asset):
+ m = mock.MagicMock(
+ spec=AssetEvent,
+ id=1,
+ asset_id=asset.id,
+ uri=asset.uri,
+ group=asset.group,
+ extra={"from_rest_api": True},
+ source_map_index=-1,
+ timestamp=DEFAULT_DATE,
+ source_task_id=None,
+ source_dag_id=None,
+ source_run_id=None,
+ partition_key=None,
+ created_dagruns=[],
+ )
+ # MagicMock uses 'name' internally for repr, so it must be set
separately.
+ m.name = asset.name
+ return m
+
+ @pytest.mark.usefixtures("time_freezer")
+ @pytest.mark.parametrize(
+ ("multi_team", "expected_teams"),
+ [
+ pytest.param(True, {"team_a", "team_b"}, id="enabled"),
+ pytest.param(False, set(), id="disabled"),
+ ],
+ )
+ def test_team_resolution(self, test_client, session, multi_team,
expected_teams):
+ (asset,) = self.create_assets(session, num=1)
+ mock_auth_mgr = mock.MagicMock()
+ mock_auth_mgr.get_authorized_teams.return_value = {"team_a", "team_b"}
+
+ with (
+ mock.patch(
+ f"{self._ROUTE}.conf.getboolean",
+ side_effect=lambda s, k, **kw: multi_team if k == "multi_team"
else kw.get("fallback"),
+ ),
+ mock.patch(f"{self._ROUTE}.get_auth_manager",
return_value=mock_auth_mgr),
+ mock.patch(
+ f"{self._ROUTE}.asset_manager.register_asset_change",
+ spec=True,
+ return_value=self._make_mock_event(asset),
+ ) as mock_register,
+ ):
+ response = test_client.post("/assets/events", json={"asset_id":
asset.id, "extra": {}})
+
+ assert response.status_code == 200
+ call_kwargs = mock_register.call_args.kwargs
+ assert call_kwargs["source_is_api"] is True
+ assert call_kwargs["api_user_teams"] == expected_teams
+
+
@pytest.mark.need_serialized_dag
class TestPostAssetMaterialize(TestAssets):
DAG_ASSET1_ID = "test_dag_1"