This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3aa702b246 [python] refactor file_io to support polymorphism (#7040)
3aa702b246 is described below
commit 3aa702b246f7df8ca2c67a337362f2ad0b96d39d
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jan 15 21:10:58 2026 +0800
[python] refactor file_io to support polymorphism (#7040)
---
.../pypaimon/catalog/filesystem_catalog.py | 2 +-
.../pypaimon/catalog/rest/rest_catalog.py | 2 +-
.../pypaimon/catalog/rest/rest_token_file_io.py | 147 +++++-
paimon-python/pypaimon/common/file_io.py | 545 ++++-----------------
paimon-python/pypaimon/common/uri_reader.py | 2 +-
paimon-python/pypaimon/filesystem/local_file_io.py | 439 +++++++++++++++++
.../file_io.py => filesystem/pyarrow_file_io.py} | 103 +---
paimon-python/pypaimon/read/reader/lance_utils.py | 3 +
.../rest_catalog_blob_as_descriptor_sample.py | 4 +-
paimon-python/pypaimon/table/row/blob.py | 2 +-
paimon-python/pypaimon/tests/blob_test.py | 15 +-
paimon-python/pypaimon/tests/file_io_test.py | 129 ++---
.../pypaimon/tests/filesystem_catalog_test.py | 17 +-
paimon-python/pypaimon/tests/lance_utils_test.py | 6 +-
.../pypaimon/tests/py36/ao_simple_test.py | 8 +-
paimon-python/pypaimon/tests/rest/rest_server.py | 2 +-
.../pypaimon/tests/rest/rest_token_file_io_test.py | 86 +++-
17 files changed, 847 insertions(+), 665 deletions(-)
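For reviewers: after this refactor, callers no longer construct FileIO(path, options) directly; FileIO.get(path, options) picks the implementation (LocalFileIO for scheme-less or file:// paths, PyArrowFileIO for oss://, s3://, hdfs:// and similar). A minimal caller-side sketch, assuming a local warehouse path (the paths below are placeholders, not part of this commit):

    from pypaimon.common.options import Options
    from pypaimon.common.file_io import FileIO

    # Scheme-less and file:// paths dispatch to LocalFileIO,
    # remote schemes (oss://, s3://, hdfs://) dispatch to PyArrowFileIO.
    file_io = FileIO.get("/tmp/paimon-warehouse", Options({}))

    file_io.mkdirs("/tmp/paimon-warehouse/default.db")
    with file_io.new_output_stream("/tmp/paimon-warehouse/default.db/README") as out:
        out.write(b"written through the polymorphic FileIO interface")
    print(file_io.exists("/tmp/paimon-warehouse/default.db/README"))  # True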
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 8d1b485b74..65a4538966 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -46,7 +46,7 @@ class FileSystemCatalog(Catalog):
raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path
must be set")
self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
self.catalog_options = catalog_options
- self.file_io = FileIO(self.warehouse, self.catalog_options)
+ self.file_io = FileIO.get(self.warehouse, self.catalog_options)
def get_database(self, name: str) -> Database:
if self.file_io.exists(self.get_database_path(name)):
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 5d9462f6c3..41a3061fb9 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -252,7 +252,7 @@ class RESTCatalog(Catalog):
)
def file_io_from_options(self, table_path: str) -> FileIO:
- return FileIO(table_path, self.context.options)
+ return FileIO.get(table_path, self.context.options)
def file_io_for_data(self, table_path: str, identifier: Identifier):
return RESTTokenFileIO(identifier, table_path, self.context.options) \
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 2cec5df721..f686dc66ea 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -20,70 +20,164 @@ import threading
import time
from typing import Optional
-from pyarrow._fs import FileSystem
+from cachetools import TTLCache
from pypaimon.api.rest_api import RESTApi
from pypaimon.api.rest_util import RESTUtil
from pypaimon.catalog.rest.rest_token import RESTToken
from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
from pypaimon.common.identifier import Identifier
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions, OssOptions
+from pypaimon.common.uri_reader import UriReaderFactory
class RESTTokenFileIO(FileIO):
-
+ """
+ A FileIO to support getting token from REST Server.
+ """
+
+ _FILE_IO_CACHE_MAXSIZE = 1000
+ _FILE_IO_CACHE_TTL = 36000 # 10 hours in seconds
+
def __init__(self, identifier: Identifier, path: str,
catalog_options: Optional[Options] = None):
self.identifier = identifier
self.path = path
+ self.catalog_options = catalog_options
+ self.properties = catalog_options or Options({}) # For compatibility with refresh_token()
self.token: Optional[RESTToken] = None
self.api_instance: Optional[RESTApi] = None
self.lock = threading.Lock()
self.log = logging.getLogger(__name__)
- super().__init__(path, catalog_options)
+ self._uri_reader_factory_cache: Optional[UriReaderFactory] = None
+ self._file_io_cache: TTLCache = TTLCache(
+ maxsize=self._FILE_IO_CACHE_MAXSIZE,
+ ttl=self._FILE_IO_CACHE_TTL
+ )
def __getstate__(self):
state = self.__dict__.copy()
# Remove non-serializable objects
state.pop('lock', None)
state.pop('api_instance', None)
+ state.pop('_file_io_cache', None)
+ state.pop('_uri_reader_factory_cache', None)
# token can be serialized, but we'll refresh it on deserialization
return state
def __setstate__(self, state):
self.__dict__.update(state)
- # Recreate lock after deserialization
+ # Recreate lock and cache after deserialization
self.lock = threading.Lock()
+ self._file_io_cache = TTLCache(
+ maxsize=self._FILE_IO_CACHE_MAXSIZE,
+ ttl=self._FILE_IO_CACHE_TTL
+ )
+ self._uri_reader_factory_cache = None
# api_instance will be recreated when needed
self.api_instance = None
- def _initialize_oss_fs(self, path) -> FileSystem:
+ def file_io(self) -> FileIO:
self.try_to_refresh_token()
- merged_token = self._merge_token_with_catalog_options(self.token.token)
- merged_properties = RESTUtil.merge(
- self.properties.to_map() if self.properties else {},
- merged_token
- )
- merged_options = Options(merged_properties)
- original_properties = self.properties
- self.properties = merged_options
- try:
- return super()._initialize_oss_fs(path)
- finally:
- self.properties = original_properties
+
+ if self.token is None:
+ return FileIO.get(self.path, self.catalog_options or Options({}))
+
+ cache_key = self.token
+
+ file_io = self._file_io_cache.get(cache_key)
+ if file_io is not None:
+ return file_io
+
+ with self.lock:
+ file_io = self._file_io_cache.get(cache_key)
+ if file_io is not None:
+ return file_io
+
+ merged_token = self._merge_token_with_catalog_options(self.token.token)
+ merged_properties = RESTUtil.merge(
+ self.catalog_options.to_map() if self.catalog_options else {},
+ merged_token
+ )
+ merged_options = Options(merged_properties)
+
+ file_io = PyArrowFileIO(self.path, merged_options)
+ self._file_io_cache[cache_key] = file_io
+ return file_io
def _merge_token_with_catalog_options(self, token: dict) -> dict:
"""Merge token with catalog options, DLF OSS endpoint should override
the standard OSS endpoint."""
merged_token = dict(token)
- dlf_oss_endpoint = self.properties.get(CatalogOptions.DLF_OSS_ENDPOINT)
- if dlf_oss_endpoint and dlf_oss_endpoint.strip():
- merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint
+ if self.catalog_options:
+ dlf_oss_endpoint = self.catalog_options.get(CatalogOptions.DLF_OSS_ENDPOINT)
+ if dlf_oss_endpoint and dlf_oss_endpoint.strip():
+ merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint
return merged_token
+ def new_input_stream(self, path: str):
+ return self.file_io().new_input_stream(path)
+
def new_output_stream(self, path: str):
- # Call parent class method to ensure path conversion and parent directory creation
- return super().new_output_stream(path)
+ return self.file_io().new_output_stream(path)
+
+ def get_file_status(self, path: str):
+ return self.file_io().get_file_status(path)
+
+ def list_status(self, path: str):
+ return self.file_io().list_status(path)
+
+ def exists(self, path: str) -> bool:
+ return self.file_io().exists(path)
+
+ def delete(self, path: str, recursive: bool = False) -> bool:
+ return self.file_io().delete(path, recursive)
+
+ def mkdirs(self, path: str) -> bool:
+ return self.file_io().mkdirs(path)
+
+ def rename(self, src: str, dst: str) -> bool:
+ return self.file_io().rename(src, dst)
+
+ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False):
+ return self.file_io().copy_file(source_path, target_path, overwrite)
+
+ def to_filesystem_path(self, path: str) -> str:
+ return self.file_io().to_filesystem_path(path)
+
+ def try_to_write_atomic(self, path: str, content: str) -> bool:
+ return self.file_io().try_to_write_atomic(path, content)
+
+ def write_parquet(self, path: str, data, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ return self.file_io().write_parquet(path, data, compression, zstd_level, **kwargs)
+
+ def write_orc(self, path: str, data, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ return self.file_io().write_orc(path, data, compression, zstd_level, **kwargs)
+
+ def write_avro(self, path: str, data, avro_schema=None,
+ compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+ return self.file_io().write_avro(path, data, avro_schema, compression, zstd_level, **kwargs)
+
+ def write_lance(self, path: str, data, **kwargs):
+ return self.file_io().write_lance(path, data, **kwargs)
+
+ def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs):
+ return self.file_io().write_blob(path, data, blob_as_descriptor, **kwargs)
+
+ @property
+ def uri_reader_factory(self):
+ if self._uri_reader_factory_cache is None:
+ catalog_options = self.catalog_options or Options({})
+ self._uri_reader_factory_cache = UriReaderFactory(catalog_options)
+
+ return self._uri_reader_factory_cache
+
+ @property
+ def filesystem(self):
+ return self.file_io().filesystem
def try_to_refresh_token(self):
if self.should_refresh():
@@ -111,3 +205,12 @@ class RESTTokenFileIO(FileIO):
def valid_token(self):
self.try_to_refresh_token()
return self.token
+
+ def close(self):
+ with self.lock:
+ for file_io in self._file_io_cache.values():
+ try:
+ file_io.close()
+ except Exception as e:
+ self.log.warning(f"Error closing cached FileIO: {e}")
+ self._file_io_cache.clear()
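The file_io() accessor above keys a cachetools TTLCache by the current REST token and guards creation with double-checked locking. The pattern itself is independent of Paimon; a minimal standalone sketch (the names here are illustrative, not from this commit):

    import threading
    from cachetools import TTLCache

    class TokenScopedCache:
        # Caches one expensive object per token; entries expire after ttl seconds.
        def __init__(self, factory, maxsize=1000, ttl=36000):
            self._factory = factory
            self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
            self._lock = threading.Lock()

        def get(self, token):
            value = self._cache.get(token)      # fast path without the lock
            if value is not None:
                return value
            with self._lock:                    # slow path, re-check under the lock
                value = self._cache.get(token)
                if value is None:
                    value = self._factory(token)
                    self._cache[token] = value
                return value

The diff uses the token object itself as the cache key, which requires RESTToken to be hashable.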
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 3e039248b7..536e06c0b1 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -16,298 +16,78 @@
# limitations under the License.
################################################################################
import logging
-import os
-import subprocess
import uuid
+from abc import ABC, abstractmethod
from pathlib import Path
-from typing import Any, Dict, List, Optional
-from urllib.parse import splitport, urlparse
+from typing import List, Optional
import pyarrow
-from packaging.version import parse
-from pyarrow._fs import FileSystem
from pypaimon.common.options import Options
-from pypaimon.common.options.config import OssOptions, S3Options
-from pypaimon.common.uri_reader import UriReaderFactory
-from pypaimon.filesystem.local import PaimonLocalFileSystem
-from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
-from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.table.row.row_kind import RowKind
-from pypaimon.write.blob_format_writer import BlobFormatWriter
-
-
-class FileIO:
- def __init__(self, path: str, catalog_options: Options):
- self.properties = catalog_options
- self.logger = logging.getLogger(__name__)
- scheme, netloc, _ = self.parse_location(path)
- self.uri_reader_factory = UriReaderFactory(catalog_options)
- if scheme in {"oss"}:
- self.filesystem = self._initialize_oss_fs(path)
- elif scheme in {"s3", "s3a", "s3n"}:
- self.filesystem = self._initialize_s3_fs()
- elif scheme in {"hdfs", "viewfs"}:
- self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
- elif scheme in {"file"}:
- self.filesystem = self._initialize_local_fs()
- else:
- raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
-
- @staticmethod
- def parse_location(location: str):
- uri = urlparse(location)
- if not uri.scheme:
- return "file", uri.netloc, os.path.abspath(location)
- elif uri.scheme in ("hdfs", "viewfs"):
- return uri.scheme, uri.netloc, uri.path
- else:
- return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
-
- @staticmethod
- def _create_s3_retry_config(
- max_attempts: int = 10,
- request_timeout: int = 60,
- connect_timeout: int = 60
- ) -> Dict[str, Any]:
- """
- AwsStandardS3RetryStrategy and timeout parameters are only available
- in PyArrow >= 8.0.0.
- """
- if parse(pyarrow.__version__) >= parse("8.0.0"):
- config = {
- 'request_timeout': request_timeout,
- 'connect_timeout': connect_timeout
- }
- try:
- from pyarrow.fs import AwsStandardS3RetryStrategy
- retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts)
- config['retry_strategy'] = retry_strategy
- except ImportError:
- pass
- return config
- else:
- return {}
-
- def _extract_oss_bucket(self, location) -> str:
- uri = urlparse(location)
- if uri.scheme and uri.scheme != "oss":
- raise ValueError("Not an OSS URI: {}".format(location))
-
- netloc = uri.netloc or ""
- # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
- if (getattr(uri, "username", None) or getattr(uri, "password", None))
or ("@" in netloc):
- first_segment = uri.path.lstrip("/").split("/", 1)[0]
- if not first_segment:
- raise ValueError("Invalid OSS URI without bucket:
{}".format(location))
- return first_segment
-
- # parse oss://bucket/... or oss://bucket.endpoint/...
- host = getattr(uri, "hostname", None) or netloc
- if not host:
- raise ValueError("Invalid OSS URI without host:
{}".format(location))
- bucket = host.split(".", 1)[0]
- if not bucket:
- raise ValueError("Invalid OSS URI without bucket:
{}".format(location))
- return bucket
-
- def _initialize_oss_fs(self, path) -> FileSystem:
- from pyarrow.fs import S3FileSystem
-
- client_kwargs = {
- "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
- "secret_key":
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
- "session_token":
self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
- "region": self.properties.get(OssOptions.OSS_REGION),
- }
-
- # Based on https://github.com/apache/arrow/issues/40506
- if parse(pyarrow.__version__) >= parse("7.0.0"):
- client_kwargs['force_virtual_addressing'] = True
- client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT)
- else:
- oss_bucket = self._extract_oss_bucket(path)
- client_kwargs['endpoint_override'] = (oss_bucket + "." +
- self.properties.get(OssOptions.OSS_ENDPOINT))
-
- retry_config = self._create_s3_retry_config()
- client_kwargs.update(retry_config)
- return S3FileSystem(**client_kwargs)
-
- def _initialize_s3_fs(self) -> FileSystem:
- from pyarrow.fs import S3FileSystem
-
- client_kwargs = {
- "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
- "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
- "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET),
- "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN),
- "region": self.properties.get(S3Options.S3_REGION),
- "force_virtual_addressing": True,
- }
-
- retry_config = self._create_s3_retry_config()
- client_kwargs.update(retry_config)
-
- return S3FileSystem(**client_kwargs)
-
- def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
- from pyarrow.fs import HadoopFileSystem
-
- if 'HADOOP_HOME' not in os.environ:
- raise RuntimeError("HADOOP_HOME environment variable is not set.")
- if 'HADOOP_CONF_DIR' not in os.environ:
- raise RuntimeError("HADOOP_CONF_DIR environment variable is not
set.")
-
- hadoop_home = os.environ.get("HADOOP_HOME")
- native_lib_path = f"{hadoop_home}/lib/native"
- os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}"
-
- class_paths = subprocess.run(
- [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'],
- capture_output=True,
- text=True,
- check=True
- )
- os.environ['CLASSPATH'] = class_paths.stdout.strip()
-
- host, port_str = splitport(netloc)
- return HadoopFileSystem(
- host=host,
- port=int(port_str),
- user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
- )
-
- def _initialize_local_fs(self) -> FileSystem:
-
- return PaimonLocalFileSystem()
+class FileIO(ABC):
+ """
+ File IO interface to read and write files.
+ """
+
+ @abstractmethod
def new_input_stream(self, path: str):
- path_str = self.to_filesystem_path(path)
- return self.filesystem.open_input_file(path_str)
+ pass
+ @abstractmethod
def new_output_stream(self, path: str):
- path_str = self.to_filesystem_path(path)
- parent_dir = Path(path_str).parent
- if str(parent_dir) and not self.exists(str(parent_dir)):
- self.mkdirs(str(parent_dir))
-
- return self.filesystem.open_output_stream(path_str)
-
+ pass
+
+ @abstractmethod
def get_file_status(self, path: str):
- path_str = self.to_filesystem_path(path)
- file_infos = self.filesystem.get_file_info([path_str])
- file_info = file_infos[0]
-
- if file_info.type == pyarrow.fs.FileType.NotFound:
- raise FileNotFoundError(f"File {path} (resolved as {path_str})
does not exist")
-
- return file_info
-
+ pass
+
+ @abstractmethod
def list_status(self, path: str):
- path_str = self.to_filesystem_path(path)
- selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
- return self.filesystem.get_file_info(selector)
-
- def list_directories(self, path: str):
- file_infos = self.list_status(path)
- return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
+ pass
+ @abstractmethod
def exists(self, path: str) -> bool:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- return file_info.type != pyarrow.fs.FileType.NotFound
+ pass
+ @abstractmethod
def delete(self, path: str, recursive: bool = False) -> bool:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
-
- if file_info.type == pyarrow.fs.FileType.NotFound:
- return False
-
- if file_info.type == pyarrow.fs.FileType.Directory:
- if not recursive:
- selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
- dir_contents = self.filesystem.get_file_info(selector)
- if len(dir_contents) > 0:
- raise OSError(f"Directory {path} is not empty")
- if recursive:
- self.filesystem.delete_dir_contents(path_str)
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_dir(path_str)
- else:
- self.filesystem.delete_file(path_str)
- return True
-
+ pass
+
+ @abstractmethod
def mkdirs(self, path: str) -> bool:
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
-
- if file_info.type == pyarrow.fs.FileType.Directory:
- return True
- elif file_info.type == pyarrow.fs.FileType.File:
- raise FileExistsError(f"Path exists but is not a directory:
{path}")
-
- self.filesystem.create_dir(path_str, recursive=True)
- return True
-
+ pass
+
+ @abstractmethod
def rename(self, src: str, dst: str) -> bool:
- dst_str = self.to_filesystem_path(dst)
- dst_parent = Path(dst_str).parent
- if str(dst_parent) and not self.exists(str(dst_parent)):
- self.mkdirs(str(dst_parent))
-
- src_str = self.to_filesystem_path(src)
-
- try:
- if hasattr(self.filesystem, 'rename'):
- return self.filesystem.rename(src_str, dst_str)
-
- dst_file_info = self.filesystem.get_file_info([dst_str])[0]
- if dst_file_info.type != pyarrow.fs.FileType.NotFound:
- if dst_file_info.type == pyarrow.fs.FileType.File:
- return False
- # Make it compatible with HadoopFileIO: if dst is an existing directory,
- # dst=dst/srcFileName
- src_name = Path(src_str).name
- dst_str = str(Path(dst_str) / src_name)
- final_dst_info = self.filesystem.get_file_info([dst_str])[0]
- if final_dst_info.type != pyarrow.fs.FileType.NotFound:
- return False
-
- self.filesystem.move(src_str, dst_str)
- return True
- except FileNotFoundError:
- return False
- except (PermissionError, OSError):
- return False
-
+ pass
+
def delete_quietly(self, path: str):
- if self.logger.isEnabledFor(logging.DEBUG):
- self.logger.debug(f"Ready to delete {path}")
+ logger = logging.getLogger(__name__)
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug(f"Ready to delete {path}")
try:
if not self.delete(path, False) and self.exists(path):
- self.logger.warning(f"Failed to delete file {path}")
+ logger.warning(f"Failed to delete file {path}")
except Exception:
- self.logger.warning(f"Exception occurs when deleting file {path}",
exc_info=True)
+ logger.warning(f"Exception occurs when deleting file {path}",
exc_info=True)
def delete_files_quietly(self, files: List[str]):
for file_path in files:
self.delete_quietly(file_path)
def delete_directory_quietly(self, directory: str):
- if self.logger.isEnabledFor(logging.DEBUG):
- self.logger.debug(f"Ready to delete {directory}")
+ logger = logging.getLogger(__name__)
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug(f"Ready to delete {directory}")
try:
if not self.delete(directory, True) and self.exists(directory):
- self.logger.warning(f"Failed to delete directory {directory}")
+ logger.warning(f"Failed to delete directory {directory}")
except Exception:
- self.logger.warning(f"Exception occurs when deleting directory
{directory}", exc_info=True)
+ logger.warning(f"Exception occurs when deleting directory
{directory}", exc_info=True)
def get_file_size(self, path: str) -> int:
file_info = self.get_file_status(path)
@@ -332,9 +112,7 @@ class FileIO:
def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
- path_str = self.to_filesystem_path(path)
- file_info = self.filesystem.get_file_info([path_str])[0]
- if file_info.type == pyarrow.fs.FileType.Directory:
+ if self.is_dir(path):
return False
temp_path = path + str(uuid.uuid4()) + ".tmp"
@@ -345,7 +123,7 @@ class FileIO:
finally:
if not success:
self.delete_quietly(temp_path)
- return success
+ return success
def write_file(self, path: str, content: str, overwrite: bool = False):
if not overwrite and self.exists(path):
@@ -362,14 +140,15 @@ class FileIO:
if not overwrite and self.exists(target_path):
raise FileExistsError(f"Target file {target_path} already exists
and overwrite=False")
- source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent
if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))
- self.filesystem.copy_file(source_str, target_str)
+ with self.new_input_stream(source_path) as input_stream:
+ with self.new_output_stream(target_path) as output_stream:
+ output_stream.write(input_stream.read())
def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
file_infos = self.list_status(source_directory)
@@ -407,196 +186,58 @@ class FileIO:
return None
- def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
- zstd_level: int = 1, **kwargs):
- try:
- import pyarrow.parquet as pq
+ def to_filesystem_path(self, path: str) -> str:
+ return path
- with self.new_output_stream(path) as output_stream:
- if compression.lower() == 'zstd':
- kwargs['compression_level'] = zstd_level
- pq.write_table(data, output_stream, compression=compression, **kwargs)
+ def parse_location(self, location: str):
+ from urllib.parse import urlparse
+ import os
- except Exception as e:
- self.delete_quietly(path)
- raise RuntimeError(f"Failed to write Parquet file {path}: {e}")
from e
+ uri = urlparse(location)
+ if not uri.scheme:
+ return "file", uri.netloc, os.path.abspath(location)
+ elif uri.scheme in ("hdfs", "viewfs"):
+ return uri.scheme, uri.netloc, uri.path
+ else:
+ return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
- def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
+ def write_parquet(self, path: str, data, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ raise NotImplementedError("write_parquet must be implemented by FileIO
subclasses")
+
+ def write_orc(self, path: str, data, compression: str = 'zstd',
zstd_level: int = 1, **kwargs):
- try:
- """Write ORC file using PyArrow ORC writer.
-
- Note: PyArrow's ORC writer doesn't support compression_level parameter.
- ORC files will use zstd compression with default level
- (which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c)
- instead of the specified level.
- """
- import sys
- import pyarrow.orc as orc
-
- with self.new_output_stream(path) as output_stream:
- # Check Python version - if 3.6, don't use compression parameter
- if sys.version_info[:2] == (3, 6):
- orc.write_table(data, output_stream, **kwargs)
- else:
- orc.write_table(
- data,
- output_stream,
- compression=compression,
- **kwargs
- )
-
- except Exception as e:
- self.delete_quietly(path)
- raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
-
- def write_avro(
- self, path: str, data: pyarrow.Table,
- avro_schema: Optional[Dict[str, Any]] = None,
- compression: str = 'zstd', zstd_level: int = 1, **kwargs):
- import fastavro
- if avro_schema is None:
- from pypaimon.schema.data_types import PyarrowFieldParser
- avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
-
- records_dict = data.to_pydict()
-
- def record_generator():
- num_rows = len(list(records_dict.values())[0])
- for i in range(num_rows):
- yield {col: records_dict[col][i] for col in records_dict.keys()}
-
- records = record_generator()
-
- codec_map = {
- 'null': 'null',
- 'deflate': 'deflate',
- 'snappy': 'snappy',
- 'bzip2': 'bzip2',
- 'xz': 'xz',
- 'zstandard': 'zstandard',
- 'zstd': 'zstandard', # zstd is commonly used in Paimon
- }
- compression_lower = compression.lower()
+ raise NotImplementedError("write_orc must be implemented by FileIO
subclasses")
+
+ def write_avro(self, path: str, data, avro_schema=None,
+ compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+ raise NotImplementedError("write_avro must be implemented by FileIO
subclasses")
+
+ def write_lance(self, path: str, data, **kwargs):
+ raise NotImplementedError("write_lance must be implemented by FileIO
subclasses")
+
+ def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs):
+ """Write Blob format file. Must be implemented by subclasses."""
+ raise NotImplementedError("write_blob must be implemented by FileIO
subclasses")
+
+ def close(self):
+ pass
+
+ @staticmethod
+ def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
+ """
+ Returns a FileIO instance for accessing the file system identified by the given path.
+ - LocalFileIO for local file system (file:// or no scheme)
+ - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
+ """
+ from urllib.parse import urlparse
- codec = codec_map.get(compression_lower)
- if codec is None:
- raise ValueError(
- f"Unsupported compression '{compression}' for Avro format. "
- f"Supported compressions: {',
'.join(sorted(codec_map.keys()))}."
- )
-
- with self.new_output_stream(path) as output_stream:
- if codec == 'zstandard':
- kwargs['codec_compression_level'] = zstd_level
- fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs)
-
- def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
- try:
- import lance
- from pypaimon.read.reader.lance_utils import to_lance_specified
- file_path_for_lance, storage_options = to_lance_specified(self, path)
-
- writer = lance.file.LanceFileWriter(
- file_path_for_lance, data.schema, storage_options=storage_options, **kwargs)
- try:
- # Write all batches
- for batch in data.to_batches():
- writer.write_batch(batch)
- finally:
- writer.close()
- except Exception as e:
- self.delete_quietly(path)
- raise RuntimeError(f"Failed to write Lance file {path}: {e}") from
e
-
- def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
- try:
- # Validate input constraints
- if data.num_columns != 1:
- raise RuntimeError(f"Blob format only supports a single
column, got {data.num_columns} columns")
- # Check for null values
- column = data.column(0)
- if column.null_count > 0:
- raise RuntimeError("Blob format does not support null values")
- # Convert PyArrow schema to Paimon DataFields
- # For blob files, we expect exactly one blob column
- field = data.schema[0]
- if pyarrow.types.is_large_binary(field.type):
- fields = [DataField(0, field.name, AtomicType("BLOB"))]
- else:
- # Convert other types as needed
- paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
- fields = [DataField(0, field.name, paimon_type)]
- # Convert PyArrow Table to records
- records_dict = data.to_pydict()
- num_rows = data.num_rows
- field_name = fields[0].name
- with self.new_output_stream(path) as output_stream:
- writer = BlobFormatWriter(output_stream)
- # Write each row
- for i in range(num_rows):
- col_data = records_dict[field_name][i]
- # Convert to appropriate type based on field type
- if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
- if blob_as_descriptor:
- blob_descriptor = BlobDescriptor.deserialize(col_data)
- uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
- blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
- elif isinstance(col_data, bytes):
- blob_data = BlobData(col_data)
- else:
- # Convert to bytes if needed
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- blob_data = BlobData(col_data)
- row_values = [blob_data]
- else:
- row_values = [col_data]
- # Create GenericRow and write
- row = GenericRow(row_values, fields, RowKind.INSERT)
- writer.add_element(row)
- writer.close()
-
- except Exception as e:
- self.delete_quietly(path)
- raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
-
- def to_filesystem_path(self, path: str) -> str:
- from pyarrow.fs import S3FileSystem
- import re
-
- parsed = urlparse(path)
- normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''
-
- if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
- # This is likely a Windows drive letter, not a URI scheme
- return str(path)
-
- if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'):
- # file://C:/path format - netloc is 'C:', need to reconstruct path with drive letter
- drive_letter = parsed.netloc.rstrip(':')
- path_part = normalized_path.lstrip('/')
- return f"{drive_letter}:/{path_part}" if path_part else
f"{drive_letter}:"
-
- if isinstance(self.filesystem, S3FileSystem):
- # For S3, return "bucket/path" format
- if parsed.scheme:
- if parsed.netloc:
- # Has netloc (bucket): return "bucket/path" format
- path_part = normalized_path.lstrip('/')
- return f"{parsed.netloc}/{path_part}" if path_part else
parsed.netloc
- else:
- # Has scheme but no netloc: return path without scheme
- result = normalized_path.lstrip('/')
- return result if result else '.'
- return str(path)
-
- if parsed.scheme:
- # Handle empty path: return '.' for current directory
- if not normalized_path:
- return '.'
- return normalized_path
-
- return str(path)
+ uri = urlparse(path)
+ scheme = uri.scheme
+
+ if not scheme or scheme == "file":
+ from pypaimon.filesystem.local_file_io import LocalFileIO
+ return LocalFileIO(path, catalog_options)
+
+ from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+ return PyArrowFileIO(path, catalog_options or Options({}))
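With FileIO now an ABC exposing eight abstract methods (new_input_stream, new_output_stream, get_file_status, list_status, exists, delete, mkdirs, rename), other backends can be slotted in without touching callers. A deliberately tiny, hypothetical in-memory implementation (test-only, not part of this commit) to illustrate the contract:

    import io
    from pypaimon.common.file_io import FileIO

    class _MemoryOutputStream(io.BytesIO):
        # Commits its bytes into the backing dict when closed.
        def __init__(self, files, path):
            super().__init__()
            self._files, self._path = files, path

        def close(self):
            if not self.closed:
                self._files[self._path] = self.getvalue()
            super().close()

    class InMemoryFileIO(FileIO):
        # Toy backend keeping file contents in a dict keyed by path.
        def __init__(self):
            self._files = {}

        def new_input_stream(self, path):
            return io.BytesIO(self._files[path])

        def new_output_stream(self, path):
            return _MemoryOutputStream(self._files, path)

        def get_file_status(self, path):
            raise NotImplementedError("omitted in this sketch")

        def list_status(self, path):
            prefix = path.rstrip('/') + '/'
            return [p for p in self._files if p.startswith(prefix)]  # real impls return status objects

        def exists(self, path):
            return path in self._files

        def delete(self, path, recursive=False):
            return self._files.pop(path, None) is not None

        def mkdirs(self, path):
            return True  # directories are implicit here

        def rename(self, src, dst):
            if src not in self._files or dst in self._files:
                return False
            self._files[dst] = self._files.pop(src)
            return True

Because the base class keeps concrete helpers such as write_file and the streaming copy_file built on top of these primitives, a subclass like this gets them for free.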
diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py
index b05ce77253..50718f0849 100644
--- a/paimon-python/pypaimon/common/uri_reader.py
+++ b/paimon-python/pypaimon/common/uri_reader.py
@@ -148,7 +148,7 @@ class UriReaderFactory:
# Import FileIO here to avoid circular imports
from pypaimon.common.file_io import FileIO
uri_string = parsed_uri.geturl()
- file_io = FileIO(uri_string, self.catalog_options)
+ file_io = FileIO.get(uri_string, self.catalog_options)
return UriReader.from_file(file_io)
except Exception as e:
raise RuntimeError(f"Failed to create reader for URI
{parsed_uri.geturl()}") from e
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py
new file mode 100644
index 0000000000..183eb0ea13
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -0,0 +1,439 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+import os
+import shutil
+import threading
+import uuid
+from pathlib import Path
+from typing import Any, Dict, Optional
+from urllib.parse import urlparse
+
+import pyarrow
+import pyarrow.fs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.local import PaimonLocalFileSystem
+from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
+from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+
+class LocalFileIO(FileIO):
+ """
+ Local file system implementation of FileIO.
+ """
+
+ RENAME_LOCK = threading.Lock()
+
+ def __init__(self, path: str = None, catalog_options: Optional[Options] = None):
+ self.logger = logging.getLogger(__name__)
+ self.path = path
+ self.properties = catalog_options or Options({})
+ self.filesystem = PaimonLocalFileSystem()
+ self.uri_reader_factory = UriReaderFactory(self.properties)
+
+ @staticmethod
+ def create():
+ return LocalFileIO()
+
+ def _to_file(self, path: str) -> Path:
+ parsed = urlparse(path)
+
+ if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
+ return Path(path)
+
+ if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'):
+ drive_letter = parsed.netloc.rstrip(':')
+ path_part = parsed.path.lstrip('/') if parsed.path else ''
+ if path_part:
+ return Path(f"{drive_letter}:/{path_part}")
+ else:
+ return Path(f"{drive_letter}:")
+
+ local_path = parsed.path if parsed.scheme else path
+
+ if not local_path:
+ return Path(".")
+
+ return Path(local_path)
+
+ def new_input_stream(self, path: str):
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking new_input_stream for {path}")
+
+ file_path = self._to_file(path)
+ if not file_path.exists():
+ raise FileNotFoundError(f"File {path} does not exist")
+
+ return open(file_path, 'rb')
+
+ def new_output_stream(self, path: str):
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking new_output_stream for {path}")
+
+ file_path = self._to_file(path)
+ # Create parent directories if needed
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ return open(file_path, 'wb')
+
+ def get_file_status(self, path: str):
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking get_file_status for {path}")
+
+ file_path = self._to_file(path)
+ if not file_path.exists():
+ import getpass
+ user = getpass.getuser()
+ raise FileNotFoundError(
+ f"File {path} does not exist or the user running "
+ f"Paimon ('{user}') has insufficient permissions to access it."
+ )
+
+ class LocalFileStatus:
+ def __init__(self, file_path: Path, original_path: str):
+ stat_info = file_path.stat()
+ self.path = str(file_path.absolute())
+ self.original_path = original_path
+ self.size = stat_info.st_size if file_path.is_file() else None
+ self.type = (
+ pyarrow.fs.FileType.Directory if file_path.is_dir()
+ else pyarrow.fs.FileType.File if file_path.is_file()
+ else pyarrow.fs.FileType.NotFound
+ )
+ self.mtime = stat_info.st_mtime
+
+ return LocalFileStatus(file_path, path)
+
+ def list_status(self, path: str):
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking list_status for {path}")
+
+ file_path = self._to_file(path)
+ results = []
+
+ if not file_path.exists():
+ return results
+
+ if file_path.is_file():
+ results.append(self.get_file_status(path))
+ elif file_path.is_dir():
+ try:
+ for item in file_path.iterdir():
+ try:
+ if path.startswith('file://'):
+ item_path = f"file://{item}"
+ else:
+ item_path = str(item)
+ results.append(self.get_file_status(item_path))
+ except FileNotFoundError:
+ pass
+ except PermissionError:
+ pass
+
+ return results
+
+ def exists(self, path: str) -> bool:
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking exists for {path}")
+
+ file_path = self._to_file(path)
+ return file_path.exists()
+
+ def delete(self, path: str, recursive: bool = False) -> bool:
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking delete for {path}")
+
+ file_path = self._to_file(path)
+
+ if not file_path.exists():
+ return False
+
+ if file_path.is_file():
+ file_path.unlink()
+ return True
+ elif file_path.is_dir():
+ if not recursive:
+ try:
+ items = list(file_path.iterdir())
+ if items:
+ raise OSError(f"Directory {path} is not empty")
+ except PermissionError:
+ raise OSError(
+ f"Directory {path} does not exist or an I/O error
occurred"
+ )
+ file_path.rmdir()
+ else:
+ shutil.rmtree(file_path)
+ return True
+
+ return False
+
+ def mkdirs(self, path: str) -> bool:
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking mkdirs for {path}")
+
+ file_path = self._to_file(path)
+
+ if file_path.is_dir():
+ return True
+ elif file_path.exists() and not file_path.is_dir():
+ raise FileExistsError(f"Path exists but is not a directory:
{path}")
+
+ file_path.mkdir(parents=True, exist_ok=True)
+ return True
+
+ def rename(self, src: str, dst: str) -> bool:
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Invoking rename for {src} to {dst}")
+
+ src_file = self._to_file(src)
+ dst_file = self._to_file(dst)
+
+ dst_parent = dst_file.parent
+ if dst_parent and not dst_parent.exists():
+ dst_parent.mkdir(parents=True, exist_ok=True)
+
+ try:
+ with LocalFileIO.RENAME_LOCK:
+ if dst_file.exists():
+ if dst_file.is_file():
+ return False
+ # Make it compatible with HadoopFileIO: if dst is an existing directory,
+ # dst=dst/srcFileName
+ dst_file = dst_file / src_file.name
+ if dst_file.exists():
+ return False
+
+ # Perform atomic move
+ src_file.rename(dst_file)
+ return True
+ except FileNotFoundError:
+ return False
+ except (PermissionError, OSError):
+ return False
+
+ def try_to_write_atomic(self, path: str, content: str) -> bool:
+ file_path = self._to_file(path)
+ if file_path.exists() and file_path.is_dir():
+ return False
+
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ temp_path = file_path.parent / f"{file_path.name}.{uuid.uuid4()}.tmp"
+ success = False
+ try:
+ with open(temp_path, 'w', encoding='utf-8') as f:
+ f.write(content)
+ success = self.rename(str(temp_path), path)
+ finally:
+ if not success and temp_path.exists():
+ self.delete_quietly(str(temp_path))
+ return success
+
+ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False):
+ if not overwrite and self.exists(target_path):
+ raise FileExistsError(f"Target file {target_path} already exists and overwrite=False")
+
+ source_file = self._to_file(source_path)
+ target_file = self._to_file(target_path)
+
+ target_parent = target_file.parent
+ if target_parent and not target_parent.exists():
+ target_parent.mkdir(parents=True, exist_ok=True)
+
+ shutil.copy2(source_file, target_file)
+
+ def to_filesystem_path(self, path: str) -> str:
+ file_path = self._to_file(path)
+ result = str(file_path)
+ parsed = urlparse(path)
+ original_path = parsed.path if parsed.scheme else path
+ if original_path.startswith('./') and not result.startswith('./'):
+ result = './' + result
+ return result
+
+ @staticmethod
+ def parse_location(location: str):
+ uri = urlparse(location)
+ if not uri.scheme:
+ return "file", uri.netloc, os.path.abspath(location)
+ elif uri.scheme == "file":
+ return "file", uri.netloc, uri.path
+ else:
+ raise ValueError(f"LocalFileIO only supports file:// scheme, got
{uri.scheme}")
+
+ def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ try:
+ import pyarrow.parquet as pq
+
+ file_path = self._to_file(path)
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ with open(file_path, 'wb') as f:
+ if compression.lower() == 'zstd':
+ kwargs['compression_level'] = zstd_level
+ pq.write_table(data, f, compression=compression, **kwargs)
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write Parquet file {path}: {e}")
from e
+
+ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
+ zstd_level: int = 1, **kwargs):
+ try:
+ import sys
+ import pyarrow.orc as orc
+
+ file_path = self._to_file(path)
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ with open(file_path, 'wb') as f:
+ if sys.version_info[:2] == (3, 6):
+ orc.write_table(data, f, **kwargs)
+ else:
+ orc.write_table(data, f, compression=compression, **kwargs)
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
+
+ def write_avro(self, path: str, data: pyarrow.Table,
+ avro_schema: Optional[Dict[str, Any]] = None,
+ compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+ import fastavro
+ if avro_schema is None:
+ avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
+
+ records_dict = data.to_pydict()
+
+ def record_generator():
+ num_rows = len(list(records_dict.values())[0])
+ for i in range(num_rows):
+ yield {col: records_dict[col][i] for col in records_dict.keys()}
+
+ records = record_generator()
+
+ codec_map = {
+ 'null': 'null',
+ 'deflate': 'deflate',
+ 'snappy': 'snappy',
+ 'bzip2': 'bzip2',
+ 'xz': 'xz',
+ 'zstandard': 'zstandard',
+ 'zstd': 'zstandard',
+ }
+ compression_lower = compression.lower()
+
+ codec = codec_map.get(compression_lower)
+ if codec is None:
+ raise ValueError(
+ f"Unsupported compression '{compression}' for Avro format. "
+ f"Supported compressions: {',
'.join(sorted(codec_map.keys()))}."
+ )
+
+ file_path = self._to_file(path)
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ with open(file_path, 'wb') as output_stream:
+ if codec == 'zstandard':
+ kwargs['codec_compression_level'] = zstd_level
+ fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs)
+
+ def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+ try:
+ import lance
+ from pypaimon.read.reader.lance_utils import to_lance_specified
+ file_path_for_lance, storage_options = to_lance_specified(self, path)
+
+ writer = lance.file.LanceFileWriter(
+ file_path_for_lance, data.schema, storage_options=storage_options, **kwargs)
+ try:
+ for batch in data.to_batches():
+ writer.write_batch(batch)
+ finally:
+ writer.close()
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write Lance file {path}: {e}") from
e
+
+ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
+ try:
+ if data.num_columns != 1:
+ raise RuntimeError(f"Blob format only supports a single
column, got {data.num_columns} columns")
+
+ column = data.column(0)
+ if column.null_count > 0:
+ raise RuntimeError("Blob format does not support null values")
+
+ field = data.schema[0]
+ if pyarrow.types.is_large_binary(field.type):
+ fields = [DataField(0, field.name, AtomicType("BLOB"))]
+ else:
+ paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
+ fields = [DataField(0, field.name, paimon_type)]
+
+ records_dict = data.to_pydict()
+ num_rows = data.num_rows
+ field_name = fields[0].name
+
+ file_path = self._to_file(path)
+ parent = file_path.parent
+ if parent and not parent.exists():
+ parent.mkdir(parents=True, exist_ok=True)
+
+ with open(file_path, 'wb') as output_stream:
+ writer = BlobFormatWriter(output_stream)
+ for i in range(num_rows):
+ col_data = records_dict[field_name][i]
+ if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
+ if blob_as_descriptor:
+ blob_descriptor = BlobDescriptor.deserialize(col_data)
+ uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
+ blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+ elif isinstance(col_data, bytes):
+ blob_data = BlobData(col_data)
+ else:
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ blob_data = BlobData(col_data)
+ row_values = [blob_data]
+ else:
+ row_values = [col_data]
+ row = GenericRow(row_values, fields, RowKind.INSERT)
+ writer.add_element(row)
+ writer.close()
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
similarity index 83%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 3e039248b7..a2b83ed2cc 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -27,10 +27,10 @@ import pyarrow
from packaging.version import parse
from pyarrow._fs import FileSystem
+from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
-from pypaimon.filesystem.local import PaimonLocalFileSystem
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.table.row.generic_row import GenericRow
@@ -38,7 +38,7 @@ from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
-class FileIO:
+class PyArrowFileIO(FileIO):
def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
@@ -50,8 +50,6 @@ class FileIO:
self.filesystem = self._initialize_s3_fs()
elif scheme in {"hdfs", "viewfs"}:
self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
- elif scheme in {"file"}:
- self.filesystem = self._initialize_local_fs()
else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
@@ -71,10 +69,6 @@ class FileIO:
request_timeout: int = 60,
connect_timeout: int = 60
) -> Dict[str, Any]:
- """
- AwsStandardS3RetryStrategy and timeout parameters are only available
- in PyArrow >= 8.0.0.
- """
if parse(pyarrow.__version__) >= parse("8.0.0"):
config = {
'request_timeout': request_timeout,
@@ -96,14 +90,12 @@ class FileIO:
raise ValueError("Not an OSS URI: {}".format(location))
netloc = uri.netloc or ""
- # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
if (getattr(uri, "username", None) or getattr(uri, "password", None))
or ("@" in netloc):
first_segment = uri.path.lstrip("/").split("/", 1)[0]
if not first_segment:
raise ValueError("Invalid OSS URI without bucket:
{}".format(location))
return first_segment
- # parse oss://bucket/... or oss://bucket.endpoint/...
host = getattr(uri, "hostname", None) or netloc
if not host:
raise ValueError("Invalid OSS URI without host:
{}".format(location))
@@ -122,7 +114,6 @@ class FileIO:
"region": self.properties.get(OssOptions.OSS_REGION),
}
- # Based on https://github.com/apache/arrow/issues/40506
if parse(pyarrow.__version__) >= parse("7.0.0"):
client_kwargs['force_virtual_addressing'] = True
client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT)
@@ -180,10 +171,6 @@ class FileIO:
user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
)
- def _initialize_local_fs(self) -> FileSystem:
-
- return PaimonLocalFileSystem()
-
def new_input_stream(self, path: str):
path_str = self.to_filesystem_path(path)
return self.filesystem.open_input_file(path_str)
@@ -309,27 +296,6 @@ class FileIO:
except Exception:
self.logger.warning(f"Exception occurs when deleting directory
{directory}", exc_info=True)
- def get_file_size(self, path: str) -> int:
- file_info = self.get_file_status(path)
- if file_info.size is None:
- raise ValueError(f"File size not available for {path}")
- return file_info.size
-
- def is_dir(self, path: str) -> bool:
- file_info = self.get_file_status(path)
- return file_info.type == pyarrow.fs.FileType.Directory
-
- def check_or_mkdirs(self, path: str):
- if self.exists(path):
- if not self.is_dir(path):
- raise ValueError(f"The path '{path}' should be a directory.")
- else:
- self.mkdirs(path)
-
- def read_file_utf8(self, path: str) -> str:
- with self.new_input_stream(path) as input_stream:
- return input_stream.read().decode('utf-8')
-
def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
path_str = self.to_filesystem_path(path)
@@ -345,18 +311,7 @@ class FileIO:
finally:
if not success:
self.delete_quietly(temp_path)
- return success
-
- def write_file(self, path: str, content: str, overwrite: bool = False):
- if not overwrite and self.exists(path):
- raise FileExistsError(f"File {path} already exists and
overwrite=False")
-
- with self.new_output_stream(path) as output_stream:
- output_stream.write(content.encode('utf-8'))
-
- def overwrite_file_utf8(self, path: str, content: str):
- with self.new_output_stream(path) as output_stream:
- output_stream.write(content.encode('utf-8'))
+ return success
def copy_file(self, source_path: str, target_path: str, overwrite: bool = False):
if not overwrite and self.exists(target_path):
@@ -371,42 +326,6 @@ class FileIO:
self.filesystem.copy_file(source_str, target_str)
- def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
- file_infos = self.list_status(source_directory)
- for file_info in file_infos:
- if file_info.type == pyarrow.fs.FileType.File:
- source_file = file_info.path
- file_name = source_file.split('/')[-1]
- target_file = f"{target_directory.rstrip('/')}/{file_name}" if
target_directory else file_name
- self.copy_file(source_file, target_file, overwrite)
-
- def read_overwritten_file_utf8(self, path: str) -> Optional[str]:
- retry_number = 0
- exception = None
- while retry_number < 5:
- try:
- return self.read_file_utf8(path)
- except FileNotFoundError:
- return None
- except Exception as e:
- if not self.exists(path):
- return None
-
- if (str(type(e).__name__).endswith("RemoteFileChangedException") or
- (str(e) and "Blocklist for" in str(e) and "has changed" in str(e))):
- exception = e
- retry_number += 1
- else:
- raise e
-
- if exception:
- if isinstance(exception, Exception):
- raise exception
- else:
- raise RuntimeError(exception)
-
- return None
-
def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
zstd_level: int = 1, **kwargs):
try:
@@ -511,32 +430,24 @@ class FileIO:
def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
- # Validate input constraints
if data.num_columns != 1:
raise RuntimeError(f"Blob format only supports a single
column, got {data.num_columns} columns")
- # Check for null values
column = data.column(0)
if column.null_count > 0:
raise RuntimeError("Blob format does not support null values")
- # Convert PyArrow schema to Paimon DataFields
- # For blob files, we expect exactly one blob column
field = data.schema[0]
if pyarrow.types.is_large_binary(field.type):
fields = [DataField(0, field.name, AtomicType("BLOB"))]
else:
- # Convert other types as needed
paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
fields = [DataField(0, field.name, paimon_type)]
- # Convert PyArrow Table to records
records_dict = data.to_pydict()
num_rows = data.num_rows
field_name = fields[0].name
with self.new_output_stream(path) as output_stream:
writer = BlobFormatWriter(output_stream)
- # Write each row
for i in range(num_rows):
col_data = records_dict[field_name][i]
- # Convert to appropriate type based on field type
if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
if blob_as_descriptor:
blob_descriptor = BlobDescriptor.deserialize(col_data)
@@ -545,7 +456,6 @@ class FileIO:
elif isinstance(col_data, bytes):
blob_data = BlobData(col_data)
else:
- # Convert to bytes if needed
if hasattr(col_data, 'as_py'):
col_data = col_data.as_py()
if isinstance(col_data, str):
@@ -554,7 +464,6 @@ class FileIO:
row_values = [blob_data]
else:
row_values = [col_data]
- # Create GenericRow and write
row = GenericRow(row_values, fields, RowKind.INSERT)
writer.add_element(row)
writer.close()
@@ -571,30 +480,24 @@ class FileIO:
normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''
if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
- # This is likely a Windows drive letter, not a URI scheme
return str(path)
if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'):
- # file://C:/path format - netloc is 'C:', need to reconstruct path with drive letter
drive_letter = parsed.netloc.rstrip(':')
path_part = normalized_path.lstrip('/')
return f"{drive_letter}:/{path_part}" if path_part else
f"{drive_letter}:"
if isinstance(self.filesystem, S3FileSystem):
- # For S3, return "bucket/path" format
if parsed.scheme:
if parsed.netloc:
- # Has netloc (bucket): return "bucket/path" format
path_part = normalized_path.lstrip('/')
return f"{parsed.netloc}/{path_part}" if path_part else
parsed.netloc
else:
- # Has scheme but no netloc: return path without scheme
result = normalized_path.lstrip('/')
return result if result else '.'
return str(path)
if parsed.scheme:
- # Handle empty path: return '.' for current directory
if not normalized_path:
return '.'
return normalized_path
diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py
index 60c7763aa3..c219dc6704 100644
--- a/paimon-python/pypaimon/read/reader/lance_utils.py
+++ b/paimon-python/pypaimon/read/reader/lance_utils.py
@@ -26,6 +26,9 @@ from pypaimon.common.options.config import OssOptions
def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]:
"""Convert path and extract storage options for Lance format."""
+ if hasattr(file_io, 'file_io'):
+ file_io = file_io.file_io()
+
scheme, _, _ = file_io.parse_location(file_path)
storage_options = None
file_path_for_lance = file_io.to_filesystem_path(file_path)
diff --git a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
index 802d1def10..2770fad136 100644
--- a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
+++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
@@ -23,8 +23,8 @@ import pyarrow as pa
from pypaimon import Schema
from pypaimon.table.row.blob import BlobDescriptor, Blob
-from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
def write_table_with_blob(catalog, video_file_path: str, external_oss_options: dict):
@@ -66,7 +66,7 @@ def write_table_with_blob(catalog, video_file_path: str, external_oss_options: d
# Access external OSS file to get file size
try:
- external_file_io = FileIO(video_file_path, Options(external_oss_options))
+ external_file_io = PyArrowFileIO(video_file_path, Options(external_oss_options))
video_file_size = external_file_io.get_file_size(video_file_path)
except Exception as e:
raise FileNotFoundError(
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
index b92ddb3e82..a9a2e2fd49 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -155,7 +155,7 @@ class Blob(ABC):
file_uri = file
else:
file_uri = f"file://{file}"
- file_io = FileIO(file_uri, {})
+ file_io = FileIO.get(file_uri, {})
uri_reader = FileUriReader(file_io)
descriptor = BlobDescriptor(file, 0, -1)
return Blob.from_descriptor(uri_reader, descriptor)
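
Call sites like the one above now go through FileIO.get(path, options) instead of constructing FileIO directly. The factory body is not part of this excerpt, so the following is only an assumed sketch of scheme-based dispatch, with hypothetical stand-in classes; the real selection logic may differ.

    from urllib.parse import urlparse

    class _LocalIOSketch:
        """Stand-in for a pathlib-backed local implementation."""

    class _PyArrowIOSketch:
        """Stand-in for a PyArrow-backed implementation used for object stores."""

    def get_file_io_sketch(path: str, options: dict):
        scheme = urlparse(str(path)).scheme
        # Local paths (no scheme or file://) stay on the local implementation;
        # anything else (oss://, s3://, ...) goes to the PyArrow-backed one.
        if scheme in ('', 'file'):
            return _LocalIOSketch()
        return _PyArrowIOSketch()

    assert isinstance(get_file_io_sketch("file:///tmp/x", {}), _LocalIOSketch)
    assert isinstance(get_file_io_sketch("oss://bucket/table", {}), _PyArrowIOSketch)
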
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index e12d031c7a..91808bc014 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
from pypaimon import CatalogFactory
from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.common.options import Options
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.schema.data_types import AtomicType, DataField
@@ -108,7 +109,7 @@ class BlobTest(unittest.TestCase):
def test_from_file_with_offset_and_length(self):
"""Test Blob.from_file() method with offset and length."""
-        file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({}))
+        file_io = LocalFileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({}))
blob = Blob.from_file(file_io, self.file, 0, 4)
# Verify it returns a BlobRef instance
@@ -204,7 +205,7 @@ class BlobTest(unittest.TestCase):
self.assertIsInstance(blob_ref, Blob)
# from_file should return BlobRef
-        file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({}))
+        file_io = LocalFileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", Options({}))
        blob_file = Blob.from_file(file_io, self.file, 0, os.path.getsize(self.file))
self.assertIsInstance(blob_file, BlobRef)
self.assertIsInstance(blob_file, Blob)
@@ -563,7 +564,7 @@ class BlobEndToEndTest(unittest.TestCase):
def test_blob_end_to_end(self):
# Set up file I/O
- file_io = FileIO(self.temp_dir, Options({}))
+ file_io = LocalFileIO(self.temp_dir, Options({}))
blob_field_name = "blob_field"
# ========== Step 1: Check Type Validation ==========
@@ -612,12 +613,11 @@ class BlobEndToEndTest(unittest.TestCase):
"""Test that complex types containing BLOB elements throw exceptions
during read/write operations."""
        from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType
from pypaimon.table.row.blob import BlobData
- from pypaimon.common.file_io import FileIO
        from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
from pypaimon.table.row.row_kind import RowKind
# Set up file I/O
- file_io = FileIO(self.temp_dir, Options({}))
+ file_io = LocalFileIO(self.temp_dir, Options({}))
        # ========== Test ArrayType(nullable=True, element_type=AtomicType("BLOB")) ==========
array_fields = [
@@ -755,11 +755,10 @@ class BlobEndToEndTest(unittest.TestCase):
def test_blob_advanced_scenarios(self):
"""Test advanced blob scenarios: corruption, truncation, zero-length,
large blobs, compression, cross-format."""
from pypaimon.schema.data_types import DataField, AtomicType
- from pypaimon.common.file_io import FileIO
        from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
# Set up file I/O
- file_io = FileIO(self.temp_dir, Options({}))
+ file_io = LocalFileIO(self.temp_dir, Options({}))
# ========== Test 1: Corrupted file header test ==========
@@ -999,7 +998,7 @@ class BlobEndToEndTest(unittest.TestCase):
def test_blob_end_to_end_with_descriptor(self):
# Set up file I/O
- file_io = FileIO(self.temp_dir, Options({}))
+ file_io = LocalFileIO(self.temp_dir, Options({}))
# ========== Step 1: Write data to local file ==========
# Create test data and write it to a local file
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index ec5d6c4e82..6cdf713b0c 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -23,9 +23,11 @@ from pathlib import Path
from unittest.mock import MagicMock, patch
import pyarrow
-from pyarrow.fs import S3FileSystem, LocalFileSystem
+from pyarrow.fs import S3FileSystem
-from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.filesystem.local_file_io import LocalFileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
class FileIOTest(unittest.TestCase):
@@ -33,7 +35,7 @@ class FileIOTest(unittest.TestCase):
def test_s3_filesystem_path_conversion(self):
"""Test S3FileSystem path conversion with various formats."""
- file_io = FileIO("s3://bucket/warehouse", {})
+ file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
self.assertIsInstance(file_io.filesystem, S3FileSystem)
# Test bucket and path
@@ -64,9 +66,8 @@ class FileIOTest(unittest.TestCase):
self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
def test_local_filesystem_path_conversion(self):
- """Test LocalFileSystem path conversion with various formats."""
- file_io = FileIO("file:///tmp/warehouse", {})
- self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+ file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
+ self.assertIsInstance(file_io, LocalFileIO)
# Test file:// scheme
self.assertEqual(file_io.to_filesystem_path("file:///tmp/path/to/file.txt"),
@@ -93,11 +94,9 @@ class FileIOTest(unittest.TestCase):
self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
def test_windows_path_handling(self):
- """Test Windows path handling (drive letters, file:// scheme)."""
- file_io = FileIO("file:///tmp/warehouse", {})
- self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+ file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
+ self.assertIsInstance(file_io, LocalFileIO)
- # Windows absolute paths
self.assertEqual(file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
"C:\\path\\to\\file.txt")
self.assertEqual(file_io.to_filesystem_path("C:/path/to/file.txt"),
@@ -113,17 +112,17 @@ class FileIOTest(unittest.TestCase):
"/C:/path/to/file.txt")
# Windows path with S3FileSystem (should preserve)
- s3_file_io = FileIO("s3://bucket/warehouse", {})
+ s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
self.assertEqual(s3_file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
"C:\\path\\to\\file.txt")
def test_path_normalization(self):
"""Test path normalization (multiple slashes)."""
- file_io = FileIO("file:///tmp/warehouse", {})
+ file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
self.assertEqual(file_io.to_filesystem_path("file://///tmp///path///file.txt"),
"/tmp/path/file.txt")
- s3_file_io = FileIO("s3://bucket/warehouse", {})
+ s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
self.assertEqual(s3_file_io.to_filesystem_path("s3://my-bucket///path///to///file.txt"),
"my-bucket/path/to/file.txt")
@@ -131,7 +130,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_write_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
test_file_uri = f"file://{temp_dir}/overwrite_test.txt"
expected_path = os.path.join(temp_dir, "overwrite_test.txt")
@@ -161,7 +160,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_exists_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
test_file = os.path.join(temp_dir, "test_file.txt")
with open(test_file, "w") as f:
@@ -169,35 +168,48 @@ class FileIOTest(unittest.TestCase):
self.assertTrue(file_io.exists(f"file://{test_file}"))
self.assertFalse(file_io.exists(f"file://{temp_dir}/nonexistent.txt"))
- mock_filesystem = MagicMock()
-        mock_filesystem.get_file_info.side_effect = OSError("Permission denied")
- file_io.filesystem = mock_filesystem
+ mock_path = MagicMock(spec=Path)
+ mock_path.exists.side_effect = OSError("Permission denied")
+ with patch.object(file_io, '_to_file', return_value=mock_path):
+ with self.assertRaises(OSError) as context:
+ file_io.exists("file:///some/path")
+ self.assertIn("Permission denied", str(context.exception))
- with self.assertRaises(OSError) as context:
- file_io.exists("file:///some/path")
- self.assertIn("Permission denied", str(context.exception))
-
- with self.assertRaises(OSError):
- file_io.new_output_stream("file:///some/path/file.txt")
-
- with self.assertRaises(OSError):
- file_io.check_or_mkdirs("file:///some/path")
-
- with self.assertRaises(OSError):
-            file_io.write_file("file:///some/path", "content", overwrite=False)
-
- with self.assertRaises(OSError):
-            file_io.copy_file("file:///src", "file:///dst", overwrite=False)
+            with patch('builtins.open', side_effect=OSError("Permission denied")):
+ with self.assertRaises(OSError):
+ file_io.new_output_stream("file:///some/path/file.txt")
-        with patch.object(file_io, 'read_file_utf8', side_effect=Exception("Read error")):
+ mock_path = MagicMock(spec=Path)
+ mock_path.is_dir.side_effect = OSError("Permission denied")
+ with patch.object(file_io, '_to_file', return_value=mock_path):
with self.assertRaises(OSError):
- file_io.read_overwritten_file_utf8("file:///some/path")
+ file_io.check_or_mkdirs("file:///some/path")
-        mock_filesystem.get_file_info.side_effect = OSError("Network error")
- file_io.filesystem = mock_filesystem
+            with patch('builtins.open', side_effect=OSError("Permission denied")):
+ with self.assertRaises(OSError):
+                    file_io.write_file("file:///some/path", "content", overwrite=False)
- with self.assertRaises(OSError):
- file_io.rename("file:///src", "file:///dst")
+            with patch('builtins.open', side_effect=OSError("Permission denied")):
+ with self.assertRaises(OSError):
+                    file_io.copy_file("file:///src", "file:///dst", overwrite=False)
+
+ with patch.object(file_io, 'exists', return_value=True):
+                with patch.object(file_io, 'read_file_utf8', side_effect=OSError("Read error")):
+ with self.assertRaises(OSError) as context:
+ file_io.read_overwritten_file_utf8("file:///some/path")
+ self.assertIn("Read error", str(context.exception))
+
+            # rename() catches OSError and returns False (consistent with Java implementation)
+ mock_src_path = MagicMock(spec=Path)
+ mock_dst_path = MagicMock(spec=Path)
+ mock_dst_path.parent = MagicMock()
+ mock_dst_path.parent.exists.return_value = True
+ mock_dst_path.exists.return_value = False
+ mock_src_path.rename.side_effect = OSError("Network error")
+            with patch.object(file_io, '_to_file', side_effect=[mock_src_path, mock_dst_path]):
+ # rename() catches OSError and returns False, doesn't raise
+ result = file_io.rename("file:///src", "file:///dst")
+                self.assertFalse(result, "rename() should return False when OSError occurs")
file_io.delete_quietly("file:///some/path")
file_io.delete_directory_quietly("file:///some/path")
@@ -208,7 +220,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
test_dir = os.path.join(temp_dir, "test_dir")
os.makedirs(test_dir)
@@ -226,7 +238,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
result = file_io.delete(f"file://{temp_dir}/nonexistent.txt")
            self.assertFalse(result, "delete() should return False when file does not exist")
@@ -240,7 +252,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_mkdirs_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
test_file = os.path.join(temp_dir, "test_file.txt")
with open(test_file, "w") as f:
@@ -256,7 +268,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_rename_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
src_file = os.path.join(temp_dir, "src.txt")
dst_file = os.path.join(temp_dir, "dst.txt")
@@ -274,7 +286,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_get_file_status_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
with self.assertRaises(FileNotFoundError) as context:
file_io.get_file_status(f"file://{temp_dir}/nonexistent.txt")
@@ -302,7 +314,7 @@ class FileIOTest(unittest.TestCase):
temp_dir = tempfile.mkdtemp(prefix="file_io_copy_test_")
try:
warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
+ file_io = LocalFileIO(warehouse_path, Options({}))
source_file = os.path.join(temp_dir, "source.txt")
target_file = os.path.join(temp_dir, "target.txt")
@@ -338,22 +350,29 @@ class FileIOTest(unittest.TestCase):
def test_try_to_write_atomic(self):
temp_dir = tempfile.mkdtemp(prefix="file_io_try_write_atomic_test_")
try:
- warehouse_path = f"file://{temp_dir}"
- file_io = FileIO(warehouse_path, {})
-
target_dir = os.path.join(temp_dir, "target_dir")
+ normal_file = os.path.join(temp_dir, "normal_file.txt")
+
+ from pypaimon.filesystem.local_file_io import LocalFileIO
+ local_file_io = LocalFileIO(f"file://{temp_dir}", Options({}))
os.makedirs(target_dir)
+ self.assertFalse(
+                local_file_io.try_to_write_atomic(f"file://{target_dir}", "test content"),
+ "LocalFileIO should return False when target is a directory")
+            self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory")
-            result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content")
-            self.assertFalse(result, "try_to_write_atomic should return False when target is a directory")
+            self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", "test content"))
+ with open(normal_file, "r") as f:
+ self.assertEqual(f.read(), "test content")
- self.assertTrue(os.path.isdir(target_dir))
+ os.remove(normal_file)
+ local_file_io = LocalFileIO(f"file://{temp_dir}", Options({}))
+ self.assertFalse(
+                local_file_io.try_to_write_atomic(f"file://{target_dir}", "test content"),
+ "LocalFileIO should return False when target is a directory")
            self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory")
- normal_file = os.path.join(temp_dir, "normal_file.txt")
-            result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content")
-            self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path")
- self.assertTrue(os.path.exists(normal_file))
+            self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", "test content"))
with open(normal_file, "r") as f:
self.assertEqual(f.read(), "test content")
finally:
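
The updated tests above pin down that rename() swallows OSError and reports failure through its boolean result, matching the Java FileIO contract referenced in the comment. A standalone pathlib sketch of that behavior (not the LocalFileIO code itself):

    import tempfile
    from pathlib import Path

    def rename_sketch(src: Path, dst: Path) -> bool:
        # Refuse to clobber an existing destination and report IO failures as False.
        if dst.exists():
            return False
        try:
            src.rename(dst)
            return True
        except OSError:
            return False

    d = Path(tempfile.mkdtemp())
    a, b = d / "a.txt", d / "b.txt"
    a.write_text("x")
    assert rename_sketch(a, b) is True
    assert rename_sketch(a, b) is False  # destination already exists
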
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 6bdba47b81..fe2c19f18a 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -1,5 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one
-# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
@@ -181,11 +180,21 @@ class FileSystemCatalogTest(unittest.TestCase):
with self.assertRaises(DatabaseNotExistException):
catalog.get_database("nonexistent_db")
- mock_filesystem = MagicMock()
-        mock_filesystem.get_file_info.side_effect = OSError("Permission denied")
- catalog.file_io.filesystem = mock_filesystem
+ catalog.create_database("test_db", False)
+
+ # FileSystemCatalog has file_io attribute
+ from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+ self.assertIsInstance(catalog, FileSystemCatalog)
+ filesystem_catalog = catalog # type: FileSystemCatalog
+
+ original_exists = filesystem_catalog.file_io.exists
+        filesystem_catalog.file_io.exists = MagicMock(side_effect=OSError("Permission denied"))
+        # Now get_database should propagate OSError, not DatabaseNotExistException
with self.assertRaises(OSError) as context:
catalog.get_database("test_db")
self.assertIn("Permission denied", str(context.exception))
self.assertNotIsInstance(context.exception, DatabaseNotExistException)
+
+ # Restore original method
+ filesystem_catalog.file_io.exists = original_exists
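
What the reworked test above asserts is an error-propagation contract: a path that simply is not there maps to DatabaseNotExistException, while an OSError raised by the underlying file IO must bubble up unchanged. A hypothetical sketch of that contract (the names below are stand-ins, not the catalog code):

    class DatabaseNotExistSketch(Exception):
        """Stand-in for DatabaseNotExistException."""

    def get_database_sketch(exists, name: str) -> str:
        # exists() may itself raise OSError; deliberately not caught here.
        if not exists(f"/warehouse/{name}.db"):
            raise DatabaseNotExistSketch(name)
        return name

    def _denied(path: str) -> bool:
        raise OSError("Permission denied")

    assert get_database_sketch(lambda p: True, "test_db") == "test_db"
    try:
        get_database_sketch(_denied, "test_db")
    except OSError as e:
        assert "Permission denied" in str(e)
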
diff --git a/paimon-python/pypaimon/tests/lance_utils_test.py b/paimon-python/pypaimon/tests/lance_utils_test.py
index 2676414d8b..71b7b2d571 100644
--- a/paimon-python/pypaimon/tests/lance_utils_test.py
+++ b/paimon-python/pypaimon/tests/lance_utils_test.py
@@ -20,7 +20,7 @@ import unittest
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
-from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
from pypaimon.read.reader.lance_utils import to_lance_specified
@@ -35,7 +35,7 @@ class LanceUtilsTest(unittest.TestCase):
OssOptions.OSS_ACCESS_KEY_SECRET.key(): "test-secret",
})
- file_io = FileIO(file_path, properties)
+ file_io = PyArrowFileIO(file_path, properties)
        file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)
self.assertEqual(
@@ -61,7 +61,7 @@ class LanceUtilsTest(unittest.TestCase):
OssOptions.OSS_SECURITY_TOKEN.key(): "test-token",
})
- file_io = FileIO(file_path, properties)
+ file_io = PyArrowFileIO(file_path, properties)
        file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)
self.assertEqual(file_path_for_lance,
"oss://my-bucket/path/to/file.lance")
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index 183a3a72a6..f6bcfa9767 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -26,7 +26,7 @@ from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
TableNotExistException)
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
-from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
@@ -404,19 +404,19 @@ class AOSimpleTest(RESTBaseTest):
with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \
patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
-            FileIO("oss://oss-bucket/paimon-database/paimon-table", Options(props))
+            PyArrowFileIO("oss://oss-bucket/paimon-database/paimon-table", Options(props))
mock_s3fs.assert_called_once_with(access_key="AKID",
secret_key="SECRET",
session_token="TOKEN",
region="cn-hangzhou",
endpoint_override="oss-bucket."
+ props[OssOptions.OSS_ENDPOINT.key()])
-            FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", Options(props))
+            PyArrowFileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", Options(props))
mock_s3fs.assert_called_with(access_key="AKID",
secret_key="SECRET",
session_token="TOKEN",
region="cn-hangzhou",
endpoint_override="oss-bucket." +
props[OssOptions.OSS_ENDPOINT.key()])
-            FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", Options(props))
+            PyArrowFileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", Options(props))
mock_s3fs.assert_called_with(access_key="AKID",
secret_key="SECRET",
session_token="TOKEN",
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py
index e0bedeac1b..d556f7f55c 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -800,7 +800,7 @@ class RESTCatalogServer:
from pypaimon.common.options import Options
warehouse_path = str(Path(self.data_path) / self.warehouse)
options = Options({"warehouse": warehouse_path})
- return FileIO(warehouse_path, options)
+ return FileIO.get(warehouse_path, options)
def _create_table_metadata(self, identifier: Identifier, schema_id: int,
                               schema: Schema, uuid_str: str, is_external: bool) -> TableMetadata:
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
index 07e445b12a..47ea8e6cb6 100644
--- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -18,16 +18,15 @@
import os
import pickle
import tempfile
-import time
import unittest
from unittest.mock import patch
from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
-from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions, OssOptions
+from pypaimon.filesystem.local_file_io import LocalFileIO
class RESTTokenFileIOTest(unittest.TestCase):
@@ -92,6 +91,30 @@ class RESTTokenFileIOTest(unittest.TestCase):
finally:
os.chdir(original_cwd)
+ def test_try_to_write_atomic_directory_check(self):
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ self.catalog_options
+ )
+
+ target_dir = os.path.join(self.temp_dir, "target_dir")
+ os.makedirs(target_dir)
+
+            result = file_io.try_to_write_atomic(f"file://{target_dir}", "test content")
+            self.assertFalse(result, "try_to_write_atomic should return False when target is a directory")
+
+ self.assertTrue(os.path.isdir(target_dir))
+            self.assertEqual(len(os.listdir(target_dir)), 0, "No file should be created inside the directory")
+
+ normal_file = os.path.join(self.temp_dir, "normal_file.txt")
+            result = file_io.try_to_write_atomic(f"file://{normal_file}", "test content")
+            self.assertTrue(result, "try_to_write_atomic should succeed for a normal file path")
+ self.assertTrue(os.path.exists(normal_file))
+ with open(normal_file, "r") as f:
+ self.assertEqual(f.read(), "test content")
+
def test_new_output_stream_behavior_matches_parent(self):
"""Test that RESTTokenFileIO.new_output_stream behaves like
FileIO.new_output_stream."""
with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
@@ -100,7 +123,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
self.warehouse_path,
self.catalog_options
)
- regular_file_io = FileIO(self.warehouse_path, self.catalog_options)
+            regular_file_io = LocalFileIO(self.warehouse_path, self.catalog_options)
test_file_path = f"file://{self.temp_dir}/comparison/test.txt"
test_content = b"comparison content"
@@ -195,9 +218,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
def test_catalog_options_not_modified(self):
from pypaimon.api.rest_util import RESTUtil
- from pypaimon.catalog.rest.rest_token import RESTToken
- from pyarrow.fs import LocalFileSystem
-
+
original_catalog_options = Options({
CatalogOptions.URI.key(): "http://test-uri",
"custom.key": "custom.value"
@@ -217,10 +238,8 @@ class RESTTokenFileIOTest(unittest.TestCase):
OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key",
OssOptions.OSS_ENDPOINT.key(): "token-endpoint"
}
-        file_io.token = RESTToken(token_dict, int(time.time() * 1000) + 3600000)
-        with patch.object(FileIO, '_initialize_oss_fs', return_value=LocalFileSystem()):
- file_io._initialize_oss_fs("file:///test/path")
+        merged_token = file_io._merge_token_with_catalog_options(token_dict)
self.assertEqual(
original_catalog_options.to_map(),
@@ -230,7 +249,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
merged_properties = RESTUtil.merge(
original_catalog_options.to_map(),
- file_io._merge_token_with_catalog_options(token_dict)
+ merged_token
)
self.assertIn("custom.key", merged_properties)
@@ -238,6 +257,53 @@ class RESTTokenFileIOTest(unittest.TestCase):
self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(),
merged_properties)
self.assertEqual(merged_properties[OssOptions.OSS_ACCESS_KEY_ID.key()],
"token-access-key")
+ def test_filesystem_property(self):
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ self.catalog_options
+ )
+
+            self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO should have filesystem property")
+ filesystem = file_io.filesystem
+ self.assertIsNotNone(filesystem, "filesystem should not be None")
+
+ self.assertTrue(hasattr(filesystem, 'open_input_file'),
+ "filesystem should support open_input_file method")
+
+ def test_uri_reader_factory_property(self):
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ self.catalog_options
+ )
+
+ self.assertTrue(hasattr(file_io, 'uri_reader_factory'),
+                            "RESTTokenFileIO should have uri_reader_factory property")
+ uri_reader_factory = file_io.uri_reader_factory
+            self.assertIsNotNone(uri_reader_factory, "uri_reader_factory should not be None")
+
+ self.assertTrue(hasattr(uri_reader_factory, 'create'),
+ "uri_reader_factory should support create method")
+
+ def test_filesystem_and_uri_reader_factory_after_serialization(self):
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ original_file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ self.catalog_options
+ )
+
+ pickled = pickle.dumps(original_file_io)
+ restored_file_io = pickle.loads(pickled)
+
+ self.assertIsNotNone(restored_file_io.filesystem,
+                                 "filesystem should work after deserialization")
+ self.assertIsNotNone(restored_file_io.uri_reader_factory,
+                                 "uri_reader_factory should work after deserialization")
+
if __name__ == '__main__':
unittest.main()
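
The serialization test at the end relies on transient handles being rebuilt lazily after unpickling rather than being serialized. A hypothetical, self-contained sketch of that pattern (not the RESTTokenFileIO implementation):

    import pickle

    class LazyHandleIO:
        """Rebuilds its expensive handle on first use after (de)serialization."""

        def __init__(self):
            self._fs = None

        @property
        def filesystem(self):
            if self._fs is None:
                self._fs = object()  # stand-in for an expensive filesystem client
            return self._fs

        def __getstate__(self):
            state = self.__dict__.copy()
            state['_fs'] = None  # never pickle the live handle
            return state

    io = LazyHandleIO()
    _ = io.filesystem
    restored = pickle.loads(pickle.dumps(io))
    assert restored.filesystem is not None
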