This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e0fceb9b73 Add warning when Bundle path may not be accessible to
impersonated user (#60278)
0e0fceb9b73 is described below
commit 0e0fceb9b73aaf04e904715b9bfe48dcd1d027e4
Author: Dev-iL <[email protected]>
AuthorDate: Thu Jan 15 13:32:11 2026 +0200
Add warning when Bundle path may not be accessible to impersonated user
(#60278)
---
.../administration-and-deployment/dag-bundles.rst | 17 ++++++++
.../src/airflow/sdk/execution_time/task_runner.py | 37 ++++++++++++++++
.../task_sdk/execution_time/test_task_runner.py | 50 ++++++++++++++++++++++
3 files changed, 104 insertions(+)
diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst
b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
index a8a962eb8c9..9dd9b968120 100644
--- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst
+++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
@@ -122,6 +122,23 @@ Starting Airflow 3.0.2 git is pre installed in the base
image. However, if you a
ENV GIT_PYTHON_REFRESH=quiet
+Using DAG Bundles with User Impersonation
+-----------------------------------------
+
+When using ``run_as_user`` (user impersonation) with DAG bundles, make sure file permissions
+allow the impersonated users to read the bundle files, and traverse the bundle directories,
+created by the main Airflow process. Typically this requires:
+
+1. Putting all impersonated users and the Airflow user in the same group.
+2. Configuring a umask that keeps newly created files and directories group-accessible
+   (e.g., ``umask 0002``).
+
+
+.. note::
+
+ This permission-based approach is a temporary solution. Future versions of
Airflow
+ will handle multi-user access through supervisor-based bundle operations,
eliminating
+ the need for shared group permissions.
+
+
Writing custom Dag bundles
--------------------------
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index c6bab767bc5..95665c43f3b 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -680,6 +680,7 @@ def parse(what: StartupDetails, log: Logger) ->
RuntimeTaskInstance:
version=bundle_info.version,
)
bundle_instance.initialize()
+ _verify_bundle_access(bundle_instance, log)
dag_absolute_path = os.fspath(Path(bundle_instance.path,
what.dag_rel_path))
bag = BundleDagBag(
@@ -750,6 +751,42 @@ SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor]
# 3. Shutdown and report status
+def _verify_bundle_access(bundle_instance: BaseDagBundle, log: Logger) -> None:
+ """
+ Verify bundle is accessible by the current user.
+
+ This is called after user impersonation (if any) to ensure the bundle
+ is actually accessible. Uses os.access() which works with any permission
+ scheme (standard Unix permissions, ACLs, SELinux, etc.).
+
+ :param bundle_instance: The bundle instance to check
+ :param log: Logger instance
+ :raises AirflowException: if bundle is not accessible
+ """
+ from getpass import getuser
+
+ from airflow.exceptions import AirflowException
+
+ bundle_path = bundle_instance.path
+
+ if not bundle_path.exists():
+ # Already handled by initialize() with a warning
+ return
+
+ # Check read permission (and execute for directories to list contents)
+ access_mode = os.R_OK
+ if bundle_path.is_dir():
+ access_mode |= os.X_OK
+
+ if not os.access(bundle_path, access_mode):
+ raise AirflowException(
+ f"Bundle '{bundle_instance.name}' path '{bundle_path}' is not
accessible "
+ f"by user '{getuser()}'. When using run_as_user, ensure bundle
directories "
+ f"are readable by the impersonated user. "
+ f"See:
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-bundles.html"
+ )
+
+
def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
# The parent sends us a StartupDetails message un-prompted. After this,
every single message is only sent
# in response to us sending a request.
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 400d7db2d99..f743abdd9a6 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -357,6 +357,56 @@ def test_parse_module_in_bundle_root(tmp_path: Path,
make_ti_context):
assert ti.task.dag.dag_id == "dag_name"
def test_verify_bundle_access_raises_when_not_accessible(tmp_path: Path):
    """Test that _verify_bundle_access raises AirflowException when bundle path is not accessible."""
    import os

    from airflow.sdk.execution_time.task_runner import _verify_bundle_access

    # Create a directory that exists
    bundle_path = tmp_path / "test_bundle"
    bundle_path.mkdir()

    # Create a mock bundle instance
    mock_bundle = mock.Mock()
    mock_bundle.path = bundle_path
    mock_bundle.name = "test-bundle"

    # Mock os.access to simulate permission denied (a real chmod is ineffective when CI runs as root)
    with (
        patch("airflow.sdk.execution_time.task_runner.os.access", return_value=False) as mock_access,
        pytest.raises(AirflowException, match="not accessible") as exc_info,
    ):
        _verify_bundle_access(mock_bundle, mock.Mock())

    # A directory must be checked for both read and search (execute) permission.
    mock_access.assert_called_once_with(bundle_path, os.R_OK | os.X_OK)
    assert "test-bundle" in str(exc_info.value)
+
+
def test_verify_bundle_access_succeeds_when_readable(tmp_path: Path):
    """Test that _verify_bundle_access succeeds when bundle path is accessible."""
    from airflow.sdk.execution_time.task_runner import _verify_bundle_access

    # A freshly created directory is readable and searchable by its owner.
    bundle_path = tmp_path / "accessible_bundle"
    bundle_path.mkdir()

    mock_bundle = mock.Mock()
    mock_bundle.path = bundle_path
    mock_bundle.name = "test-bundle"

    # Should not raise
    _verify_bundle_access(mock_bundle, mock.Mock())
+
+
def test_verify_bundle_access_skips_nonexistent_path(tmp_path: Path):
    """A bundle whose path is missing is left for initialize() to report, so no error is raised."""
    from airflow.sdk.execution_time.task_runner import _verify_bundle_access

    # `name` is a reserved Mock constructor argument, so it is assigned afterwards.
    bundle = mock.Mock(path=tmp_path / "nonexistent")
    bundle.name = "test-bundle"
    logger = mock.Mock()

    # Must return quietly rather than raising AirflowException.
    _verify_bundle_access(bundle, logger)
+
+
@pytest.mark.parametrize("use_queues", [False, True])
def test_run_deferred_basic(time_machine, create_runtime_ti,
mock_supervisor_comms, use_queues: bool):
"""Test that a task can transition to a deferred state."""