This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 35ca494ace6 add check for xcom permission when result is specified in query parameter (#64415)
35ca494ace6 is described below
commit 35ca494ace6ec7d4962c27f5fd7137097934c0b1
Author: Kevin Yang <[email protected]>
AuthorDate: Mon Mar 30 10:47:41 2026 -0400
add check for xcom permission when result is specified in query parameter (#64415)
---
.../api_fastapi/core_api/routes/public/dag_run.py | 15 +++++++++-
.../core_api/routes/public/test_dag_run.py | 32 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 67f76307124..ff42238806b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -33,7 +33,8 @@ from airflow.api.common.mark_tasks import (
set_dag_run_state_to_queued,
set_dag_run_state_to_success,
)
-from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
+from airflow.api_fastapi.app import get_auth_manager
+from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.db.dag_runs import (
@@ -536,6 +537,7 @@ def wait_dag_run_until_finished(
dag_id: str,
dag_run_id: str,
session: SessionDep,
+ user: GetUserDep,
interval: Annotated[float, Query(gt=0.0, description="Seconds to wait between dag run state checks")],
result_task_ids: Annotated[
list[str] | None,
@@ -543,6 +545,17 @@ def wait_dag_run_until_finished(
] = None,
):
"Wait for a dag run until it finishes, and return its result(s)."
+ if result_task_ids:
+ if not get_auth_manager().is_authorized_dag(
+ method="GET",
+ access_entity=DagAccessEntity.XCOM,
+ details=DagDetails(id=dag_id),
+ user=user,
+ ):
+ raise HTTPException(
+ status.HTTP_403_FORBIDDEN,
+ "User is not authorized to read XCom data for this DAG",
+ )
if not session.scalar(select(1).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)):
raise HTTPException(
status.HTTP_404_NOT_FOUND,
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 3a55532e3d7..2e80a3501fe 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -2079,3 +2079,35 @@ class TestWaitDagRun:
assert response.status_code == 200
data = response.json()
assert data == {"state": DagRunState.SUCCESS, "results": {"task_1": '"result_1"'}}
+
+ def test_should_respond_403_when_user_lacks_xcom_permission(self, test_client):
+ from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
+
+ with mock.patch(
+ "airflow.api_fastapi.core_api.routes.public.dag_run.get_auth_manager",
+ autospec=True,
+ ) as mock_get_auth_manager:
+ mock_get_auth_manager.return_value.is_authorized_dag.return_value = False
+
+ response = test_client.get(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+ params={"interval": "1", "result": "task_1"},
+ )
+
+ assert response.status_code == 403
+ mock_get_auth_manager.return_value.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.XCOM,
+ details=DagDetails(id=DAG1_ID),
+ user=mock.ANY,
+ )
+
+ def test_should_respond_200_without_result_when_user_lacks_xcom_permission(self, test_client):
+ """Waiting without result parameter should not require XCom permissions."""
+ response = test_client.get(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+ params={"interval": "1"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert data == {"state": DagRunState.SUCCESS}