This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 992e602015b Apply per-file authorization to dag-source endpoint (#67662)
992e602015b is described below

commit 992e602015b5e9a3fd297e18047cff9d85094c95
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Jun 5 02:56:19 2026 +0200

    Apply per-file authorization to dag-source endpoint (#67662)
    
    * Apply per-file authorization to dag-source endpoint
    
    A single source file can define multiple Dags. The /dagSources/{dag_id}
    endpoint previously returned the file's full source code as soon as the
    caller had CODE access to dag_id, even when the caller was not
    authorized to read every other Dag defined in the same file.
    
    Apply the same per-file authorization overlay already used by the
    import-errors endpoint (apache/airflow#65329): enumerate the Dags
    sharing the (relative_fileloc, bundle_name) of the requested Dag,
    intersect with the caller's readable Dag set, and redact the source
    when any co-located Dag is not readable.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Document per-file authorization boundary for dag-source endpoint
    
    Add a Security Model subsection that describes the per-Dag read scope
    the dag-source retrieval endpoint enforces, and the known limitation
    around historical-version retrieval: the per-Dag scope is evaluated
    against the current file membership, which may differ from the file's
    contents at the time the requested version was stored. Deployments
    that rely on per-Dag read scoping for source isolation should keep one
    Dag per source file, or restrict DagAccessEntity.CODE accordingly.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 airflow-core/docs/security/security_model.rst      | 25 +++++++++
 .../core_api/routes/public/dag_sources.py          | 33 +++++++++++-
 .../core_api/routes/public/test_dag_sources.py     | 59 ++++++++++++++++++++++
 3 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/airflow-core/docs/security/security_model.rst 
b/airflow-core/docs/security/security_model.rst
index 27294d1f99a..d69c19824d5 100644
--- a/airflow-core/docs/security/security_model.rst
+++ b/airflow-core/docs/security/security_model.rst
@@ -234,6 +234,31 @@ boundaries will be improved in future versions of Airflow. 
Until then, you shoul
 have access to all Dags and shared resources, and can modify their state 
regardless of team assignment.
 
 
+Per-Dag read access and source-code retrieval
+---------------------------------------------
+
+The dag-source retrieval endpoint (``GET /api/v2/dagSources/{dag_id}``) honors
+per-Dag read scoping for the **current** Dag-to-file mapping: if the file
+backing the requested Dag also defines other Dags the caller is not authorized
+to read, the endpoint returns a redacted placeholder instead of the source.
+
+The endpoint also supports retrieving historical source via the optional
+``version_number`` query parameter. For historical versions, the per-Dag scope
+is enforced using the **current** file membership, which may differ from the
+file's contents at the time the requested version was stored. As a consequence,
+requesting an older version may return source containing a Dag that has since
+been removed from the file, even if the caller does not currently have read
+access to that removed Dag. Conversely, requesting an older version may return
+the redacted placeholder when a later-added co-located Dag is not in the
+caller's readable set, even though the requested historical source predates
+that addition.
+
+Deployments that rely on per-Dag read scoping for source isolation should
+either keep one Dag per source file, or restrict ``DagAccessEntity.CODE`` to
+roles that are trusted to read every Dag that has ever shared a source file
+with a Dag those roles can access.
+
+
 Security contexts for Dag author submitted code
 -----------------------------------------------
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
index 26920007672..6723938ec1b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_sources.py
@@ -17,7 +17,9 @@
 from __future__ import annotations
 
 from fastapi import Depends, HTTPException, Response, status
+from sqlalchemy import select
 
+from airflow.api_fastapi.app import get_auth_manager
 from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
@@ -25,9 +27,11 @@ from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.common.types import Mimetype
 from airflow.api_fastapi.core_api.datamodels.dag_sources import 
DAGSourceResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
-from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.api_fastapi.core_api.security import GetUserDep, 
requires_access_dag
+from airflow.models import DagModel
 from airflow.models.dag_version import DagVersion
 
+REDACTED_SOURCE = "REDACTED - you do not have read permission on all Dags in 
the file"
 dag_sources_router = AirflowRouter(tags=["DagSource"], prefix="/dagSources")
 
 
@@ -55,6 +59,7 @@ def get_dag_source(
     accept: HeaderAcceptJsonOrText,
     dag_id: str,
     session: SessionDep,
+    user: GetUserDep,
     version_number: int | None = None,
 ):
     """Get source code using file token."""
@@ -69,9 +74,33 @@ def get_dag_source(
             status.HTTP_404_NOT_FOUND,
             detail=f"Code not found. dag_id='{dag_id}' 
version_number='{version_number}'",
         )
+
+    # Per-file authorization overlay on top of the ``DagAccessEntity.CODE``
+    # check above: a single source file may define multiple Dags, and the
+    # caller having CODE access to ``dag_id`` does not imply they may read
+    # every other Dag co-located in the file. Match the file by
+    # ``(relative_fileloc, bundle_name)`` -- the same keying
+    # ``import_error.py`` uses for its equivalent check -- and redact the
+    # response when any co-located Dag is not in the caller's readable set.
+    content = dag_version.dag_code.source_code
+    dag_model = dag_version.dag_model
+    if dag_model is not None and dag_model.relative_fileloc:
+        file_dag_ids = set(
+            session.scalars(
+                select(DagModel.dag_id).where(
+                    DagModel.relative_fileloc == dag_model.relative_fileloc,
+                    DagModel.bundle_name == dag_model.bundle_name,
+                )
+            ).all()
+        )
+        if file_dag_ids:
+            readable_dag_ids = 
get_auth_manager().get_authorized_dag_ids(user=user)
+            if not file_dag_ids.issubset(readable_dag_ids):
+                content = REDACTED_SOURCE
+
     dag_source_model = DAGSourceResponse(
         dag_id=dag_id,
-        content=dag_version.dag_code.source_code,
+        content=content,
         version_number=dag_version.version_number,
         dag_display_name=dag_version.dag_model.dag_display_name,
     )
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
index a85c6a9f3a7..029d74007ee 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py
@@ -18,12 +18,15 @@
 from __future__ import annotations
 
 import json
+from unittest import mock
 
 import pendulum
 import pytest
 from httpx import Response
 from sqlalchemy import select
 
+from airflow.api_fastapi.core_api.routes.public.dag_sources import 
REDACTED_SOURCE
+from airflow.models import DagModel
 from airflow.models.dagbag import DBDagBag
 from airflow.models.dagcode import DagCode
 from airflow.utils.state import DagRunState
@@ -178,3 +181,59 @@ class TestGetDAGSource:
         url = f"{API_PREFIX}/{wrong_fileloc}"
         response = test_client.get(url, headers={"Accept": "application/json"})
         assert response.status_code == 404
+
+    @pytest.fixture
+    def colocated_unreadable_dag(self, session, test_dag):
+        """Insert a second ``DagModel`` sharing the requested Dag's source 
file."""
+        requested = session.scalar(select(DagModel).where(DagModel.dag_id == 
TEST_DAG_ID))
+        colocated = DagModel(
+            dag_id="other_dag_in_same_file",
+            fileloc=requested.fileloc,
+            relative_fileloc=requested.relative_fileloc,
+            bundle_name=requested.bundle_name,
+            is_paused=False,
+        )
+        session.add(colocated)
+        session.commit()
+        return colocated
+
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager")
+    def test_source_is_redacted_when_caller_cannot_read_all_dags_in_file(
+        self, mock_get_auth_manager, test_client, test_dag, 
colocated_unreadable_dag
+    ):
+        mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value 
= {TEST_DAG_ID}
+
+        response: Response = test_client.get(
+            f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept": 
"application/json"}
+        )
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "content": REDACTED_SOURCE,
+            "dag_id": TEST_DAG_ID,
+            "version_number": 1,
+            "dag_display_name": TEST_DAG_DISPLAY_NAME,
+        }
+        
mock_get_auth_manager.return_value.get_authorized_dag_ids.assert_called_once_with(user=mock.ANY)
+
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.dag_sources.get_auth_manager")
+    def test_source_is_returned_when_caller_can_read_all_dags_in_file(
+        self, mock_get_auth_manager, test_client, test_dag, 
colocated_unreadable_dag
+    ):
+        mock_get_auth_manager.return_value.get_authorized_dag_ids.return_value 
= {
+            TEST_DAG_ID,
+            colocated_unreadable_dag.dag_id,
+        }
+        dag_content = self._get_dag_file_code(test_dag.fileloc)
+
+        response: Response = test_client.get(
+            f"{API_PREFIX}/{TEST_DAG_ID}", headers={"Accept": 
"application/json"}
+        )
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "content": dag_content,
+            "dag_id": TEST_DAG_ID,
+            "version_number": 1,
+            "dag_display_name": TEST_DAG_DISPLAY_NAME,
+        }

Reply via email to