This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a75f5a1b [python] Fix pyarrow module missing attribute (#7258)
a1a75f5a1b is described below

commit a1a75f5a1b26ec487b23fc064f26e36604aece69
Author: tonymtu <[email protected]>
AuthorDate: Thu Feb 12 16:28:10 2026 +0800

    [python] Fix pyarrow module missing attribute (#7258)
    
    Fix AttributeError: module 'pyarrow' has no attribute 'parquet' in
    format table read/write.
    
    pyarrow.parquet is a submodule that needs an explicit import. Previously
    this wasn't caught, likely because other modules imported pyarrow.parquet
    as a side effect, making it available globally. When those modules are
    absent or the import order changes, the error surfaces.
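
    For illustration only (not part of the commit), a minimal sketch of the
    failure mode and the fix, assuming a stock pyarrow installation:

        import io
        import pyarrow

        # "import pyarrow" alone does not load the parquet submodule, so this
        # attribute access can raise AttributeError unless some other module
        # already imported pyarrow.parquet as a side effect.
        try:
            pyarrow.parquet  # may fail depending on import order
        except AttributeError:
            pass

        # An explicit import makes the submodule available regardless of
        # import order, which is what this change does for parquet/csv/orc/fs.
        import pyarrow.parquet as pq

        buf = io.BytesIO()
        pq.write_table(pyarrow.table({"id": [1, 2, 3]}), buf)  # resolves reliably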
---
 paimon-python/pypaimon/common/file_io.py           |  7 ++--
 paimon-python/pypaimon/filesystem/local.py         |  8 ++---
 paimon-python/pypaimon/filesystem/local_file_io.py |  8 ++---
 .../pypaimon/filesystem/pyarrow_file_io.py         | 42 ++++++++++------------
 .../pypaimon/table/format/format_table_read.py     | 34 ++++++++----------
 .../pypaimon/table/format/format_table_write.py    | 28 ++++++---------
 paimon-python/pypaimon/tests/file_io_test.py       | 10 +++---
 7 files changed, 60 insertions(+), 77 deletions(-)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 536e06c0b1..c47f82c503 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -21,7 +21,8 @@ from abc import ABC, abstractmethod
 from pathlib import Path
 from typing import List, Optional
 
-import pyarrow
+import pyarrow  # noqa: F401
+import pyarrow.fs as pafs
 
 from pypaimon.common.options import Options
 
@@ -97,7 +98,7 @@ class FileIO(ABC):
 
     def is_dir(self, path: str) -> bool:
         file_info = self.get_file_status(path)
-        return file_info.type == pyarrow.fs.FileType.Directory
+        return file_info.type == pafs.FileType.Directory
 
     def check_or_mkdirs(self, path: str):
         if self.exists(path):
@@ -153,7 +154,7 @@ class FileIO(ABC):
     def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
         file_infos = self.list_status(source_directory)
         for file_info in file_infos:
-            if file_info.type == pyarrow.fs.FileType.File:
+            if file_info.type == pafs.FileType.File:
                 source_file = file_info.path
                 file_name = source_file.split('/')[-1]
                 target_file = f"{target_directory.rstrip('/')}/{file_name}" if target_directory else file_name
diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py
index c48eab6459..496310a5e8 100644
--- a/paimon-python/pypaimon/filesystem/local.py
+++ b/paimon-python/pypaimon/filesystem/local.py
@@ -16,7 +16,7 @@
 #  under the License.
 
 import threading
-import pyarrow
+import pyarrow.fs as pafs
 from pathlib import Path
 from pyarrow._fs import LocalFileSystem
 
@@ -29,15 +29,15 @@ class PaimonLocalFileSystem(LocalFileSystem):
         try:
             with PaimonLocalFileSystem.rename_lock:
                 dst_file_info = self.get_file_info([dst])[0]
-                if dst_file_info.type != pyarrow.fs.FileType.NotFound:
-                    if dst_file_info.type == pyarrow.fs.FileType.File:
+                if dst_file_info.type != pafs.FileType.NotFound:
+                    if dst_file_info.type == pafs.FileType.File:
                         return False
                     # Make it compatible with HadoopFileIO: if dst is an existing directory,
                     # dst=dst/srcFileName
                     src_name = Path(src).name
                     dst = str(Path(dst) / src_name)
                     final_dst_info = self.get_file_info([dst])[0]
-                    if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+                    if final_dst_info.type != pafs.FileType.NotFound:
                         return False
                 
                 # Perform atomic move
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py
index 8e8fd46552..91fc1c9069 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -26,7 +26,7 @@ from typing import Any, Dict, Optional
 from urllib.parse import urlparse
 
 import pyarrow
-import pyarrow.fs
+import pyarrow.fs as pafs
 
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.options import Options
@@ -121,9 +121,9 @@ class LocalFileIO(FileIO):
                 self.base_name = os.path.basename(original_path)
                 self.size = stat_info.st_size if file_path.is_file() else None
                 self.type = (
-                    pyarrow.fs.FileType.Directory if file_path.is_dir()
-                    else pyarrow.fs.FileType.File if file_path.is_file()
-                    else pyarrow.fs.FileType.NotFound
+                    pafs.FileType.Directory if file_path.is_dir()
+                    else pafs.FileType.File if file_path.is_file()
+                    else pafs.FileType.NotFound
                 )
                 self.mtime = stat_info.st_mtime
         
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index a8cd004ea2..cb689baeaf 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -25,6 +25,7 @@ from typing import Any, Dict, List, Optional
 from urllib.parse import splitport, urlparse
 
 import pyarrow
+import pyarrow.fs as pafs
 from packaging.version import parse
 from pyarrow._fs import FileSystem
 
@@ -81,8 +82,7 @@ class PyArrowFileIO(FileIO):
                 'connect_timeout': connect_timeout
             }
             try:
-                from pyarrow.fs import AwsStandardS3RetryStrategy
-                retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts)
+                retry_strategy = pafs.AwsStandardS3RetryStrategy(max_attempts=max_attempts)
                 config['retry_strategy'] = retry_strategy
             except ImportError:
                 pass
@@ -111,8 +111,6 @@ class PyArrowFileIO(FileIO):
         return bucket
 
     def _initialize_oss_fs(self, path) -> FileSystem:
-        from pyarrow.fs import S3FileSystem
-
         client_kwargs = {
             "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
             "secret_key": 
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
@@ -130,11 +128,9 @@ class PyArrowFileIO(FileIO):
         retry_config = self._create_s3_retry_config()
         client_kwargs.update(retry_config)
 
-        return S3FileSystem(**client_kwargs)
+        return pafs.S3FileSystem(**client_kwargs)
 
     def _initialize_s3_fs(self) -> FileSystem:
-        from pyarrow.fs import S3FileSystem
-
         client_kwargs = {
             "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
             "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
@@ -148,11 +144,9 @@ class PyArrowFileIO(FileIO):
         retry_config = self._create_s3_retry_config()
         client_kwargs.update(retry_config)
 
-        return S3FileSystem(**client_kwargs)
+        return pafs.S3FileSystem(**client_kwargs)
 
     def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
-        from pyarrow.fs import HadoopFileSystem
-
         if 'HADOOP_HOME' not in os.environ:
             raise RuntimeError("HADOOP_HOME environment variable is not set.")
         if 'HADOOP_CONF_DIR' not in os.environ:
@@ -171,7 +165,7 @@ class PyArrowFileIO(FileIO):
         os.environ['CLASSPATH'] = class_paths.stdout.strip()
 
         host, port_str = splitport(netloc)
-        return HadoopFileSystem(
+        return pafs.HadoopFileSystem(
             host=host,
             port=int(port_str),
             user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
@@ -205,35 +199,35 @@ class PyArrowFileIO(FileIO):
         file_infos = self.filesystem.get_file_info([path_str])
         file_info = file_infos[0]
         
-        if file_info.type == pyarrow.fs.FileType.NotFound:
+        if file_info.type == pafs.FileType.NotFound:
             raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")
         
         return file_info
 
     def list_status(self, path: str):
         path_str = self.to_filesystem_path(path)
-        selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
+        selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
         return self.filesystem.get_file_info(selector)
 
     def list_directories(self, path: str):
         file_infos = self.list_status(path)
-        return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
+        return [info for info in file_infos if info.type == pafs.FileType.Directory]
 
     def exists(self, path: str) -> bool:
         path_str = self.to_filesystem_path(path)
         file_info = self.filesystem.get_file_info([path_str])[0]
-        return file_info.type != pyarrow.fs.FileType.NotFound
+        return file_info.type != pafs.FileType.NotFound
 
     def delete(self, path: str, recursive: bool = False) -> bool:
         path_str = self.to_filesystem_path(path)
         file_info = self.filesystem.get_file_info([path_str])[0]
         
-        if file_info.type == pyarrow.fs.FileType.NotFound:
+        if file_info.type == pafs.FileType.NotFound:
             return False
         
-        if file_info.type == pyarrow.fs.FileType.Directory:
+        if file_info.type == pafs.FileType.Directory:
             if not recursive:
-                selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
+                selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
                 dir_contents = self.filesystem.get_file_info(selector)
                 if len(dir_contents) > 0:
                     raise OSError(f"Directory {path} is not empty")
@@ -250,9 +244,9 @@ class PyArrowFileIO(FileIO):
         path_str = self.to_filesystem_path(path)
         file_info = self.filesystem.get_file_info([path_str])[0]
         
-        if file_info.type == pyarrow.fs.FileType.Directory:
+        if file_info.type == pafs.FileType.Directory:
             return True
-        elif file_info.type == pyarrow.fs.FileType.File:
+        elif file_info.type == pafs.FileType.File:
             raise FileExistsError(f"Path exists but is not a directory: {path}")
         
         self.filesystem.create_dir(path_str, recursive=True)
@@ -271,15 +265,15 @@ class PyArrowFileIO(FileIO):
                 return self.filesystem.rename(src_str, dst_str)
             
             dst_file_info = self.filesystem.get_file_info([dst_str])[0]
-            if dst_file_info.type != pyarrow.fs.FileType.NotFound:
-                if dst_file_info.type == pyarrow.fs.FileType.File:
+            if dst_file_info.type != pafs.FileType.NotFound:
+                if dst_file_info.type == pafs.FileType.File:
                     return False
                 # Make it compatible with HadoopFileIO: if dst is an existing directory,
                 # dst=dst/srcFileName
                 src_name = Path(src_str).name
                 dst_str = str(Path(dst_str) / src_name)
                 final_dst_info = self.filesystem.get_file_info([dst_str])[0]
-                if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+                if final_dst_info.type != pafs.FileType.NotFound:
                     return False
             
             self.filesystem.move(src_str, dst_str)
@@ -317,7 +311,7 @@ class PyArrowFileIO(FileIO):
         if self.exists(path):
             path_str = self.to_filesystem_path(path)
             file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
+            if file_info.type == pafs.FileType.Directory:
                 return False
         
         temp_path = path + str(uuid.uuid4()) + ".tmp"
diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py
index 11ce3faf5b..1822c14284 100644
--- a/paimon-python/pypaimon/table/format/format_table_read.py
+++ b/paimon-python/pypaimon/table/format/format_table_read.py
@@ -50,8 +50,9 @@ def _read_file_to_arrow(
 ) -> pyarrow.Table:
     path = split.data_path()
     csv_read_options = None
-    if fmt == Format.CSV and hasattr(pyarrow, "csv"):
-        csv_read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)
+    if fmt == Format.CSV:
+        import pyarrow.csv as csv
+        csv_read_options = csv.ReadOptions(block_size=1 << 20)
     try:
         with file_io.new_input_stream(path) as stream:
             chunks = []
@@ -71,25 +72,25 @@ def _read_file_to_arrow(
 
     if fmt == Format.PARQUET:
         import io
+        import pyarrow.parquet as pq
         data = (
             bytes(data) if not isinstance(data, bytes) else data
         )
         if len(data) < 4 or data[:4] != b"PAR1":
             return pyarrow.table({})
         try:
-            tbl = pyarrow.parquet.read_table(io.BytesIO(data))
+            tbl = pq.read_table(io.BytesIO(data))
         except pyarrow.ArrowInvalid:
             return pyarrow.table({})
     elif fmt == Format.CSV:
-        if hasattr(pyarrow, "csv"):
-            tbl = pyarrow.csv.read_csv(
+        import pyarrow.csv as csv
+        try:
+            tbl = csv.read_csv(
                 pyarrow.BufferReader(data),
                 read_options=csv_read_options,
             )
-        else:
-            import io
-            df = pandas.read_csv(io.BytesIO(data))
-            tbl = pyarrow.Table.from_pandas(df)
+        except Exception:
+            return pyarrow.table({})
     elif fmt == Format.JSON:
         import json
         text = data.decode("utf-8") if isinstance(data, bytes) else data
@@ -103,17 +104,12 @@ def _read_file_to_arrow(
         tbl = pyarrow.Table.from_pylist(records)
     elif fmt == Format.ORC:
         import io
+        import pyarrow.orc as orc
         data = bytes(data) if not isinstance(data, bytes) else data
-        if hasattr(pyarrow, "orc"):
-            try:
-                tbl = pyarrow.orc.read_table(io.BytesIO(data))
-            except Exception:
-                return pyarrow.table({})
-        else:
-            raise ValueError(
-                "Format table read for ORC requires PyArrow with ORC support "
-                "(pyarrow.orc)"
-            )
+        try:
+            tbl = orc.read_table(io.BytesIO(data))
+        except Exception:
+            return pyarrow.table({})
     elif fmt == Format.TEXT:
         text = data.decode("utf-8") if isinstance(data, bytes) else data
         lines = (
diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py
index eb45b718d5..e19afb3d71 100644
--- a/paimon-python/pypaimon/table/format/format_table_write.py
+++ b/paimon-python/pypaimon/table/format/format_table_write.py
@@ -178,18 +178,15 @@ class FormatTableWrite:
         fmt = self._file_format
         tbl = pyarrow.Table.from_batches([data])
         if fmt == Format.PARQUET:
+            import pyarrow.parquet as pq
             buf = io.BytesIO()
-            pyarrow.parquet.write_table(tbl, buf, compression="zstd")
+            pq.write_table(tbl, buf, compression="zstd")
             raw = buf.getvalue()
         elif fmt == Format.CSV:
-            if hasattr(pyarrow, "csv"):
-                buf = io.BytesIO()
-                pyarrow.csv.write_csv(tbl, buf)
-                raw = buf.getvalue()
-            else:
-                buf = io.StringIO()
-                tbl.to_pandas().to_csv(buf, index=False)
-                raw = buf.getvalue().encode("utf-8")
+            import pyarrow.csv as csv
+            buf = io.BytesIO()
+            csv.write_csv(tbl, buf)
+            raw = buf.getvalue()
         elif fmt == Format.JSON:
             import json
             lines = []
@@ -201,15 +198,10 @@ class FormatTableWrite:
                 lines.append(json.dumps(row) + "\n")
             raw = "".join(lines).encode("utf-8")
         elif fmt == Format.ORC:
-            if hasattr(pyarrow, "orc"):
-                buf = io.BytesIO()
-                pyarrow.orc.write_table(tbl, buf)
-                raw = buf.getvalue()
-            else:
-                raise ValueError(
-                    "Format table write for ORC requires PyArrow with ORC "
-                    "support (pyarrow.orc)"
-                )
+            import pyarrow.orc as orc
+            buf = io.BytesIO()
+            orc.write_table(tbl, buf)
+            raw = buf.getvalue()
         elif fmt == Format.TEXT:
             partition_keys = self.table.partition_keys
             if partition_keys:
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index ca834040d1..ae9abeff23 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -23,7 +23,7 @@ from pathlib import Path
 from unittest.mock import MagicMock, patch
 
 import pyarrow
-from pyarrow.fs import S3FileSystem
+import pyarrow.fs as pafs
 
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions
@@ -37,7 +37,7 @@ class FileIOTest(unittest.TestCase):
     def test_s3_filesystem_path_conversion(self):
         """Test S3FileSystem path conversion with various formats."""
         file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
-        self.assertIsInstance(file_io.filesystem, S3FileSystem)
+        self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem)
 
         # Test bucket and path
         self.assertEqual(file_io.to_filesystem_path("s3://my-bucket/path/to/file.txt"),
@@ -75,7 +75,7 @@ class FileIOTest(unittest.TestCase):
         expected_path = (
             "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
         self.assertEqual(got, expected_path)
-        nf = MagicMock(type=pyarrow.fs.FileType.NotFound)
+        nf = MagicMock(type=pafs.FileType.NotFound)
         mock_fs = MagicMock()
         mock_fs.get_file_info.side_effect = [[nf], [nf]]
         mock_fs.create_dir = MagicMock()
@@ -244,7 +244,7 @@ class FileIOTest(unittest.TestCase):
             }))
             mock_fs = MagicMock()
             mock_fs.get_file_info.return_value = [
-                MagicMock(type=pyarrow.fs.FileType.NotFound)]
+                MagicMock(type=pafs.FileType.NotFound)]
             oss_io.filesystem = mock_fs
             self.assertFalse(oss_io.exists("oss://test-bucket/path/to/file.txt"))
         finally:
@@ -331,7 +331,7 @@ class FileIOTest(unittest.TestCase):
                 f.write("test content")
             
             file_info = file_io.get_file_status(f"file://{test_file}")
-            self.assertEqual(file_info.type, pyarrow.fs.FileType.File)
+            self.assertEqual(file_info.type, pafs.FileType.File)
             self.assertIsNotNone(file_info.size)
 
             with self.assertRaises(FileNotFoundError) as context:
