This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 3840fae2e16 [v3-2-test] Fix ObjectStoragePath NoCredentialsError when using conn_id with remote stores (#64634) (#64646)
3840fae2e16 is described below
commit 3840fae2e16ecbd86fda948607a5822a7c506703
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 3 12:07:24 2026 +0530
[v3-2-test] Fix ObjectStoragePath NoCredentialsError when using conn_id with remote stores (#64634) (#64646)
* Fix ObjectStoragePath credential resolution by injecting authenticated fs into __wrapped__._fs_cached
(cherry picked from commit f391942b90f2347272c321bcdd092c7b109cdc9e)
Co-authored-by: Rahul Vats <[email protected]>
---
task-sdk/src/airflow/sdk/io/path.py | 38 +++++++++++
task-sdk/tests/task_sdk/io/test_path.py | 109 ++++++++++++++++++++++++++++++++
2 files changed, 147 insertions(+)
diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py
index cf78bb4ebc6..14efa2b130a 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import logging
import shutil
from typing import TYPE_CHECKING, Any, ClassVar
from urllib.parse import urlsplit
@@ -33,6 +34,8 @@ if TYPE_CHECKING:
from typing_extensions import Self
from upath.types import JoinablePathLike
+log = logging.getLogger(__name__)
+
class _TrackingFileWrapper:
"""Wrapper that tracks file operations to intercept lineage."""
@@ -116,6 +119,13 @@ class ObjectStoragePath(ProxyUPath):
# to the underlying fsspec filesystem, which doesn't understand it
self._conn_id = storage_options.pop("conn_id", None)
super().__init__(*args, protocol=protocol, **storage_options)
+ # ProxyUPath delegates all operations to self.__wrapped__, which was
+ # constructed with empty storage_options (conn_id stripped above).
+ # Pre-populating __wrapped__._fs_cached with the Airflow-authenticated
+ # filesystem fixes every delegated method (exists, mkdir, iterdir,
glob,
+ # walk, rename, read_bytes, write_bytes, …) in one place rather than
+ # requiring individual overrides for each one.
+ self._inject_authenticated_fs(self.__wrapped__)
@classmethod_or_method # type: ignore[arg-type]
def _from_upath(cls_or_self, upath, /):
@@ -127,8 +137,36 @@ class ObjectStoragePath(ProxyUPath):
obj = object.__new__(cls)
obj.__wrapped__ = upath
obj._conn_id = getattr(cls_or_self, "_conn_id", None) if is_instance
else None
+ # If the wrapped UPath has not yet had its fs cached (e.g. when
_from_upath is
+ # called as a classmethod with a fresh UPath), inject the
authenticated fs now.
+ # Child UPaths produced by __wrapped__ operations (iterdir, glob,
etc.) already
+ # inherit _fs_cached from the parent UPath, so the hasattr check is a
no-op for them.
+ if not hasattr(upath, "_fs_cached"):
+ obj._inject_authenticated_fs(upath)
return obj
+    def _inject_authenticated_fs(self, wrapped: UPath) -> None:
+        """
+        Inject the Airflow-authenticated filesystem into wrapped._fs_cached.
+
+        This ensures that all ProxyUPath-delegated operations use the connection-aware
+        filesystem rather than an unauthenticated one constructed from empty storage_options.
+        Failures are logged at DEBUG level and silently skipped so that construction always
+        succeeds — errors will surface naturally at first use of the path.
+        """
+        if self._conn_id is None:
+            return
+        try:
+            wrapped._fs_cached = attach(wrapped.protocol or "file", self._conn_id).fs
+        except Exception:
+            log.debug(
+                "Could not pre-populate authenticated filesystem for %r (conn_id=%r); "
+                "operations will attempt lazy resolution at first use.",
+                self,
+                self._conn_id,
+                exc_info=True,
+            )
+
@property
def conn_id(self) -> str | None:
"""Return the connection ID for this path."""
diff --git a/task-sdk/tests/task_sdk/io/test_path.py b/task-sdk/tests/task_sdk/io/test_path.py
index e65c6584293..3113270a47f 100644
--- a/task-sdk/tests/task_sdk/io/test_path.py
+++ b/task-sdk/tests/task_sdk/io/test_path.py
@@ -26,6 +26,7 @@ from unittest import mock
import pytest
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.memory import MemoryFileSystem
+from upath import UPath
from airflow.sdk import Asset, ObjectStoragePath
from airflow.sdk._shared.module_loading import qualname
@@ -228,6 +229,114 @@ class TestAttach:
method.assert_called_once_with(expected_args, **expected_kwargs)
+class TestConnIdCredentialResolution:
+    """
+    Regression tests for https://github.com/apache/airflow/issues/64632
+
+    When ObjectStoragePath was migrated from CloudPath to ProxyUPath (3.2.0),
+    methods like exists(), mkdir(), is_dir(), is_file() were delegated to
+    self.__wrapped__ which carries empty storage_options (conn_id is stored
+    separately). This caused NoCredentialsError / 401 errors for remote stores
+    even when a valid conn_id was provided.
+    """
+
+    @pytest.fixture(autouse=True)
+    def restore_cache(self):
+        cache = _STORE_CACHE.copy()
+        yield
+        _STORE_CACHE.clear()
+        _STORE_CACHE.update(cache)
+
+    @pytest.fixture
+    def fake_fs_with_conn(self):
+        fs = _FakeRemoteFileSystem(conn_id="my_conn")
+        attach(protocol="ffs2", conn_id="my_conn", fs=fs)
+        try:
+            yield fs
+        finally:
+            _FakeRemoteFileSystem.store.clear()
+            _FakeRemoteFileSystem.pseudo_dirs[:] = [""]
+
+    def test_exists_uses_authenticated_fs(self, fake_fs_with_conn):
+        """exists() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        p = ObjectStoragePath("ffs2://my_conn@bucket/some_file.txt", conn_id="my_conn")
+        # Verify the correct fs instance was injected, not merely any _FakeRemoteFileSystem
+        assert p.__wrapped__._fs_cached is fake_fs_with_conn
+        fake_fs_with_conn.touch("bucket/some_file.txt")
+
+        assert p.exists() is True
+        assert (
+            ObjectStoragePath("ffs2://my_conn@bucket/no_such_file.txt", conn_id="my_conn").exists() is False
+        )
+
+    def test_mkdir_uses_authenticated_fs(self, fake_fs_with_conn):
+        """mkdir() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        p = ObjectStoragePath("ffs2://my_conn@bucket/new_dir/", conn_id="my_conn")
+        p.mkdir(parents=True, exist_ok=True)
+        assert fake_fs_with_conn.isdir("bucket/new_dir")
+
+    def test_is_dir_uses_authenticated_fs(self, fake_fs_with_conn):
+        """is_dir() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        fake_fs_with_conn.mkdir("bucket/a_dir")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/a_dir", conn_id="my_conn")
+        assert p.is_dir() is True
+
+    def test_is_file_uses_authenticated_fs(self, fake_fs_with_conn):
+        """is_file() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        fake_fs_with_conn.touch("bucket/a_file.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/a_file.txt", conn_id="my_conn")
+        assert p.is_file() is True
+
+    def test_touch_uses_authenticated_fs(self, fake_fs_with_conn):
+        """touch() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        p = ObjectStoragePath("ffs2://my_conn@bucket/touched_file.txt", conn_id="my_conn")
+        p.touch()
+        assert fake_fs_with_conn.exists("bucket/touched_file.txt")
+
+    def test_unlink_uses_authenticated_fs(self, fake_fs_with_conn):
+        """unlink() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        fake_fs_with_conn.touch("bucket/to_delete.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/to_delete.txt", conn_id="my_conn")
+        p.unlink()
+        assert not fake_fs_with_conn.exists("bucket/to_delete.txt")
+
+    def test_rmdir_uses_authenticated_fs(self, fake_fs_with_conn):
+        """rmdir() must use self.fs (Airflow-attached) not __wrapped__.fs (unauthenticated)."""
+        fake_fs_with_conn.mkdir("bucket/empty_dir")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/empty_dir", conn_id="my_conn")
+        # upath's rmdir(recursive=False) calls next(self.iterdir()) without a default,
+        # which raises StopIteration on empty dirs — a upath bug. Use the default (recursive=True).
+        p.rmdir()
+        assert not fake_fs_with_conn.exists("bucket/empty_dir")
+
+    def test_conn_id_in_uri_works_for_exists(self, fake_fs_with_conn):
+        """conn_id embedded in URI (user@host) should also work for exists()."""
+        fake_fs_with_conn.touch("bucket/target.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/target.txt")
+        assert p.conn_id == "my_conn"
+        assert p.exists() is True
+
+    def test_from_upath_injects_fs_when_no_cache(self, fake_fs_with_conn):
+        """_from_upath must inject authenticated fs into a fresh UPath with no _fs_cached."""
+        # Simulate _from_upath called as an instance method with a fresh UPath that has
+        # no _fs_cached set (e.g. cwd() / home() or a cross-protocol _from_upath call).
+        p_instance = ObjectStoragePath("ffs2://my_conn@bucket/root", conn_id="my_conn")
+        fresh_upath = UPath("ffs2://bucket/other")
+        assert not hasattr(fresh_upath, "_fs_cached")
+        child = p_instance._from_upath(fresh_upath)
+        assert child.__wrapped__._fs_cached is fake_fs_with_conn
+
+    def test_iterdir_children_use_authenticated_fs(self, fake_fs_with_conn):
+        """Children yielded by iterdir() must also carry the authenticated filesystem."""
+        fake_fs_with_conn.touch("bucket/dir/file1.txt")
+        fake_fs_with_conn.touch("bucket/dir/file2.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/dir", conn_id="my_conn")
+        children = list(p.iterdir())
+        assert len(children) == 2
+        # Each child path must use the same authenticated fs, not a fresh unauthenticated one
+        assert all(c.__wrapped__._fs_cached is fake_fs_with_conn for c in children)
+
+
class TestRemotePath:
def test_bucket_key_protocol(self):
bucket = "bkt"