This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 3840fae2e16 [v3-2-test] Fix ObjectStoragePath NoCredentialsError when 
using conn_id with remote stores (#64634) (#64646)
3840fae2e16 is described below

commit 3840fae2e16ecbd86fda948607a5822a7c506703
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 3 12:07:24 2026 +0530

    [v3-2-test] Fix ObjectStoragePath NoCredentialsError when using conn_id 
with remote stores (#64634) (#64646)
    
    * Fix ObjectStoragePath credential resolution by injecting authenticated fs 
into __wrapped__._fs_cached
    (cherry picked from commit f391942b90f2347272c321bcdd092c7b109cdc9e)
    
    Co-authored-by: Rahul Vats <[email protected]>
---
 task-sdk/src/airflow/sdk/io/path.py     |  38 +++++++++++
 task-sdk/tests/task_sdk/io/test_path.py | 109 ++++++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+)

diff --git a/task-sdk/src/airflow/sdk/io/path.py 
b/task-sdk/src/airflow/sdk/io/path.py
index cf78bb4ebc6..14efa2b130a 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import logging
 import shutil
 from typing import TYPE_CHECKING, Any, ClassVar
 from urllib.parse import urlsplit
@@ -33,6 +34,8 @@ if TYPE_CHECKING:
     from typing_extensions import Self
     from upath.types import JoinablePathLike
 
+log = logging.getLogger(__name__)
+
 
 class _TrackingFileWrapper:
     """Wrapper that tracks file operations to intercept lineage."""
@@ -116,6 +119,13 @@ class ObjectStoragePath(ProxyUPath):
         # to the underlying fsspec filesystem, which doesn't understand it
         self._conn_id = storage_options.pop("conn_id", None)
         super().__init__(*args, protocol=protocol, **storage_options)
+        # ProxyUPath delegates all operations to self.__wrapped__, which was
+        # constructed without the connection (conn_id stripped above).
+        # Pre-populating __wrapped__._fs_cached with the Airflow-authenticated
+        # filesystem fixes every delegated method (exists, mkdir, iterdir, 
glob,
+        # walk, rename, read_bytes, write_bytes, …) in one place rather than
+        # requiring individual overrides for each one.
+        self._inject_authenticated_fs(self.__wrapped__)
 
     @classmethod_or_method  # type: ignore[arg-type]
     def _from_upath(cls_or_self, upath, /):
@@ -127,8 +137,36 @@ class ObjectStoragePath(ProxyUPath):
         obj = object.__new__(cls)
         obj.__wrapped__ = upath
         obj._conn_id = getattr(cls_or_self, "_conn_id", None) if is_instance 
else None
+        # If the wrapped UPath has no cached fs yet (e.g. _from_upath called on an instance
+        # with a fresh UPath), inject the authenticated fs now. Classmethod calls carry no
+        # conn_id, so nothing is injected for them. Child UPaths from __wrapped__ operations
+        # (iterdir, glob, etc.) inherit _fs_cached from the parent, so injection is skipped.
+        if not hasattr(upath, "_fs_cached"):
+            obj._inject_authenticated_fs(upath)
         return obj
 
+    def _inject_authenticated_fs(self, wrapped: UPath) -> None:
+        """
+        Inject the Airflow-authenticated filesystem into wrapped._fs_cached.
+
+        This ensures that all ProxyUPath-delegated operations use the connection-aware
+        filesystem rather than an unauthenticated one constructed without the conn_id.
+        Failures are logged at DEBUG level and silently skipped so that 
construction always
+        succeeds — errors will surface naturally at first use of the path.
+        """
+        if self._conn_id is None:
+            return
+        try:
+            wrapped._fs_cached = attach(wrapped.protocol or "file", 
self._conn_id).fs
+        except Exception:
+            log.debug(
+                "Could not pre-populate authenticated filesystem for %r 
(conn_id=%r); "
+                "operations will attempt lazy resolution at first use.",
+                self,
+                self._conn_id,
+                exc_info=True,
+            )
+
     @property
     def conn_id(self) -> str | None:
         """Return the connection ID for this path."""
diff --git a/task-sdk/tests/task_sdk/io/test_path.py 
b/task-sdk/tests/task_sdk/io/test_path.py
index e65c6584293..3113270a47f 100644
--- a/task-sdk/tests/task_sdk/io/test_path.py
+++ b/task-sdk/tests/task_sdk/io/test_path.py
@@ -26,6 +26,7 @@ from unittest import mock
 import pytest
 from fsspec.implementations.local import LocalFileSystem
 from fsspec.implementations.memory import MemoryFileSystem
+from upath import UPath
 
 from airflow.sdk import Asset, ObjectStoragePath
 from airflow.sdk._shared.module_loading import qualname
@@ -228,6 +229,114 @@ class TestAttach:
             method.assert_called_once_with(expected_args, **expected_kwargs)
 
 
+class TestConnIdCredentialResolution:
+    """
+    Regression tests for https://github.com/apache/airflow/issues/64632
+
+    When ObjectStoragePath was migrated from CloudPath to ProxyUPath (3.2.0),
+    methods like exists(), mkdir(), is_dir(), is_file() were delegated to
+    self.__wrapped__ which carries empty storage_options (conn_id is stored
+    separately). This caused NoCredentialsError / 401 errors for remote stores
+    even when a valid conn_id was provided.
+    """
+
+    @pytest.fixture(autouse=True)
+    def restore_cache(self):
+        cache = _STORE_CACHE.copy()
+        yield
+        _STORE_CACHE.clear()
+        _STORE_CACHE.update(cache)
+
+    @pytest.fixture
+    def fake_fs_with_conn(self):
+        fs = _FakeRemoteFileSystem(conn_id="my_conn")
+        attach(protocol="ffs2", conn_id="my_conn", fs=fs)
+        try:
+            yield fs
+        finally:
+            _FakeRemoteFileSystem.store.clear()
+            _FakeRemoteFileSystem.pseudo_dirs[:] = [""]
+
+    def test_exists_uses_authenticated_fs(self, fake_fs_with_conn):
+        """exists() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        p = ObjectStoragePath("ffs2://my_conn@bucket/some_file.txt", 
conn_id="my_conn")
+        # Verify the correct fs instance was injected, not merely any 
_FakeRemoteFileSystem
+        assert p.__wrapped__._fs_cached is fake_fs_with_conn
+        fake_fs_with_conn.touch("bucket/some_file.txt")
+
+        assert p.exists() is True
+        assert (
+            ObjectStoragePath("ffs2://my_conn@bucket/no_such_file.txt", 
conn_id="my_conn").exists() is False
+        )
+
+    def test_mkdir_uses_authenticated_fs(self, fake_fs_with_conn):
+        """mkdir() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        p = ObjectStoragePath("ffs2://my_conn@bucket/new_dir/", 
conn_id="my_conn")
+        p.mkdir(parents=True, exist_ok=True)
+        assert fake_fs_with_conn.isdir("bucket/new_dir")
+
+    def test_is_dir_uses_authenticated_fs(self, fake_fs_with_conn):
+        """is_dir() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        fake_fs_with_conn.mkdir("bucket/a_dir")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/a_dir", conn_id="my_conn")
+        assert p.is_dir() is True
+
+    def test_is_file_uses_authenticated_fs(self, fake_fs_with_conn):
+        """is_file() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        fake_fs_with_conn.touch("bucket/a_file.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/a_file.txt", 
conn_id="my_conn")
+        assert p.is_file() is True
+
+    def test_touch_uses_authenticated_fs(self, fake_fs_with_conn):
+        """touch() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        p = ObjectStoragePath("ffs2://my_conn@bucket/touched_file.txt", 
conn_id="my_conn")
+        p.touch()
+        assert fake_fs_with_conn.exists("bucket/touched_file.txt")
+
+    def test_unlink_uses_authenticated_fs(self, fake_fs_with_conn):
+        """unlink() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        fake_fs_with_conn.touch("bucket/to_delete.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/to_delete.txt", 
conn_id="my_conn")
+        p.unlink()
+        assert not fake_fs_with_conn.exists("bucket/to_delete.txt")
+
+    def test_rmdir_uses_authenticated_fs(self, fake_fs_with_conn):
+        """rmdir() must use self.fs (Airflow-attached) not __wrapped__.fs 
(unauthenticated)."""
+        fake_fs_with_conn.mkdir("bucket/empty_dir")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/empty_dir", 
conn_id="my_conn")
+        # upath's rmdir(recursive=False) calls next(self.iterdir()) without a 
default,
+        # which raises StopIteration on empty dirs — a upath bug. Use the 
default (recursive=True).
+        p.rmdir()
+        assert not fake_fs_with_conn.exists("bucket/empty_dir")
+
+    def test_conn_id_in_uri_works_for_exists(self, fake_fs_with_conn):
+        """conn_id embedded in URI (user@host) should also work for 
exists()."""
+        fake_fs_with_conn.touch("bucket/target.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/target.txt")
+        assert p.conn_id == "my_conn"
+        assert p.exists() is True
+
+    def test_from_upath_injects_fs_when_no_cache(self, fake_fs_with_conn):
+        """_from_upath must inject authenticated fs into a fresh UPath with no 
_fs_cached."""
+        # Simulate _from_upath called as an instance method with a fresh UPath 
that has
+        # no _fs_cached set (e.g. cwd() / home() or a cross-protocol 
_from_upath call).
+        p_instance = ObjectStoragePath("ffs2://my_conn@bucket/root", 
conn_id="my_conn")
+        fresh_upath = UPath("ffs2://bucket/other")
+        assert not hasattr(fresh_upath, "_fs_cached")
+        child = p_instance._from_upath(fresh_upath)
+        assert child.__wrapped__._fs_cached is fake_fs_with_conn
+
+    def test_iterdir_children_use_authenticated_fs(self, fake_fs_with_conn):
+        """Children yielded by iterdir() must also carry the authenticated 
filesystem."""
+        fake_fs_with_conn.touch("bucket/dir/file1.txt")
+        fake_fs_with_conn.touch("bucket/dir/file2.txt")
+        p = ObjectStoragePath("ffs2://my_conn@bucket/dir", conn_id="my_conn")
+        children = list(p.iterdir())
+        assert len(children) == 2
+        # Each child path must use the same authenticated fs, not a fresh 
unauthenticated one
+        assert all(c.__wrapped__._fs_cached is fake_fs_with_conn for c in 
children)
+
+
 class TestRemotePath:
     def test_bucket_key_protocol(self):
         bucket = "bkt"

Reply via email to