jens-scheffler-bosch commented on code in PR #34729:
URL: https://github.com/apache/airflow/pull/34729#discussion_r1362568182


##########
airflow/io/__init__.py:
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Base FileIO classes for implementing reading and writing table files.
+
+The FileIO abstraction includes a subset of full filesystem implementations. 
Specifically,
+Iceberg needs to read or write a file at a given location (as a seekable 
stream), as well
+as check if a file exists. An implementation of the FileIO abstract base class 
is responsible
+for returning an InputFile instance, an OutputFile instance, and deleting a 
file given
+its location.
+
+Ported from Apache Iceberg.
+"""
+from __future__ import annotations
+
+import importlib
+import logging
+import warnings
+from abc import ABC, abstractmethod
+from io import SEEK_SET
+from types import TracebackType
+from typing import (
+    Dict,
+    Optional,
+    Protocol,
+    runtime_checkable,
+)
+from urllib.parse import urlparse
+
+log = logging.getLogger(__name__)
+
+HDFS_HOST = "hdfs.host"
+HDFS_PORT = "hdfs.port"
+HDFS_USER = "hdfs.user"
+HDFS_KERB_TICKET = "hdfs.kerberos_ticket"
+
+TOKEN = "token"
+
+Properties = Dict[str, Optional[str]]
+
+
+@runtime_checkable
+class InputStream(Protocol):
+    """A protocol for the file-like object returned by InputFile.open(...).
+
+    This outlines the minimally required methods for a seekable input stream 
returned from an InputFile
+    implementation's `open(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def read(self, size: int = 0) -> bytes:
+        ...
+
+    @abstractmethod
+    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
+        ...
+
+    @abstractmethod
+    def tell(self) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    def __enter__(self) -> InputStream:
+        """Provide setup when opening an InputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+@runtime_checkable
+class OutputStream(Protocol):  # pragma: no cover
+    """A protocol for the file-like object returned by OutputFile.create(...).
+
+    This outlines the minimally required methods for a writable output stream 
returned from an OutputFile
+    implementation's `create(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def write(self, b: bytes) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    @abstractmethod
+    def __enter__(self) -> OutputStream:
+        """Provide setup when opening an OutputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+class InputFile(ABC):
+    """A base class for InputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an InputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the input file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def open(self, seekable: bool = True) -> InputStream:
+        """Return an object that matches the InputStream protocol.
+
+        Args:
+            seekable: If the stream should support seek, or if it is consumed 
sequential.
+
+        Returns:
+            InputStream: An object that matches the InputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileNotFoundError: If the file at self.location does not exist.
+        """
+
+
+class OutputFile(ABC):
+    """A base class for OutputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an OutputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the output file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def to_input_file(self) -> InputFile:
+        """Return an InputFile for the location of this output file."""
+
+    @abstractmethod
+    def create(self, overwrite: bool = False) -> OutputStream:
+        """Return an object that matches the OutputStream protocol.
+
+        Args:
+            overwrite (bool): If the file already exists at `self.location`
+                and `overwrite` is False a FileExistsError should be raised.
+
+        Returns:
+            OutputStream: An object that matches the OutputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileExistsError: If the file at self.location already exists and 
`overwrite=False`.
+        """
+
+
+class FileIO(ABC):
+    """A base class for FileIO implementations."""
+
+    @abstractmethod
+    def new_input(self, location: str, conn_id: str | None) -> InputFile:

Review Comment:
   Coding question: does it make sense to instantiate the `FileIO` class just 
to call `new_input()`? Should this rather be a static method or a module-level 
entry-point function?



##########
airflow/io/__init__.py:
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Base FileIO classes for implementing reading and writing table files.
+
+The FileIO abstraction includes a subset of full filesystem implementations. 
Specifically,
+Iceberg needs to read or write a file at a given location (as a seekable 
stream), as well
+as check if a file exists. An implementation of the FileIO abstract base class 
is responsible
+for returning an InputFile instance, an OutputFile instance, and deleting a 
file given
+its location.
+
+Ported from Apache Iceberg.
+"""
+from __future__ import annotations
+
+import importlib
+import logging
+import warnings
+from abc import ABC, abstractmethod
+from io import SEEK_SET
+from types import TracebackType
+from typing import (
+    Dict,
+    Optional,
+    Protocol,
+    runtime_checkable,
+)
+from urllib.parse import urlparse
+
+log = logging.getLogger(__name__)
+
+HDFS_HOST = "hdfs.host"
+HDFS_PORT = "hdfs.port"
+HDFS_USER = "hdfs.user"
+HDFS_KERB_TICKET = "hdfs.kerberos_ticket"
+
+TOKEN = "token"
+
+Properties = Dict[str, Optional[str]]
+
+
+@runtime_checkable
+class InputStream(Protocol):
+    """A protocol for the file-like object returned by InputFile.open(...).
+
+    This outlines the minimally required methods for a seekable input stream 
returned from an InputFile
+    implementation's `open(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def read(self, size: int = 0) -> bytes:
+        ...
+
+    @abstractmethod
+    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
+        ...
+
+    @abstractmethod
+    def tell(self) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    def __enter__(self) -> InputStream:
+        """Provide setup when opening an InputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+@runtime_checkable
+class OutputStream(Protocol):  # pragma: no cover
+    """A protocol for the file-like object returned by OutputFile.create(...).
+
+    This outlines the minimally required methods for a writable output stream 
returned from an OutputFile
+    implementation's `create(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def write(self, b: bytes) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    @abstractmethod
+    def __enter__(self) -> OutputStream:
+        """Provide setup when opening an OutputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+class InputFile(ABC):
+    """A base class for InputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an InputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the input file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def open(self, seekable: bool = True) -> InputStream:
+        """Return an object that matches the InputStream protocol.
+
+        Args:
+            seekable: If the stream should support seek, or if it is consumed 
sequential.
+
+        Returns:
+            InputStream: An object that matches the InputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileNotFoundError: If the file at self.location does not exist.
+        """
+
+
+class OutputFile(ABC):
+    """A base class for OutputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an OutputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the output file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def to_input_file(self) -> InputFile:
+        """Return an InputFile for the location of this output file."""
+
+    @abstractmethod
+    def create(self, overwrite: bool = False) -> OutputStream:
+        """Return an object that matches the OutputStream protocol.
+
+        Args:
+            overwrite (bool): If the file already exists at `self.location`
+                and `overwrite` is False a FileExistsError should be raised.
+
+        Returns:
+            OutputStream: An object that matches the OutputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileExistsError: If the file at self.location already exists and 
`overwrite=False`.
+        """
+
+
+class FileIO(ABC):
+    """A base class for FileIO implementations."""
+
+    @abstractmethod
+    def new_input(self, location: str, conn_id: str | None) -> InputFile:
+        """Get an InputFile instance to read bytes from the file at the given 
location.
+
+        Args:
+            location (str): A URI or a path to a local file.
+            conn_id (str): The connection ID to use when getting the file.
+        """
+
+    @abstractmethod
+    def new_output(self, location: str, conn_id: str | None) -> OutputFile:
+        """Get an OutputFile instance to write bytes to the file at the given 
location.
+
+        Args:
+            location (str): A URI or a path to a local file.
+            conn_id (str): The connection ID to use when getting the file.
+        """
+
+    @abstractmethod
+    def delete(self, location: str | InputFile | OutputFile, conn_id: str | 
None) -> None:
+        """Delete the file at the given path.
+
+        Args:
+            location (Union[str, InputFile, OutputFile]): A URI or a path to a 
local file--if an InputFile
+                instance or an OutputFile instance is provided, the location 
attribute for that instance is
+                used as the URI to delete.
+            conn_id (str): The connection ID to use when deleting the file.
+
+        Raises:
+            PermissionError: If the file at location cannot be accessed due to 
a permission error.
+            FileNotFoundError: When the file at the provided location does not 
exist.
+        """
+
+
+FSSPEC_FILE_IO = "airflow.io.fsspec.FsspecFileIO"
+
+# Mappings from the Java FileIO impl to a Python one. The list is ordered by 
preference.
+# If an implementation isn't installed, it will fall back to the next one.
+SCHEMA_TO_FILE_IO: dict[str, list[str]] = {
+    "s3": [FSSPEC_FILE_IO],
+    "s3a": [FSSPEC_FILE_IO],
+    "s3n": [FSSPEC_FILE_IO],
+    "gs": [FSSPEC_FILE_IO],
+    "file": [FSSPEC_FILE_IO],
+    # "hdfs": [ARROW_FILE_IO],
+    "abfs": [FSSPEC_FILE_IO],
+    "abfss": [FSSPEC_FILE_IO],
+}
+
+
+def _import_file_io(io_impl: str) -> FileIO | None:
+    try:
+        path_parts = io_impl.split(".")
+        if len(path_parts) < 2:
+            raise ValueError(f"py-io-impl should be full path 
(module.CustomFileIO), got: {io_impl}")
+        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
+        module = importlib.import_module(module_name)
+        class_ = getattr(module, class_name)
+        return class_()
+    except ModuleNotFoundError:
+        log.warning("Could not initialize FileIO: %s", io_impl)
+        return None
+
+
+def _infer_file_io_from_scheme(path: str) -> FileIO | None:

Review Comment:
   This private function seems to be unused, or I am too stupid to find where 
it is used. :-D



##########
airflow/io/__init__.py:
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Base FileIO classes for implementing reading and writing table files.
+
+The FileIO abstraction includes a subset of full filesystem implementations. 
Specifically,
+Iceberg needs to read or write a file at a given location (as a seekable 
stream), as well
+as check if a file exists. An implementation of the FileIO abstract base class 
is responsible
+for returning an InputFile instance, an OutputFile instance, and deleting a 
file given
+its location.
+
+Ported from Apache Iceberg.
+"""
+from __future__ import annotations
+
+import importlib
+import logging
+import warnings
+from abc import ABC, abstractmethod
+from io import SEEK_SET
+from types import TracebackType
+from typing import (
+    Dict,
+    Optional,
+    Protocol,
+    runtime_checkable,
+)
+from urllib.parse import urlparse
+
+log = logging.getLogger(__name__)
+
+HDFS_HOST = "hdfs.host"
+HDFS_PORT = "hdfs.port"
+HDFS_USER = "hdfs.user"
+HDFS_KERB_TICKET = "hdfs.kerberos_ticket"
+
+TOKEN = "token"
+
+Properties = Dict[str, Optional[str]]
+
+
+@runtime_checkable
+class InputStream(Protocol):
+    """A protocol for the file-like object returned by InputFile.open(...).
+
+    This outlines the minimally required methods for a seekable input stream 
returned from an InputFile
+    implementation's `open(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def read(self, size: int = 0) -> bytes:
+        ...
+
+    @abstractmethod
+    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
+        ...
+
+    @abstractmethod
+    def tell(self) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    def __enter__(self) -> InputStream:
+        """Provide setup when opening an InputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+@runtime_checkable
+class OutputStream(Protocol):  # pragma: no cover
+    """A protocol for the file-like object returned by OutputFile.create(...).
+
+    This outlines the minimally required methods for a writable output stream 
returned from an OutputFile
+    implementation's `create(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def write(self, b: bytes) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    @abstractmethod
+    def __enter__(self) -> OutputStream:
+        """Provide setup when opening an OutputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+class InputFile(ABC):
+    """A base class for InputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an InputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the input file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def open(self, seekable: bool = True) -> InputStream:
+        """Return an object that matches the InputStream protocol.
+
+        Args:
+            seekable: If the stream should support seek, or if it is consumed 
sequential.
+
+        Returns:
+            InputStream: An object that matches the InputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileNotFoundError: If the file at self.location does not exist.
+        """
+
+
+class OutputFile(ABC):

Review Comment:
   I like the (basic) model, but I am not certain it makes sense to separate 
the file/path-like objects for reading and writing. Of course, if I want to 
get an `InputStream` or `OutputStream` I need different IO classes, but 
abstraction-wise the `File` object is mostly the same irrespective of whether 
I want to open it for reading or writing later.
   I am not sure about other opinions, or whether there is a functional 
dependency or use case, but I'd rather merge `InputFile` and `OutputFile` 
together.



##########
airflow/io/store/__init__.py:
##########
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar
+
+from airflow.io.fsspec import SCHEME_TO_FS
+from airflow.utils.module_loading import import_string, qualname
+
+if TYPE_CHECKING:
+    from fsspec import AbstractFileSystem
+
+
+class Store:

Review Comment:
   Is this also meant to be an ABC that is missing its base class reference, 
assuming there is an `ObjectStore` providing concrete implementations?



##########
airflow/io/fsspec.py:
##########
@@ -0,0 +1,237 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""FileIO implementation for reading and writing files that uses fsspec 
compatible filesystems."""
+from __future__ import annotations
+
+import errno
+import logging
+import os
+from functools import lru_cache
+from typing import (
+    TYPE_CHECKING,
+    Callable,
+)
+from urllib.parse import urlparse
+
+from fsspec.implementations.local import LocalFileSystem
+
+from airflow.io import (
+    FileIO,
+    InputFile,
+    InputStream,
+    OutputFile,
+    OutputStream,
+)
+from airflow.providers_manager import ProvidersManager
+from airflow.stats import Stats
+from airflow.utils.module_loading import import_string
+
+if TYPE_CHECKING:
+    from fsspec import AbstractFileSystem
+
+log = logging.getLogger(__name__)
+
+
+def _file(_: str | None) -> LocalFileSystem:
+    return LocalFileSystem()
+
+
+# always support local file systems
+SCHEME_TO_FS: dict[str, Callable] = {
+    "file": _file,
+}
+
+
+def _register_schemes() -> None:
+    with Stats.timer("airflow.io.load_filesystems") as timer:
+        manager = ProvidersManager()
+        for fs_module_name in manager.filesystem_module_names:
+            fs_module = import_string(fs_module_name)
+            for scheme in getattr(fs_module, "schemes", []):
+                if scheme in SCHEME_TO_FS:
+                    log.warning("Overriding scheme %s for %s", scheme, 
fs_module_name)
+
+                method = getattr(fs_module, "get_fs", None)
+                if method is None:
+                    raise ImportError(f"Filesystem {fs_module_name} does not 
have a get_fs method")
+                SCHEME_TO_FS[scheme] = method
+
+    log.debug("loading filesystems from providers took %.3f seconds", 
timer.duration)
+
+
+_register_schemes()
+
+
+class FsspecInputFile(InputFile):
+    """An input file implementation for the FsspecFileIO.
+
+    Args:
+        location (str): A URI to a file location.
+        fs (AbstractFileSystem): An fsspec filesystem instance.
+    """
+
+    def __init__(self, location: str, fs: AbstractFileSystem):
+        self._fs = fs
+        super().__init__(location=location)
+
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+        object_info = self._fs.info(self.location)
+        if size := object_info.get("Size"):
+            return size
+        elif size := object_info.get("size"):
+            return size
+        raise RuntimeError(f"Cannot retrieve object info: {self.location}")
+
+    def exists(self) -> bool:
+        """Check whether the location exists."""
+        return self._fs.lexists(self.location)
+
+    def open(self, seekable: bool = True) -> InputStream:
+        """Create an input stream for reading the contents of the file.
+
+        Args:
+            seekable: If the stream should support seek, or if it is consumed 
sequential.
+
+        Returns:
+            OpenFile: An fsspec compliant file-like object.
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        try:
+            return self._fs.open(self.location, "rb")
+        except FileNotFoundError as e:
+            # To have a consistent error handling experience,
+            # make sure exception contains missing file location.
+            raise e if e.filename else FileNotFoundError(
+                errno.ENOENT, os.strerror(errno.ENOENT), self.location
+            ) from e
+
+
+class FsspecOutputFile(OutputFile):
+    """An output file implementation for the FsspecFileIO.
+
+    Args:
+        location (str): A URI to a file location.
+        fs (AbstractFileSystem): An fsspec filesystem instance.
+    """
+
+    def __init__(self, location: str, fs: AbstractFileSystem):
+        self._fs = fs
+        super().__init__(location=location)
+
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+        object_info = self._fs.info(self.location)
+        if size := object_info.get("Size"):
+            return size
+        elif size := object_info.get("size"):
+            return size
+        raise RuntimeError(f"Cannot retrieve object info: {self.location}")
+
+    def exists(self) -> bool:
+        """Check whether the location exists."""
+        return self._fs.lexists(self.location)
+
+    def create(self, overwrite: bool = False) -> OutputStream:
+        """Create an output stream for reading the contents of the file.
+
+        Args:
+            overwrite (bool): Whether to overwrite the file if it already 
exists.
+
+        Returns:
+            OpenFile: An fsspec compliant file-like object.
+
+        Raises:
+            FileExistsError: If the file already exists at the location and 
overwrite is set to False.
+
+        Note:
+            If overwrite is set to False, a check is first performed to verify 
that the file does not exist.
+            This is not thread-safe and a possibility does exist that the file 
can be created by a concurrent
+            process after the existence check yet before the output stream is 
created. In such a case, the
+            default behavior will truncate the contents of the existing file 
when opening the output stream.
+        """
+        if not overwrite and self.exists():
+            raise FileExistsError(f"Cannot create file, file already exists: 
{self.location}")
+        return self._fs.open(self.location, "wb")
+
+    def to_input_file(self) -> FsspecInputFile:
+        """Return a new FsspecInputFile for the location at `self.location`."""
+        return FsspecInputFile(location=self.location, fs=self._fs)
+
+
+class FsspecFileIO(FileIO):

Review Comment:
   I do not fully understand the separation between the module-level `FileIO` 
abstract base class and this implementation. Why are they separated? (Maybe I 
don't understand this pattern; I'd propose pushing this up to module level.)



##########
airflow/io/__init__.py:
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Base FileIO classes for implementing reading and writing table files.
+
+The FileIO abstraction includes a subset of full filesystem implementations. 
Specifically,
+Iceberg needs to read or write a file at a given location (as a seekable 
stream), as well
+as check if a file exists. An implementation of the FileIO abstract base class 
is responsible
+for returning an InputFile instance, an OutputFile instance, and deleting a 
file given
+its location.
+
+Ported from Apache Iceberg.
+"""
+from __future__ import annotations
+
+import importlib
+import logging
+import warnings
+from abc import ABC, abstractmethod
+from io import SEEK_SET
+from types import TracebackType
+from typing import (
+    Dict,
+    Optional,
+    Protocol,
+    runtime_checkable,
+)
+from urllib.parse import urlparse
+
+log = logging.getLogger(__name__)
+
+HDFS_HOST = "hdfs.host"
+HDFS_PORT = "hdfs.port"
+HDFS_USER = "hdfs.user"
+HDFS_KERB_TICKET = "hdfs.kerberos_ticket"
+
+TOKEN = "token"
+
+Properties = Dict[str, Optional[str]]
+
+
+@runtime_checkable
+class InputStream(Protocol):
+    """A protocol for the file-like object returned by InputFile.open(...).
+
+    This outlines the minimally required methods for a seekable input stream 
returned from an InputFile
+    implementation's `open(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def read(self, size: int = 0) -> bytes:
+        ...
+
+    @abstractmethod
+    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
+        ...
+
+    @abstractmethod
+    def tell(self) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    def __enter__(self) -> InputStream:
+        """Provide setup when opening an InputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+@runtime_checkable
+class OutputStream(Protocol):  # pragma: no cover
+    """A protocol for the file-like object returned by OutputFile.create(...).
+
+    This outlines the minimally required methods for a writable output stream 
returned from an OutputFile
+    implementation's `create(...)` method. These methods are a subset of 
IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def write(self, b: bytes) -> int:
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    @abstractmethod
+    def __enter__(self) -> OutputStream:
+        """Provide setup when opening an OutputStream using a 'with' 
statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+class InputFile(ABC):
+    """A base class for InputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an InputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the input file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def open(self, seekable: bool = True) -> InputStream:
+        """Return an object that matches the InputStream protocol.
+
+        Args:
+            seekable: If the stream should support seek, or if it is consumed 
sequential.
+
+        Returns:
+            InputStream: An object that matches the InputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileNotFoundError: If the file at self.location does not exist.
+        """
+
+
+class OutputFile(ABC):
+    """A base class for OutputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an OutputFile 
instance.
+        exists (bool): Whether the file exists or not.
+    """
+
+    def __init__(self, location: str):
+        self._location = location
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the output file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+        """
+
+    @abstractmethod
+    def to_input_file(self) -> InputFile:
+        """Return an InputFile for the location of this output file."""
+
+    @abstractmethod
+    def create(self, overwrite: bool = False) -> OutputStream:
+        """Return an object that matches the OutputStream protocol.
+
+        Args:
+            overwrite (bool): If the file already exists at `self.location`
+                and `overwrite` is False a FileExistsError should be raised.
+
+        Returns:
+            OutputStream: An object that matches the OutputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed 
due to a permission error.
+            FileExistsError: If the file at self.location already exists and 
`overwrite=False`.
+        """
+
+
+class FileIO(ABC):
+    """A base class for FileIO implementations."""
+
+    @abstractmethod
+    def new_input(self, location: str, conn_id: str | None) -> InputFile:

Review Comment:
   Another point, this time about naming: I feel the name `new_input()` is not 
correct. I would assume the file already exists, in which case it is not "new".
   I would rather name it `open()`, but that overlaps with `open()` on the 
`InputFile` class.
   
   Thinking out loud (a bit)... what is supposed to be the difference between 
`FileIO` and `InputFile`? I assume `FileIO` is rather an entry point to "resolve" 
files and locations into "file" handles or objects to operate on?
   Instead of splitting "input" and "output" methods here, could it be a kind of 
`resolve()` to a file-like object (PathLike) on which you would call `open()` (for 
read/write) later?



##########
airflow/io/store/util.py:
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.io.store.path import ObjectStoragePath
+
+
+def cp(
+    path1: str | ObjectStoragePath,
+    path2: str | ObjectStoragePath,
+    recursive: bool = False,
+    maxdepth: int = None,
+    on_error: str = None,
+    **kwargs,
+):
+    """Copy between two locations.
+    on_error : "raise", "ignore"
+        If raise, any not-found exceptions will be raised; if ignore any
+        not-found exceptions will cause the path to be skipped; defaults to
+        raise unless recursive is true, where the default is ignore.
+    """
+    if isinstance(path1, str):
+        path1 = ObjectStoragePath(path1)
+
+    if isinstance(path2, str):
+        path2 = ObjectStoragePath(path2)
+
+    if path1.samefile(path2):
+        path1._store.fs.copy(
+            str(path1),
+            str(path2),
+            recursive=recursive,
+            maxdepth=maxdepth,
+            on_error=on_error,
+            **kwargs,
+        )
+    else:
+        # non-local copy
+        with path1.open("rb") as f1, path2.open("wb") as f2:
+            f2.write(f1.read())
+
+
+def mv(
+    path1: str | ObjectStoragePath,
+    path2: str | ObjectStoragePath,
+    recursive=False,
+    maxdepth=None,
+    **kwargs,
+):
+    """Move between two locations."""
+    if isinstance(path1, str):
+        path1 = ObjectStoragePath(path1)
+
+    if isinstance(path2, str):
+        path2 = ObjectStoragePath(path2)
+
+    if path1.samestore(path2):
+        path1._store.fs.move(str(path1), str(path2), recursive=recursive, 
maxdepth=maxdepth, **kwargs)
+    else:
+        cp(path1, path2, recursive=recursive, maxdepth=maxdepth, **kwargs)
+        path1.unlink(recursive=recursive, maxdepth=maxdepth)
+
+
+copy = cp

Review Comment:
   Do we need an `rm` as well? :-D



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to