jens-scheffler-bosch commented on code in PR #34729:
URL: https://github.com/apache/airflow/pull/34729#discussion_r1367761520


##########
airflow/io/store/__init__.py:
##########


Review Comment:
   What is the reason for having a sub-package `store` below `airflow.io` if
there are no other packages or large modules in `airflow.io`?
   Do you foresee, from a Python packaging perspective, that a larger ecosystem
will evolve below `airflow.io`, and therefore want to place the AirflowStore in
its own sub-package from the start?
   I believe usage would be much leaner if the `airflow.io.store` modules were
placed directly in `airflow.io`. Deeply nested structures only make sense if we
want to reserve space in the structure for large follow-up code blocks.
   Besides the interfaces and abstract base classes, the big blocks of logic
are pushed to provider packages anyway, so there is potential to make it leaner
at this level.
   `airflow.io.store.ObjectStore` would become `airflow.io.ObjectStore`



##########
airflow/io/store/path.py:
##########
@@ -0,0 +1,666 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+import os
+import typing
+from io import UnsupportedOperation
+from stat import S_ISLNK
+
+from fsspec.utils import stringify_path
+
+from airflow.io.store import ObjectStore, attach
+
+
+def _rewrite_info(info: dict, store: ObjectStore) -> dict:
+    info["name"] = ObjectStoragePath(info["name"], store=store)
+    return info
+
+
+class ObjectStoragePath(os.PathLike):

Review Comment:
   I was also wondering whether `ObjectStorage` is the right term, because the
target is to stay close to POSIX standards. Object storage is actually one
technical implementation of a storage system, and we use the same interface
for local path references too (it is not limited to cloud storage == object
storage).
   How about making it fully neutral and naming it `Storage` only? Then the
Airflow-specific path could be `StoragePath`.



##########
airflow/providers/common/io/provider.yaml:
##########


Review Comment:
   For consistency I'd propose moving the file system hook from
`airflow/hooks/filesystem.py` into the provider package as well (of course
leaving an alias/stub in the original location for compatibility).
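
   A rough sketch of what such an alias/stub could look like, assuming a
module-level `__getattr__` (PEP 562) is acceptable here. `FSHook` below is only
a stand-in class and `old_location_stub` is a made-up module name; the real
stub would live in `airflow/hooks/filesystem.py` and import from wherever the
hook ends up.

```python
import types
import warnings


class FSHook:
    """Hypothetical stand-in for the hook after it moved into the provider."""


# Simulates the original module that stays behind as a thin stub.
old_module = types.ModuleType("old_location_stub")


def _deprecated_getattr(name: str):
    # PEP 562 module-level __getattr__: resolve the old name, but warn.
    if name == "FSHook":
        warnings.warn(
            "FSHook moved to the provider package; import it from there.",
            DeprecationWarning,
            stacklevel=2,
        )
        return FSHook
    raise AttributeError(name)


old_module.__getattr__ = _deprecated_getattr

# Accessing the name through the old location still works, with a warning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    resolved = old_module.FSHook
```

   Existing imports keep working this way, while users get a nudge towards the
new location.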



##########
tests/system/providers/common/io/example_file_transfer_local_to_s3.py:
##########
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.io.store.path import ObjectStoragePath
+from airflow.models.baseoperator import chain
+from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_file_transfer_local_to_s3"
+
+SAMPLE_TEXT = "This is some sample text."
+
+FILENAME = "sample-txt.txt"
+TEMP_FILE_PATH = ObjectStoragePath("file:///tmp") / FILENAME
+
+AWS_BUCKET_NAME = f"bucket-aws-{DAG_ID}-{ENV_ID}".replace("_", "-")
+AWS_BUCKET = ObjectStoragePath(f"s3://{AWS_BUCKET_NAME}")
+
+AWS_FILE_PATH = AWS_BUCKET / FILENAME
+
+
+@task
+def create_temp_file():
+    with TEMP_FILE_PATH.open("w") as file:
+        file.write(SAMPLE_TEXT)

Review Comment:
   Can you leverage this example to show how the `ObjectStoragePath` object is
passed via XCom from one task to another? That is, the create task returns it,
XCom serializes it, and the follow-up operations receive it as input (and do
not rely on the global constants).



##########
airflow/io/store/path.py:
##########
@@ -0,0 +1,680 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+import os
+import shutil
+import typing
+from io import UnsupportedOperation
+from stat import S_ISLNK
+
+from fsspec.utils import stringify_path
+
+from airflow.io.store import ObjectStore, attach
+from airflow.io.store.stat import stat_result
+
+
+def _rewrite_info(info: dict, store: ObjectStore) -> dict:
+    info["name"] = ObjectStoragePath(info["name"], store=store)
+    return info
+
+
+class ObjectStoragePath(os.PathLike):
+    """A path-like object for object storage."""
+
+    __version__: typing.ClassVar[int] = 1
+
+    sep: typing.ClassVar[str] = "/"
+    root_marker: typing.ClassVar[str] = "/"
+
+    __slots__ = (
+        "_store",
+        "_bucket",
+        "_key",
+        "_conn_id",
+        "_protocol",
+        "_hash",
+    )
+
+    def __init__(self, path, conn_id: str | None = None, store: ObjectStore | None = None):
+        self._conn_id = conn_id
+        self._store = store
+
+        self._hash = None
+
+        self._protocol, self._bucket, self._key = self.split_path(path)
+
+        if store:
+            self._conn_id = store.conn_id
+            self._protocol = self._protocol if self._protocol else store.protocol
+        elif self._protocol:
+            self._store = attach(self._protocol, conn_id)
+
+    @classmethod
+    def split_path(cls, path) -> tuple[str, str, str]:
+        protocol = ""
+        key = ""
+
+        path = stringify_path(path)
+
+        i = path.find("://")
+        if i > 0:
+            protocol = path[:i]
+            path = path[i + 3 :]
+
+        if cls.sep not in path:
+            bucket = path
+        else:
+            bucket, key = path.split(cls.sep, 1)
+
+        # we don't care about versions etc
+        return protocol, bucket, key
+
+    def __fspath__(self):
+        return self.__str__()
+
+    def __repr__(self):
+        return f"<{type(self).__name__}('{self}')>"
+
+    def __str__(self):
+        path = (
+            f"{self._protocol}://{self._bucket}/{self._key}"
+            if self._protocol
+            else f"{self._bucket}/{self._key}"
+        )
+
+        return path
+
+    def __lt__(self, other):
+        if not isinstance(other, ObjectStoragePath):
+            return NotImplemented
+
+        return self._bucket < other._bucket
+
+    def __le__(self, other):
+        if not isinstance(other, ObjectStoragePath):
+            return NotImplemented
+
+        return self._bucket <= other._bucket
+
+    def __eq__(self, other):
+        if not isinstance(other, ObjectStoragePath):
+            return NotImplemented
+
+        return self._bucket == other._bucket
+
+    def __ne__(self, other):
+        if not isinstance(other, ObjectStoragePath):
+            return NotImplemented
+
+        return self._bucket != other._bucket
+
+    def __gt__(self, other):
+        if not isinstance(other, ObjectStoragePath):
+            return NotImplemented
+
+        return self._bucket > other._bucket
+
+    def __ge__(self, other):
+        if not isinstance(other, ObjectStoragePath):
+            return NotImplemented
+
+        return self._bucket >= other._bucket
+
+    def __hash__(self):
+        if not self._hash:
+            self._hash = hash(self._bucket)
+
+        return self._hash
+
+    def __truediv__(self, other) -> ObjectStoragePath:
+        o_protocol, o_bucket, o_key = self.split_path(other)
+        if not isinstance(other, str) and o_bucket and self._bucket != o_bucket:
+            raise ValueError("Cannot combine paths from different buckets / containers")
+
+        if o_protocol and self._protocol != o_protocol:
+            raise ValueError("Cannot combine paths from different protocols")
+
+        path = f"{stringify_path(self).rstrip(self.sep)}/{stringify_path(other).lstrip(self.sep)}"
+        return ObjectStoragePath(path, conn_id=self._conn_id)
+
+    def _unsupported(self, method_name):
+        msg = f"{type(self).__name__}.{method_name}() is unsupported"
+        raise UnsupportedOperation(msg)
+
+    def samestore(self, other):
+        return isinstance(other, ObjectStoragePath) and self._store == other._store
+
+    @property
+    def container(self) -> str:
+        return self._bucket
+
+    @property
+    def bucket(self) -> str:
+        return self._bucket
+
+    @property
+    def key(self) -> str:
+        return self._key
+
+    @property
+    def store(self) -> ObjectStore:
+        if not self._store:
+            raise ValueError("Cannot do operations. No store attached.")
+
+        return self._store
+
+    def stat(self, *, follow_symlinks=True):
+        """Return the result of the `stat()` call."""  # noqa: D402
+        stat = self.store.fs.stat(self)
+        stat.update(
+            {
+                "protocol": self.store.protocol,
+                "conn_id": self.store.conn_id,
+            }
+        )
+        return stat_result(stat)
+
+    def lstat(self):
+        """Like stat() except that it doesn't follow symlinks."""
+        return self.stat(follow_symlinks=False)
+
+    def exists(self):
+        """Whether this path exists."""
+        return self.store.fs.exists(self)
+
+    def is_dir(self):
+        """Return True if this path is directory like."""
+        return self.store.fs.isdir(self)
+
+    def is_file(self):
+        """Return True if this path is a regular file."""
+        return self.store.fs.isfile(self)
+
+    def is_mount(self):
+        return self._unsupported("is_mount")
+
+    def is_symlink(self):
+        """Whether this path is a symbolic link."""
+        try:
+            return S_ISLNK(self.lstat().st_mode)
+        except OSError:
+            # Path doesn't exist
+            return False
+        except ValueError:
+            # Non-encodable path
+            return False
+
+    def is_block_device(self):
+        self._unsupported("is_block_device")
+
+    def is_char_device(self):
+        self._unsupported("is_char_device")
+
+    def is_fifo(self):
+        self._unsupported("is_fifo")
+
+    def is_socket(self):
+        self._unsupported("is_socket")
+
+    def samefile(self, other_path):
+        """Return whether other_path is the same or not as this file."""
+        if other_path != ObjectStoragePath:
+            return False
+
+        st = self.stat()
+        other_st = other_path.stat()
+
+        return (
+            st["protocol"] == other_st["protocol"]
+            and st["conn_id"] == other_st["conn_id"]
+            and st["ino"] == other_st["ino"]
+        )
+
+    def checksum(self):
+        """Return the checksum of the file at this path."""
+        return self.store.fs.checksum(self)
+
+    def open(
+        self,
+        mode="rb",
+        block_size=None,
+        cache_options=None,
+        compression=None,
+        encoding=None,
+        errors=None,
+        newline=None,
+        **kwargs,
+    ):
+        """
+        Return a file-like object from the filesystem.
+
+        The resultant instance must function correctly in a context 'with' block.
+
+        :param mode: str like 'rb', 'w'
+                  See builtin 'open()'.
+        :param block_size: int
+                        Some indication of buffering - this is a value in bytes.
+        :param cache_options: dict, optional
+                           Extra arguments to pass through to the cache.
+        :param compression: string or None
+                        If given, open file using a compression codec. Can either be a compression
+                        name (a key in 'fsspec.compression.compr') or 'infer' to guess the
+                        compression from the filename suffix.
+        :param encoding: passed on to TextIOWrapper for text mode
+        :param errors: passed on to TextIOWrapper for text mode
+        :param newline: passed on to TextIOWrapper for text mode
+
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        return self.store.fs.open(
+            str(self),
+            mode=mode,
+            block_size=block_size,
+            cache_options=cache_options,
+            compression=compression,
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+            **kwargs,
+        )
+
+    def read_bytes(self, start: int | None = None, end: int | None = None):
+        """Open the file in bytes mode, read it, and close the file."""
+        self.store.fs.read_bytes(str(self), start=start, end=end)
+
+    def read_text(self, encoding=None, errors=None, newline=None, **kwargs):
+        """Open the file in text mode, read it, and close the file."""
+        return self.store.fs.read_text(str(self), encoding=encoding, errors=errors, newline=newline, **kwargs)
+
+    def write_bytes(self, data, **kwargs):
+        """Open the file in bytes mode, write to it, and close the file."""
+        self.store.fs.pipe_file(self, value=data, **kwargs)
+
+    def write_text(self, data, encoding=None, errors=None, newline=None, **kwargs):
+        """Open the file in text mode, write to it, and close the file."""
+        return self.store.fs.write_text(
+            str(self), value=data, encoding=encoding, errors=errors, newline=newline, **kwargs
+        )
+
+    def iterdir(self):
+        """Iterate over the files in this directory."""
+        return self._unsupported("iterdir")
+
+    def _scandir(self):
+        # Emulate os.scandir(), which returns an object that can be used as a
+        # context manager.
+        return contextlib.nullcontext(self.iterdir())
+
+    def glob(self, pattern: str, maxdepth: int | None = None, **kwargs):
+        """
+        Find files by glob-matching.
+
+        If the path ends with '/', only folders are returned.
+
+        We support ``"**"``,
+        ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.
+
+        The `maxdepth` option is applied on the first `**` found in the path.
+
+        Search path names that contain embedded characters special to this
+        implementation of glob may not produce expected results;
+        e.g., 'foo/bar/*starredfilename*'.
+
+        :param pattern: str
+                       The glob pattern to match against.
+        :param maxdepth: int or None
+                         The maximum depth to search. If None, there is no depth limit.
+
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        path = os.path.join(self._bucket, pattern)
+
+        detail = kwargs.get("detail", False)
+        items = self.store.fs.glob(path, maxdepth=maxdepth, **kwargs)
+        if detail:
+            t = {
+                ObjectStoragePath(k, store=self.store): _rewrite_info(v, self.store) for k, v in items.items()
+            }
+            return t
+        else:
+            return [ObjectStoragePath(c, store=self.store) for c in items]
+
+    def rglob(self, maxdepth: int | None = None, **kwargs):
+        self._unsupported("rglob")
+
+    def walk(self, maxdepth: int | None = None, topdown: bool = True, on_error: str = "omit", **kwargs):
+        """
+        Return all files belows path.
+
+        List all files, recursing into subdirectories; output is iterator-style,
+        like ``os.walk()``. For a simple list of files, ``find()`` is available.
+
+        When topdown is True, the caller can modify the dirnames list in-place (perhaps
+        using del or slice assignment), and walk() will
+        only recurse into the subdirectories whose names remain in dirnames;
+        this can be used to prune the search, impose a specific order of visiting,
+        or even to inform walk() about directories the caller creates or renames before
+        it resumes walk() again.
+        Modifying dirnames when topdown is False has no effect. (see os.walk)
+
+        Note that the "files" outputted will include anything that is not
+        a directory, such as links.
+
+        :param maxdepth: int or None
+                        Maximum recursion depth. None means limitless, but not recommended
+                        on link-based file-systems.
+        :param topdown: bool (True)
+                        Whether to walk the directory tree from the top downwards or from
+                        the bottom upwards.
+        :param on_error: "omit", "raise", a collable
+                        if omit (default), path with exception will simply be empty;
+                        If raise, an underlying exception will be raised;
+                        if callable, it will be called with a single OSError instance as argument
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        detail = kwargs.get("detail", False)
+        items = self.store.fs.walk(str(self), maxdepth=maxdepth, topdown=topdown, on_error=on_error, **kwargs)
+        if not detail:
+            for path, dirs, files in items:
+                yield ObjectStoragePath(path, store=self.store), dirs, files
+        else:
+            for path, dirs, files in items:
+                yield (
+                    ObjectStoragePath(path, store=self.store),
+                    {k: _rewrite_info(v, self.store) for k, v in dirs.items()},
+                    {k: _rewrite_info(v, self.store) for k, v in files.items()},
+                )
+
+    def ls(self, detail: bool = True, **kwargs):
+        """
+        List files at path.
+
+        :param detail: bool
+                       If True, return a dict containing details about each entry, otherwise
+                       return a list of paths.
+
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        items = self.store.fs.ls(str(self), detail=detail, **kwargs)
+
+        if detail:
+            return [_rewrite_info(c, self.store) for c in items]
+        else:
+            return [ObjectStoragePath(c, store=self.store) for c in items]
+
+    def absolute(self):
+        """Return an absolute version of this path. Resolving any aliases."""
+        path = f"{self.store.protocol}://{self._key}"
+        return path
+
+    def touch(self, truncate: bool = True):
+        """Create an empty file, or update the timestamp.
+
+        :param truncate: bool (True)
+                         If True, always set the file size to 0; if False, update the timestamp and
+                         leave the file unchanged, if the backend allows this.
+        """
+        return self.store.fs.touch(str(self), truncate=truncate)
+
+    def mkdir(self, create_parents: bool = True, **kwargs):
+        """
+        Create a directory entry at the specified path or within a bucket/container.
+
+        For systems that don't have true directories, it may create a directory entry
+        for this instance only and not affect the real filesystem.
+
+        :param create_parents: bool
+                              if True, this is equivalent to 'makedirs'.
+
+        kwargs: Additional keyword arguments, which may include permissions, etc.
+        """
+        return self.store.fs.mkdir(str(self), create_parents=create_parents, **kwargs)
+
+    def unlink(self, recursive: bool = False, maxdepth: int | None = None):
+        """
+        Remove this file or link.
+
+        If the path is a directory, use rmdir() instead.
+        """
+        self.store.fs.rm(str(self), recursive=recursive, maxdepth=maxdepth)
+
+    def rm(self, recursive: bool = False, maxdepth: int | None = None):
+        """
+        Remove this file or link.
+
+        Alias of unlink
+        """
+        self.unlink(recursive=recursive, maxdepth=maxdepth)
+
+    def rmdir(self):
+        """Remove this directory.  The directory must be empty."""
+        return self.store.fs.rmdir(str(self))
+
+    def rename(self, target: str | ObjectStoragePath, overwrite=False):
+        """
+        Rename this path to the target path.
+
+        The target path may be absolute or relative. Relative paths are
+        interpreted relative to the current working directory, *not* the
+        directory of the Path object.
+
+        Returns the new Path instance pointing to the target path.
+        """
+        if not overwrite:

Review Comment:
   Do we also need a sanity check that the renamed location is within the same
storage path/backend? Otherwise you need to implement and cater for the (heavy
lifting) moving of content from X to Y as well. To me, `rename` implies just a
rename without moving content.
   In the `move()` implementation below this is explicitly handled.
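
   A minimal sketch of the kind of guard I have in mind, using a hypothetical
stand-in class rather than the real `ObjectStoragePath`. `FakePath` and its
fields are made up for illustration; only the idea of `samestore()` is taken
from this diff, and here it compares protocol and connection id instead of the
store objects.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FakePath:
    """Hypothetical stand-in for ObjectStoragePath; not the real class."""

    protocol: str
    conn_id: str | None
    bucket: str
    key: str

    def samestore(self, other: object) -> bool:
        # Same idea as samestore() in the diff: same backend and connection.
        return (
            isinstance(other, FakePath)
            and self.protocol == other.protocol
            and self.conn_id == other.conn_id
        )

    def rename(self, target: FakePath) -> FakePath:
        # A rename should never move data between backends, so refuse early.
        if not self.samestore(target):
            raise ValueError("rename() target must be on the same store; use move() instead")
        return target


src = FakePath("s3", "aws_default", "bucket-a", "old.txt")
same = FakePath("s3", "aws_default", "bucket-a", "new.txt")
other = FakePath("file", None, "tmp", "new.txt")

renamed = src.rename(same)

try:
    src.rename(other)
    cross_store_rejected = False
except ValueError:
    cross_store_rejected = True
```

   Failing fast keeps `rename()` cheap and leaves cross-backend transfers to
`move()`.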



##########
airflow/io/__init__.py:
##########
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import (
+    TYPE_CHECKING,
+    Callable,
+)
+
+from fsspec.implementations.local import LocalFileSystem
+
+from airflow.compat.functools import cache
+from airflow.providers_manager import ProvidersManager
+from airflow.stats import Stats
+from airflow.utils.module_loading import import_string
+
+if TYPE_CHECKING:
+    from fsspec import AbstractFileSystem
+
+log = logging.getLogger(__name__)
+
+
+def _file(_: str | None) -> LocalFileSystem:
+    return LocalFileSystem()
+
+
+# builtin supported filesystems
+_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], AbstractFileSystem]] = {
+    "file": _file,
+}
+
+
+@cache
+def _register_filesystems() -> dict[str, Callable[[str | None], AbstractFileSystem]]:

Review Comment:
   But everything is lazily initialized, so the registration is only executed
once (per process/component). For me it is okay without an explicit filter.
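
   To illustrate the "only executed once" point, here is a small sketch with
the standard-library `functools.lru_cache`, which I assume behaves like the
`cache` decorator used in the diff. `register_filesystems` and `calls` are
hypothetical names, not the code in this PR.

```python
from functools import lru_cache

calls = []


@lru_cache(maxsize=None)
def register_filesystems() -> dict:
    # Hypothetical stand-in for _register_filesystems(); records each real execution.
    calls.append("registered")
    return {"file": "LocalFileSystem"}


# The body runs on the first call only; later calls return the cached dict.
first = register_filesystems()
second = register_filesystems()
```

   Both calls return the same dict object, and the function body runs a single
time within the process.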



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.