This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e99c19992 Python: Create HadoopFileSystem from netloc (merge request 
!1060) (#8596)
4e99c19992 is described below

commit 4e99c199927e0f79c60489e77a49eb00fe139205
Author: frankliee <[email protected]>
AuthorDate: Fri Sep 22 19:25:38 2023 +0800

    Python: Create HadoopFileSystem from netloc (merge request !1060) (#8596)
---
 python/pyiceberg/io/pyarrow.py  | 36 +++++++++++++++++++-----------------
 python/tests/io/test_pyarrow.py | 27 +++++++++++++--------------
 2 files changed, 32 insertions(+), 31 deletions(-)

diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index 2cc20549fe..7f6045abed 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -297,24 +297,24 @@ class PyArrowFile(InputFile, OutputFile):
 
 
 class PyArrowFileIO(FileIO):
-    fs_by_scheme: Callable[[str], FileSystem]
+    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]
 
     def __init__(self, properties: Properties = EMPTY_DICT):
-        self.fs_by_scheme: Callable[[str], FileSystem] = 
lru_cache(self._initialize_fs)
+        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = 
lru_cache(self._initialize_fs)
         super().__init__(properties=properties)
 
     @staticmethod
-    def parse_location(location: str) -> Tuple[str, str]:
+    def parse_location(location: str) -> Tuple[str, str, str]:
         """Return the path without the scheme."""
         uri = urlparse(location)
         if not uri.scheme:
-            return "file", os.path.abspath(location)
+            return "file", uri.netloc, os.path.abspath(location)
         elif uri.scheme == "hdfs":
-            return uri.scheme, location
+            return uri.scheme, uri.netloc, location
         else:
-            return uri.scheme, f"{uri.netloc}{uri.path}"
+            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
-    def _initialize_fs(self, scheme: str) -> FileSystem:
+    def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> 
FileSystem:
         if scheme in {"s3", "s3a", "s3n"}:
             from pyarrow.fs import S3FileSystem
 
@@ -334,6 +334,8 @@ class PyArrowFileIO(FileIO):
             from pyarrow.fs import HadoopFileSystem
 
             hdfs_kwargs: Dict[str, Any] = {}
+            if netloc:
+                return HadoopFileSystem.from_uri(f"hdfs://{netloc}")
             if host := self.properties.get(HDFS_HOST):
                 hdfs_kwargs["host"] = host
             if port := self.properties.get(HDFS_PORT):
@@ -377,9 +379,9 @@ class PyArrowFileIO(FileIO):
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
-        scheme, path = self.parse_location(location)
+        scheme, netloc, path = self.parse_location(location)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme),
+            fs=self.fs_by_scheme(scheme, netloc),
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
@@ -394,9 +396,9 @@ class PyArrowFileIO(FileIO):
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
-        scheme, path = self.parse_location(location)
+        scheme, netloc, path = self.parse_location(location)
         return PyArrowFile(
-            fs=self.fs_by_scheme(scheme),
+            fs=self.fs_by_scheme(scheme, netloc),
             location=location,
             path=path,
             buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
@@ -415,8 +417,8 @@ class PyArrowFileIO(FileIO):
                 an AWS error code 15.
         """
         str_location = location.location if isinstance(location, (InputFile, 
OutputFile)) else location
-        scheme, path = self.parse_location(str_location)
-        fs = self.fs_by_scheme(scheme)
+        scheme, netloc, path = self.parse_location(str_location)
+        fs = self.fs_by_scheme(scheme, netloc)
 
         try:
             fs.delete_file(path)
@@ -588,7 +590,7 @@ def _get_file_format(file_format: FileFormat, **kwargs: 
Dict[str, Any]) -> ds.Fi
 
 
 def _construct_fragment(fs: FileSystem, data_file: DataFile, 
file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
-    _, path = PyArrowFileIO.parse_location(data_file.file_path)
+    _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
     return _get_file_format(data_file.file_format, 
**file_format_kwargs).make_fragment(path, fs)
 
 
@@ -810,7 +812,7 @@ def _task_to_table(
     if limit and sum(row_counts) >= limit:
         return None
 
-    _, path = PyArrowFileIO.parse_location(task.file.file_path)
+    _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, 
buffer_size=(ONE_MEGABYTE * 8))
     with fs.open_input_file(path) as fin:
         fragment = arrow_format.make_fragment(fin)
@@ -919,9 +921,9 @@ def project_table(
     Raises:
         ResolveError: When an incompatible query is done.
     """
-    scheme, _ = PyArrowFileIO.parse_location(table.location())
+    scheme, netloc, _ = PyArrowFileIO.parse_location(table.location())
     if isinstance(table.io, PyArrowFileIO):
-        fs = table.io.fs_by_scheme(scheme)
+        fs = table.io.fs_by_scheme(scheme, netloc)
     else:
         try:
             from pyiceberg.io.fsspec import FsspecFileIO
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 49e1c8bca8..8b62212593 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -1529,17 +1529,16 @@ def 
test_writing_avro_file_gcs(generated_manifest_entry_file: str, pyarrow_filei
     pyarrow_fileio_gcs.delete(f"gs://warehouse/{filename}")
 
 
-def test_parse_hdfs_location() -> None:
-    locations = ["hdfs://127.0.0.1:9000/root/foo.txt", 
"hdfs://127.0.0.1/root/foo.txt"]
-    for location in locations:
-        schema, path = PyArrowFileIO.parse_location(location)
-        assert schema == "hdfs"
-        assert location == path
-
-
-def test_parse_local_location() -> None:
-    locations = ["/root/foo.txt", "/root/tmp/foo.txt"]
-    for location in locations:
-        schema, path = PyArrowFileIO.parse_location(location)
-        assert schema == "file"
-        assert location == path
+def test_parse_location() -> None:
+    def check_results(location: str, expected_schema: str, expected_netloc: 
str, expected_uri: str) -> None:
+        schema, netloc, uri = PyArrowFileIO.parse_location(location)
+        assert schema == expected_schema
+        assert netloc == expected_netloc
+        assert uri == expected_uri
+
+    check_results("hdfs://127.0.0.1:9000/root/foo.txt", "hdfs", 
"127.0.0.1:9000", "hdfs://127.0.0.1:9000/root/foo.txt")
+    check_results("hdfs://127.0.0.1/root/foo.txt", "hdfs", "127.0.0.1", 
"hdfs://127.0.0.1/root/foo.txt")
+    check_results("hdfs://clusterA/root/foo.txt", "hdfs", "clusterA", 
"hdfs://clusterA/root/foo.txt")
+
+    check_results("/root/foo.txt", "file", "", "/root/foo.txt")
+    check_results("/root/tmp/foo.txt", "file", "", "/root/tmp/foo.txt")

Reply via email to