This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a1a75f5a1b [python] Fix pyarrow module missing attribute (#7258)
a1a75f5a1b is described below
commit a1a75f5a1b26ec487b23fc064f26e36604aece69
Author: tonymtu <[email protected]>
AuthorDate: Thu Feb 12 16:28:10 2026 +0800
[python] Fix pyarrow module missing attribute (#7258)
Fix AttributeError: module 'pyarrow' has no attribute 'parquet' in
format table read/write.
pyarrow.parquet is a submodule and must be imported explicitly. This
went unnoticed before, likely because other modules imported
pyarrow.parquet as a side effect, making it available through the
pyarrow namespace. When those modules are absent or the import order
changes, the error surfaces.
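A minimal sketch of the failure mode and the explicit-import fix
(illustrative only, not part of the commit; the file path is made up):

    import pyarrow

    # Without an explicit submodule import, the attribute is missing unless
    # some other module already imported pyarrow.parquet as a side effect:
    #     pyarrow.parquet.read_table(...)
    #     AttributeError: module 'pyarrow' has no attribute 'parquet'

    # Importing the submodule explicitly works regardless of import order:
    import pyarrow.parquet as pq

    table = pyarrow.table({"id": [1, 2, 3]})
    pq.write_table(table, "/tmp/example.parquet")
    print(pq.read_table("/tmp/example.parquet"))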
---
paimon-python/pypaimon/common/file_io.py | 7 ++--
paimon-python/pypaimon/filesystem/local.py | 8 ++---
paimon-python/pypaimon/filesystem/local_file_io.py | 8 ++---
.../pypaimon/filesystem/pyarrow_file_io.py | 42 ++++++++++------------
.../pypaimon/table/format/format_table_read.py | 34 ++++++++----------
.../pypaimon/table/format/format_table_write.py | 28 ++++++---------
paimon-python/pypaimon/tests/file_io_test.py | 10 +++---
7 files changed, 60 insertions(+), 77 deletions(-)
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 536e06c0b1..c47f82c503 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -21,7 +21,8 @@ from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional
-import pyarrow
+import pyarrow # noqa: F401
+import pyarrow.fs as pafs
from pypaimon.common.options import Options
@@ -97,7 +98,7 @@ class FileIO(ABC):
def is_dir(self, path: str) -> bool:
file_info = self.get_file_status(path)
- return file_info.type == pyarrow.fs.FileType.Directory
+ return file_info.type == pafs.FileType.Directory
def check_or_mkdirs(self, path: str):
if self.exists(path):
@@ -153,7 +154,7 @@ class FileIO(ABC):
def copy_files(self, source_directory: str, target_directory: str,
overwrite: bool = False):
file_infos = self.list_status(source_directory)
for file_info in file_infos:
- if file_info.type == pyarrow.fs.FileType.File:
+ if file_info.type == pafs.FileType.File:
source_file = file_info.path
file_name = source_file.split('/')[-1]
target_file = f"{target_directory.rstrip('/')}/{file_name}" if target_directory else file_name
diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py
index c48eab6459..496310a5e8 100644
--- a/paimon-python/pypaimon/filesystem/local.py
+++ b/paimon-python/pypaimon/filesystem/local.py
@@ -16,7 +16,7 @@
# under the License.
import threading
-import pyarrow
+import pyarrow.fs as pafs
from pathlib import Path
from pyarrow._fs import LocalFileSystem
@@ -29,15 +29,15 @@ class PaimonLocalFileSystem(LocalFileSystem):
try:
with PaimonLocalFileSystem.rename_lock:
dst_file_info = self.get_file_info([dst])[0]
- if dst_file_info.type != pyarrow.fs.FileType.NotFound:
- if dst_file_info.type == pyarrow.fs.FileType.File:
+ if dst_file_info.type != pafs.FileType.NotFound:
+ if dst_file_info.type == pafs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src).name
dst = str(Path(dst) / src_name)
final_dst_info = self.get_file_info([dst])[0]
- if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+ if final_dst_info.type != pafs.FileType.NotFound:
return False
# Perform atomic move
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py
index 8e8fd46552..91fc1c9069 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -26,7 +26,7 @@ from typing import Any, Dict, Optional
from urllib.parse import urlparse
import pyarrow
-import pyarrow.fs
+import pyarrow.fs as pafs
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
@@ -121,9 +121,9 @@ class LocalFileIO(FileIO):
self.base_name = os.path.basename(original_path)
self.size = stat_info.st_size if file_path.is_file() else None
self.type = (
- pyarrow.fs.FileType.Directory if file_path.is_dir()
- else pyarrow.fs.FileType.File if file_path.is_file()
- else pyarrow.fs.FileType.NotFound
+ pafs.FileType.Directory if file_path.is_dir()
+ else pafs.FileType.File if file_path.is_file()
+ else pafs.FileType.NotFound
)
self.mtime = stat_info.st_mtime
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index a8cd004ea2..cb689baeaf 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -25,6 +25,7 @@ from typing import Any, Dict, List, Optional
from urllib.parse import splitport, urlparse
import pyarrow
+import pyarrow.fs as pafs
from packaging.version import parse
from pyarrow._fs import FileSystem
@@ -81,8 +82,7 @@ class PyArrowFileIO(FileIO):
'connect_timeout': connect_timeout
}
try:
- from pyarrow.fs import AwsStandardS3RetryStrategy
- retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts)
+ retry_strategy = pafs.AwsStandardS3RetryStrategy(max_attempts=max_attempts)
config['retry_strategy'] = retry_strategy
except ImportError:
pass
@@ -111,8 +111,6 @@ class PyArrowFileIO(FileIO):
return bucket
def _initialize_oss_fs(self, path) -> FileSystem:
- from pyarrow.fs import S3FileSystem
-
client_kwargs = {
"access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
"secret_key":
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
@@ -130,11 +128,9 @@ class PyArrowFileIO(FileIO):
retry_config = self._create_s3_retry_config()
client_kwargs.update(retry_config)
- return S3FileSystem(**client_kwargs)
+ return pafs.S3FileSystem(**client_kwargs)
def _initialize_s3_fs(self) -> FileSystem:
- from pyarrow.fs import S3FileSystem
-
client_kwargs = {
"endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
"access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
@@ -148,11 +144,9 @@ class PyArrowFileIO(FileIO):
retry_config = self._create_s3_retry_config()
client_kwargs.update(retry_config)
- return S3FileSystem(**client_kwargs)
+ return pafs.S3FileSystem(**client_kwargs)
def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
- from pyarrow.fs import HadoopFileSystem
-
if 'HADOOP_HOME' not in os.environ:
raise RuntimeError("HADOOP_HOME environment variable is not set.")
if 'HADOOP_CONF_DIR' not in os.environ:
@@ -171,7 +165,7 @@ class PyArrowFileIO(FileIO):
os.environ['CLASSPATH'] = class_paths.stdout.strip()
host, port_str = splitport(netloc)
- return HadoopFileSystem(
+ return pafs.HadoopFileSystem(
host=host,
port=int(port_str),
user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
@@ -205,35 +199,35 @@ class PyArrowFileIO(FileIO):
file_infos = self.filesystem.get_file_info([path_str])
file_info = file_infos[0]
- if file_info.type == pyarrow.fs.FileType.NotFound:
+ if file_info.type == pafs.FileType.NotFound:
raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")
return file_info
def list_status(self, path: str):
path_str = self.to_filesystem_path(path)
- selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
+ selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
return self.filesystem.get_file_info(selector)
def list_directories(self, path: str):
file_infos = self.list_status(path)
- return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
+ return [info for info in file_infos if info.type == pafs.FileType.Directory]
def exists(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
- return file_info.type != pyarrow.fs.FileType.NotFound
+ return file_info.type != pafs.FileType.NotFound
def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.NotFound:
+ if file_info.type == pafs.FileType.NotFound:
return False
- if file_info.type == pyarrow.fs.FileType.Directory:
+ if file_info.type == pafs.FileType.Directory:
if not recursive:
- selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
+ selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
dir_contents = self.filesystem.get_file_info(selector)
if len(dir_contents) > 0:
raise OSError(f"Directory {path} is not empty")
@@ -250,9 +244,9 @@ class PyArrowFileIO(FileIO):
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
+ if file_info.type == pafs.FileType.Directory:
return True
- elif file_info.type == pyarrow.fs.FileType.File:
+ elif file_info.type == pafs.FileType.File:
raise FileExistsError(f"Path exists but is not a directory: {path}")
self.filesystem.create_dir(path_str, recursive=True)
@@ -271,15 +265,15 @@ class PyArrowFileIO(FileIO):
return self.filesystem.rename(src_str, dst_str)
dst_file_info = self.filesystem.get_file_info([dst_str])[0]
- if dst_file_info.type != pyarrow.fs.FileType.NotFound:
- if dst_file_info.type == pyarrow.fs.FileType.File:
+ if dst_file_info.type != pafs.FileType.NotFound:
+ if dst_file_info.type == pafs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src_str).name
dst_str = str(Path(dst_str) / src_name)
final_dst_info = self.filesystem.get_file_info([dst_str])[0]
- if final_dst_info.type != pyarrow.fs.FileType.NotFound:
+ if final_dst_info.type != pafs.FileType.NotFound:
return False
self.filesystem.move(src_str, dst_str)
@@ -317,7 +311,7 @@ class PyArrowFileIO(FileIO):
if self.exists(path):
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
+ if file_info.type == pafs.FileType.Directory:
return False
temp_path = path + str(uuid.uuid4()) + ".tmp"
diff --git a/paimon-python/pypaimon/table/format/format_table_read.py b/paimon-python/pypaimon/table/format/format_table_read.py
index 11ce3faf5b..1822c14284 100644
--- a/paimon-python/pypaimon/table/format/format_table_read.py
+++ b/paimon-python/pypaimon/table/format/format_table_read.py
@@ -50,8 +50,9 @@ def _read_file_to_arrow(
) -> pyarrow.Table:
path = split.data_path()
csv_read_options = None
- if fmt == Format.CSV and hasattr(pyarrow, "csv"):
- csv_read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)
+ if fmt == Format.CSV:
+ import pyarrow.csv as csv
+ csv_read_options = csv.ReadOptions(block_size=1 << 20)
try:
with file_io.new_input_stream(path) as stream:
chunks = []
@@ -71,25 +72,25 @@ def _read_file_to_arrow(
if fmt == Format.PARQUET:
import io
+ import pyarrow.parquet as pq
data = (
bytes(data) if not isinstance(data, bytes) else data
)
if len(data) < 4 or data[:4] != b"PAR1":
return pyarrow.table({})
try:
- tbl = pyarrow.parquet.read_table(io.BytesIO(data))
+ tbl = pq.read_table(io.BytesIO(data))
except pyarrow.ArrowInvalid:
return pyarrow.table({})
elif fmt == Format.CSV:
- if hasattr(pyarrow, "csv"):
- tbl = pyarrow.csv.read_csv(
+ import pyarrow.csv as csv
+ try:
+ tbl = csv.read_csv(
pyarrow.BufferReader(data),
read_options=csv_read_options,
)
- else:
- import io
- df = pandas.read_csv(io.BytesIO(data))
- tbl = pyarrow.Table.from_pandas(df)
+ except Exception:
+ return pyarrow.table({})
elif fmt == Format.JSON:
import json
text = data.decode("utf-8") if isinstance(data, bytes) else data
@@ -103,17 +104,12 @@ def _read_file_to_arrow(
tbl = pyarrow.Table.from_pylist(records)
elif fmt == Format.ORC:
import io
+ import pyarrow.orc as orc
data = bytes(data) if not isinstance(data, bytes) else data
- if hasattr(pyarrow, "orc"):
- try:
- tbl = pyarrow.orc.read_table(io.BytesIO(data))
- except Exception:
- return pyarrow.table({})
- else:
- raise ValueError(
- "Format table read for ORC requires PyArrow with ORC support "
- "(pyarrow.orc)"
- )
+ try:
+ tbl = orc.read_table(io.BytesIO(data))
+ except Exception:
+ return pyarrow.table({})
elif fmt == Format.TEXT:
text = data.decode("utf-8") if isinstance(data, bytes) else data
lines = (
diff --git a/paimon-python/pypaimon/table/format/format_table_write.py b/paimon-python/pypaimon/table/format/format_table_write.py
index eb45b718d5..e19afb3d71 100644
--- a/paimon-python/pypaimon/table/format/format_table_write.py
+++ b/paimon-python/pypaimon/table/format/format_table_write.py
@@ -178,18 +178,15 @@ class FormatTableWrite:
fmt = self._file_format
tbl = pyarrow.Table.from_batches([data])
if fmt == Format.PARQUET:
+ import pyarrow.parquet as pq
buf = io.BytesIO()
- pyarrow.parquet.write_table(tbl, buf, compression="zstd")
+ pq.write_table(tbl, buf, compression="zstd")
raw = buf.getvalue()
elif fmt == Format.CSV:
- if hasattr(pyarrow, "csv"):
- buf = io.BytesIO()
- pyarrow.csv.write_csv(tbl, buf)
- raw = buf.getvalue()
- else:
- buf = io.StringIO()
- tbl.to_pandas().to_csv(buf, index=False)
- raw = buf.getvalue().encode("utf-8")
+ import pyarrow.csv as csv
+ buf = io.BytesIO()
+ csv.write_csv(tbl, buf)
+ raw = buf.getvalue()
elif fmt == Format.JSON:
import json
lines = []
@@ -201,15 +198,10 @@ class FormatTableWrite:
lines.append(json.dumps(row) + "\n")
raw = "".join(lines).encode("utf-8")
elif fmt == Format.ORC:
- if hasattr(pyarrow, "orc"):
- buf = io.BytesIO()
- pyarrow.orc.write_table(tbl, buf)
- raw = buf.getvalue()
- else:
- raise ValueError(
- "Format table write for ORC requires PyArrow with ORC "
- "support (pyarrow.orc)"
- )
+ import pyarrow.orc as orc
+ buf = io.BytesIO()
+ orc.write_table(tbl, buf)
+ raw = buf.getvalue()
elif fmt == Format.TEXT:
partition_keys = self.table.partition_keys
if partition_keys:
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index ca834040d1..ae9abeff23 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -23,7 +23,7 @@ from pathlib import Path
from unittest.mock import MagicMock, patch
import pyarrow
-from pyarrow.fs import S3FileSystem
+import pyarrow.fs as pafs
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
@@ -37,7 +37,7 @@ class FileIOTest(unittest.TestCase):
def test_s3_filesystem_path_conversion(self):
"""Test S3FileSystem path conversion with various formats."""
file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
- self.assertIsInstance(file_io.filesystem, S3FileSystem)
+ self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem)
# Test bucket and path
self.assertEqual(file_io.to_filesystem_path("s3://my-bucket/path/to/file.txt"),
@@ -75,7 +75,7 @@ class FileIOTest(unittest.TestCase):
expected_path = (
"path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
self.assertEqual(got, expected_path)
- nf = MagicMock(type=pyarrow.fs.FileType.NotFound)
+ nf = MagicMock(type=pafs.FileType.NotFound)
mock_fs = MagicMock()
mock_fs.get_file_info.side_effect = [[nf], [nf]]
mock_fs.create_dir = MagicMock()
@@ -244,7 +244,7 @@ class FileIOTest(unittest.TestCase):
}))
mock_fs = MagicMock()
mock_fs.get_file_info.return_value = [
- MagicMock(type=pyarrow.fs.FileType.NotFound)]
+ MagicMock(type=pafs.FileType.NotFound)]
oss_io.filesystem = mock_fs
self.assertFalse(oss_io.exists("oss://test-bucket/path/to/file.txt"))
finally:
@@ -331,7 +331,7 @@ class FileIOTest(unittest.TestCase):
f.write("test content")
file_info = file_io.get_file_status(f"file://{test_file}")
- self.assertEqual(file_info.type, pyarrow.fs.FileType.File)
+ self.assertEqual(file_info.type, pafs.FileType.File)
self.assertIsNotNone(file_info.size)
with self.assertRaises(FileNotFoundError) as context: