This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 992e602015b Apply per-file authorization to dag-source endpoint (#67662)
992e602015b is described below
commit 992e602015b5e9a3fd297e18047cff9d85094c95
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Jun 5 02:56:19 2026 +0200
Apply per-file authorization to dag-source endpoint (#67662)
* Apply per-file authorization to dag-source endpoint
A single source file can define multiple Dags. The /dagSources/{dag_id}
endpoint previously returned the file's full source code as soon as the
caller had CODE access to dag_id, even when the caller was not
authorized to read every other Dag defined in the same file.
Apply the same per-file authorization overlay already used by the
import-errors endpoint (apache/airflow#65329): enumerate the Dags
sharing the (relative_fileloc, bundle_name) of the requested Dag,
intersect with the caller's readable Dag set, and redact the source
when any co-located Dag is not readable.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Document per-file authorization boundary for dag-source endpoint
Add a Security Model subsection that describes the per-Dag read scope
the dag-source retrieval endpoint enforces, and the known limitation
around historical-version retrieval: the per-Dag scope is evaluated
against the current file membership, which may differ from the file's
contents at the time the requested version was stored. Deployments
that rely on per-Dag read scoping for source isolation should keep one
Dag per source file, or restrict DagAccessEntity.CODE accordingly.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
airflow-core/docs/security/security_model.rst | 25 +++++++++
.../core_api/routes/public/dag_sources.py | 33 +++++++++++-
.../core_api/routes/public/test_dag_sources.py | 59 ++++++++++++++++++++++
3 files changed, 115 insertions(+), 2 deletions(-)
diff --git a/airflow-core/docs/security/security_model.rst
b/airflow-core/docs/security/security_model.rst
index 27294d1f99a..d69c19824d5 100644
--- a/airflow-core/docs/security/security_model.rst
+++ b/airflow-core/docs/security/security_model.rst
@@ -234,6 +234,31 @@ boundaries will be improved in future versions of Airflow.
Until then, you shoul
have access to all Dags and shared resources, and can modify their state
regardless of team assignment.
+Per-Dag read access and source-code retrieval
+---------------------------------------------
+
+The dag-source retrieval endpoint (``GET /api/v2/dagSources/{dag_id}``) honors
+per-Dag read scoping for the **current** Dag-to-file mapping: if the file
+backing the requested Dag also defines other Dags the caller is not authorized
+to read, the endpoint returns a redacted placeholder instead of the source.
+
+The endpoint also supports retrieving historical source via the optional
+``version_number`` query parameter. For historical versions, the per-Dag scope
+is enforced using the **current** file membership, which may differ from the
+file's contents at the time the requested version was stored. As a consequence,
+requesting an older version may return source containing a Dag that has since
+been removed from the file, even if the caller does not currently have read
+access to that removed Dag. Conversely, requesting an older version may return
+the redacted placeholder when a later-added co-located Dag is not in the
+caller's readable set, even though the requested historical source predates
+that addition.
+
+Deployments that rely on per-Dag read scoping for source isolation should
+either keep one Dag per source file, or grant ``DagAccessEntity.CODE`` only to
+roles that are trusted to read every Dag that has ever shared a source file
+with a Dag whose code those roles can retrieve.
+
+
Security contexts for Dag author submitted code
-----------------------------------------------
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
index 26920007672..6723938ec1b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
@@ -17,7 +17,9 @@
from __future__ import annotations
from fastapi import Depends, HTTPException, Response, status
+from sqlalchemy import select
+from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
@@ -25,9 +27,11 @@ from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.dag_sources import
DAGSourceResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
-from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_dag
+from airflow.models import DagModel
from airflow.models.dag_version import DagVersion
+REDACTED_SOURCE = "REDACTED - you do not have read permission on all Dags in the file"
dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources")
@@ -55,6 +59,7 @@ def get_dag_source(
accept: HeaderAcceptJsonOrText,
dag_id: str,
session: SessionDep,
+ user: GetUserDep,
version_number: int | None = None,
):
"""Get source code using file token."""
@@ -69,9 +74,33 @@ def get_dag_source(
status.HTTP_404_NOT_FOUND,
detail=f"Code not found. dag_id='{dag_id}'
version_number='{version_number}'",
)
+
+ # Per-file authorization overlay on top of the ``DagAccessEntity.CODE``
+ # check above: a single source file may define multiple Dags, and the
+ # caller having CODE access to ``dag_id`` does not imply they may read
+ # every other Dag co-located in the file. Match the file by
+ # ``(relative_fileloc, bundle_name)`` -- the same keying
+ # ``import_error.py`` uses for its equivalent check -- and redact the
+ # response when any co-located Dag is not in the caller's readable set.
+ content = dag_version.dag_code.source_code
+ dag_model = dag_version.dag_model
+ if dag_model is not None and dag_model.relative_fileloc:
+ file_dag_ids = set(
+ session.scalars(
+ select(DagModel.dag_id).where(
+ DagModel.relative_fileloc == dag_model.relative_fileloc,
+ DagModel.bundle_name == dag_model.bundle_name,
+ )
+ ).all()
+ )
+ if file_dag_ids:
+ readable_dag_ids =
get_auth_manager().get_authorized_dag_ids(user=user)
+ if not file_dag_ids.issubset(readable_dag_ids):
+ content = REDACTED_SOURCE
+
dag_source_model = DAGSourceResponse(
dag_id=dag_id,
- content=dag_version.dag_code.source_code,
+ content=content,
version_number=dag_version.version_number,
dag_display_name=dag_version.dag_model.dag_display_name,
)
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
index a85c6a9f3a7..029d74007ee 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
@@ -18,12 +18,15 @@
from __future__ import annotations
import json
+from unittest import mock
import pendulum
import pytest
from httpx import Response
from sqlalchemy import select
+from airflow.api_fastapi.core_api.routes.public.dag_sources import
REDACTED_SOURCE
+from airflow.models import DagModel
from airflow.models.dagbag import DBDagBag
from airflow.models.dagcode import DagCode
from airflow.utils.state import DagRunState
@@ -178,3 +181,59 @@ class TestGetDAGSource:
url = f"{API_PREFIX}/{wrong_fileloc}"
response = test_client.get(url, headers={"Accept": "application/json"})
assert response.status_code == 404
+
+ @pytest.fixture
+ def colocated_unreadable_dag(self, session, test_dag):
+ """Insert a second ``DagModel`` sharing the requested Dag's source
file."""
+ requested = session.scalar(select(DagModel).where(DagModel.dag_id ==
TEST_DAG_ID))
+ colocated = DagModel(
+ dag_id="other_dag_in_same_file",
+ fileloc=requested.fileloc,
+ relative_fileloc=requested.relative_fileloc,
+ bundle_name=requested.bundle_name,
+ is_paused=False,
+ )
+ session.add(colocated)
+ session.commit()
+ return colocated
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager")
+ def test_source_is_redacted_when_caller_cannot_read_all_dags_in_file(
+ self, mock_get_auth_manager, test_client, test_dag,
colocated_unreadable_dag
+ ):
+ mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value
= {TEST_DAG_ID}
+
+ response: Response = test_client.get(
+ f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept":
"application/json"}
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "content": REDACTED_SOURCE,
+ "dag_id": TEST_DAG_ID,
+ "version_number": 1,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
+ }
+
mock_get_auth_manager.return_value.get_authorized_dag_ids.assert_called_once_with(user=mock.ANY)
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager")
+ def test_source_is_returned_when_caller_can_read_all_dags_in_file(
+ self, mock_get_auth_manager, test_client, test_dag,
colocated_unreadable_dag
+ ):
+ mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value
= {
+ TEST_DAG_ID,
+ colocated_unreadable_dag.dag_id,
+ }
+ dag_content = self._get_dag_file_code(test_dag.fileloc)
+
+ response: Response = test_client.get(
+ f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept":
"application/json"}
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "content": dag_content,
+ "dag_id": TEST_DAG_ID,
+ "version_number": 1,
+ "dag_display_name": TEST_DAG_DISPLAY_NAME,
+ }