This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 4ec9f797e56 Add additional permission check in asset materialization (#63338) (#63363)
4ec9f797e56 is described below

commit 4ec9f797e56e6fb2297297937bc7836b138275de
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Mar 11 19:24:02 2026 +0100

    Add additional permission check in asset materialization (#63338) (#63363)
---
 .../api_fastapi/core_api/routes/public/assets.py   | 13 +++++++++++++
 .../core_api/routes/public/test_assets.py          | 22 ++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index b1bafae7766..714b77c65b9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -25,6 +25,8 @@ from sqlalchemy import and_, delete, func, select
 from sqlalchemy.orm import joinedload, subqueryload
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.app import get_auth_manager
+from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
 from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.parameters import (
@@ -372,6 +374,17 @@ def materialize_asset(
             f"More than one DAG materializes asset with ID: {asset_id}",
         )
 
+    if not get_auth_manager().is_authorized_dag(
+        method="POST",
+        access_entity=DagAccessEntity.RUN,
+        details=DagDetails(id=dag_id),
+        user=user,
+    ):
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            f"User is not authorized to trigger a run for DAG: {dag_id} that materializes this asset",
+        )
+
     dag = get_latest_version_of_dag(dag_bag, dag_id, session)
     return dag.create_dagrun(
         run_id=dag.timetable.generate_run_id(
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 625b5abc9d7..451dbf22efd 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -24,6 +24,7 @@ import pytest
 import time_machine
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
 from airflow.models import DagModel
 from airflow.models.asset import (
     AssetActive,
@@ -1261,6 +1262,27 @@ class TestPostAssetMaterialize(TestAssets):
         assert response.status_code == 404
         assert response.json()["detail"] == "No DAG materializes asset with ID: 3"
 
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_403_when_user_cannot_trigger_dag(self, test_client):
+        with mock.patch(
+            "airflow.api_fastapi.core_api.routes.public.assets.get_auth_manager",
+            autospec=True,
+        ) as mock_get_auth_manager:
+            mock_get_auth_manager.return_value.is_authorized_dag.return_value = False
+
+            response = test_client.post("/assets/1/materialize")
+
+            assert response.status_code == 403
+            assert response.json()["detail"] == (
+                f"User is not authorized to trigger a run for DAG: {self.DAG_ASSET1_ID} that materializes this asset"
+            )
+            mock_get_auth_manager.return_value.is_authorized_dag.assert_called_once_with(
+                method="POST",
+                access_entity=DagAccessEntity.RUN,
+                details=DagDetails(id=self.DAG_ASSET1_ID),
+                user=mock.ANY,
+            )
+
 
 class TestGetAssetQueuedEvents(TestQueuedEventEndpoint):
     @pytest.mark.usefixtures("time_freezer")

Reply via email to