bolkedebruin commented on code in PR #34729:
URL: https://github.com/apache/airflow/pull/34729#discussion_r1362615725


##########
airflow/io/__init__.py:
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Base FileIO classes for implementing reading and writing table files.
+
+The FileIO abstraction includes a subset of full filesystem implementations. Specifically,
+Iceberg needs to read or write a file at a given location (as a seekable stream), as well
+as check if a file exists. An implementation of the FileIO abstract base class is responsible
+for returning an InputFile instance, an OutputFile instance, and deleting a file given
+its location.
+
+Ported from Apache Iceberg.
+"""
+from __future__ import annotations
+
+import importlib
+import logging
+import warnings
+from abc import ABC, abstractmethod
+from io import SEEK_SET
+from types import TracebackType
+from typing import (
+    Dict,
+    Optional,
+    Protocol,
+    runtime_checkable,
+)
+from urllib.parse import urlparse
+
+log = logging.getLogger(__name__)
+
+HDFS_HOST = "hdfs.host"  # NOTE(review): HDFS_* / TOKEN look like Properties keys; consumers are outside this hunk
+HDFS_PORT = "hdfs.port"
+HDFS_USER = "hdfs.user"
+HDFS_KERB_TICKET = "hdfs.kerberos_ticket"
+
+TOKEN = "token"
+
+Properties = Dict[str, Optional[str]]  # name -> optional value; typing generics since aliases run at import time
+
+
+@runtime_checkable
+class InputStream(Protocol):
+    """A protocol for the file-like object returned by InputFile.open(...).
+
+    This outlines the minimally required methods for a seekable input stream returned from an InputFile
+    implementation's `open(...)` method. These methods are a subset of IOBase/RawIOBase.
+
+    Because the protocol is ``@runtime_checkable``, ``isinstance(obj, InputStream)`` only
+    checks that these members exist, not that their signatures match.
+    """
+
+    @abstractmethod
+    def read(self, size: int = 0) -> bytes:  # cf. RawIOBase.read
+        ...  # NOTE(review): io's read() defaults to size=-1 (read to EOF); size=0 requests no bytes — confirm
+
+    @abstractmethod
+    def seek(self, offset: int, whence: int = SEEK_SET) -> int:  # cf. IOBase.seek
+        ...
+
+    @abstractmethod
+    def tell(self) -> int:  # cf. IOBase.tell
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    def __enter__(self) -> InputStream:  # NOTE(review): not @abstractmethod, unlike OutputStream.__enter__
+        """Provide setup when opening an InputStream using a 'with' statement."""  # docstring-only: returns None
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+@runtime_checkable
+class OutputStream(Protocol):  # pragma: no cover
+    """A protocol for the file-like object returned by OutputFile.create(...).
+
+    This outlines the minimally required methods for a writable output stream returned from an OutputFile
+    implementation's `create(...)` method. These methods are a subset of IOBase/RawIOBase.
+    """
+
+    @abstractmethod
+    def write(self, b: bytes) -> int:  # cf. RawIOBase.write (returns the number of bytes written)
+        ...
+
+    @abstractmethod
+    def close(self) -> None:
+        ...
+
+    @abstractmethod
+    def __enter__(self) -> OutputStream:
+        """Provide setup when opening an OutputStream using a 'with' statement."""
+
+    @abstractmethod
+    def __exit__(
+        self,
+        exctype: type[BaseException] | None,
+        excinst: BaseException | None,
+        exctb: TracebackType | None,
+    ) -> None:
+        """Perform cleanup when exiting the scope of a 'with' statement."""
+
+
+class InputFile(ABC):
+    """A base class for InputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an InputFile instance.
+
+    Subclasses implement ``exists()`` (a method, not an attribute) to report whether
+    the file exists, plus ``__len__`` and ``open()``.
+    """
+
+    def __init__(self, location: str):
+        self._location = location  # exposed read-only through the ``location`` property
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the input file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed due to a permission error.
+        """
+
+    @abstractmethod
+    def open(self, seekable: bool = True) -> InputStream:
+        """Return an object that matches the InputStream protocol.
+
+        Args:
+            seekable: If the stream should support seek, or if it is consumed sequentially.
+
+        Returns:
+            InputStream: An object that matches the InputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed due to a permission error.
+            FileNotFoundError: If the file at self.location does not exist.
+        """
+
+
+class OutputFile(ABC):
+    """A base class for OutputFile implementations.
+
+    Args:
+        location (str): A URI or a path to a local file.
+
+    Attributes:
+        location (str): The URI or path to a local file for an OutputFile instance.
+
+    Subclasses implement ``exists()`` (a method, not an attribute) to report whether
+    the file exists, plus ``__len__``, ``to_input_file()`` and ``create()``.
+    """
+
+    def __init__(self, location: str):
+        self._location = location  # exposed read-only through the ``location`` property
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """Return the total length of the file, in bytes."""
+
+    @property
+    def location(self) -> str:
+        """The fully-qualified location of the output file."""
+        return self._location
+
+    @abstractmethod
+    def exists(self) -> bool:
+        """Check whether the location exists.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed due to a permission error.
+        """
+
+    @abstractmethod
+    def to_input_file(self) -> InputFile:
+        """Return an InputFile for the location of this output file."""
+
+    @abstractmethod
+    def create(self, overwrite: bool = False) -> OutputStream:
+        """Return an object that matches the OutputStream protocol.
+
+        Args:
+            overwrite (bool): If the file already exists at `self.location`
+                and `overwrite` is False, a FileExistsError should be raised.
+
+        Returns:
+            OutputStream: An object that matches the OutputStream protocol.
+
+        Raises:
+            PermissionError: If the file at self.location cannot be accessed due to a permission error.
+            FileExistsError: If the file at self.location already exists and `overwrite=False`.
+        """
+
+
+class FileIO(ABC):
+    """A base class for FileIO implementations."""
+
+    @abstractmethod
+    def new_input(self, location: str, conn_id: str | None) -> InputFile:

Review Comment:
   `open` implies an action, such as actually obtaining a file handle, which is not 
happening here. `new_input`, like `InputFile` and `InputStream`, is a Java-ism 
(remember where this code originated) and might not be very Pythonic. 
   
   Besides, I think most of that is now covered by `ObjectStoragePath`. I need to 
think about that for a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to