This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d04ca574e7 [python] use str to replace pathlib.Path to avoid scheme missing in pypaimon (#6604)
d04ca574e7 is described below
commit d04ca574e7016cb5098d535b78919232e28b30ee
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Nov 20 15:19:10 2025 +0800
[python] use str to replace pathlib.Path to avoid scheme missing in pypaimon (#6604)
---
.../org/apache/paimon/flink/BlobTableITCase.java | 3 +
.../pypaimon/catalog/filesystem_catalog.py | 19 +--
.../pypaimon/catalog/rest/rest_catalog.py | 20 +--
.../pypaimon/catalog/rest/rest_token_file_io.py | 6 +-
paimon-python/pypaimon/common/file_io.py | 147 ++++++++++++++-------
paimon-python/pypaimon/common/uri_reader.py | 11 +-
.../pypaimon/manifest/manifest_file_manager.py | 7 +-
.../pypaimon/manifest/manifest_list_manager.py | 7 +-
.../pypaimon/manifest/schema/data_file_meta.py | 11 +-
.../pypaimon/read/reader/format_avro_reader.py | 3 +-
.../pypaimon/read/reader/format_blob_reader.py | 5 +-
.../pypaimon/read/reader/format_pyarrow_reader.py | 3 +-
.../pypaimon/sample/oss_blob_as_descriptor.py | 111 ++++++++++++++++
.../sample/rest_catalog_read_write_sample.py | 108 +++++++++++++++
paimon-python/pypaimon/schema/schema_manager.py | 11 +-
.../pypaimon/snapshot/snapshot_manager.py | 17 +--
paimon-python/pypaimon/table/file_store_table.py | 3 +-
paimon-python/pypaimon/tests/blob_table_test.py | 7 +
paimon-python/pypaimon/tests/blob_test.py | 76 +++++++----
paimon-python/pypaimon/tests/file_io_test.py | 127 ++++++++++++++++++
.../pypaimon/tests/file_store_commit_test.py | 3 +-
.../pypaimon/tests/rest/rest_token_file_io_test.py | 115 ++++++++++++++++
.../pypaimon/tests/uri_reader_factory_test.py | 7 +-
paimon-python/pypaimon/write/file_store_commit.py | 3 +-
paimon-python/pypaimon/write/writer/blob_writer.py | 9 +-
.../pypaimon/write/writer/data_blob_writer.py | 5 +-
paimon-python/pypaimon/write/writer/data_writer.py | 13 +-
27 files changed, 693 insertions(+), 164 deletions(-)
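The motivation is easy to reproduce: pathlib collapses the double slash after a URI scheme, so an object-store URI that round-trips through a Path loses its "oss://" or "s3://" prefix, while plain string concatenation keeps it. A minimal sketch (PurePosixPath is used here only for a platform-independent illustration; bucket and warehouse names are placeholders):

    from pathlib import PurePosixPath

    # Path normalization collapses "//" and mangles the scheme separator.
    print(PurePosixPath("oss://bucket/warehouse") / "db.db" / "t")
    # oss:/bucket/warehouse/db.db/t

    # Plain string joining, as used throughout this change, preserves it.
    warehouse = "oss://bucket/warehouse"
    print(f"{warehouse.rstrip('/')}/db.db/t")
    # oss://bucket/warehouse/db.db/t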
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 40491f180c..73c11c8f3b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.OutputStream;
+import java.net.URI;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
@@ -90,6 +91,8 @@ public class BlobTableITCase extends CatalogITCaseBase {
Blob.fromDescriptor(
uriReaderFactory.create(newBlobDescriptor.uri()),
blobDescriptor);
assertThat(blob.toData()).isEqualTo(blobData);
+ URI blobUri = URI.create(blob.toDescriptor().uri());
+ assertThat(blobUri.getScheme()).isNotNull();
batchSql("ALTER TABLE blob_table_descriptor SET
('blob-as-descriptor'='false')");
assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
.containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 8de2850e4a..63128f185d 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -16,9 +16,7 @@
# limitations under the License.
#################################################################################
-from pathlib import Path
from typing import List, Optional, Union
-from urllib.parse import urlparse
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_environment import CatalogEnvironment
@@ -108,18 +106,13 @@ class FileSystemCatalog(Catalog):
raise TableNotExistException(identifier)
return table_schema
- def get_database_path(self, name) -> Path:
- return self._trim_schema(self.warehouse) / f"{name}{Catalog.DB_SUFFIX}"
+ def get_database_path(self, name) -> str:
+ warehouse = self.warehouse.rstrip('/')
+ return f"{warehouse}/{name}{Catalog.DB_SUFFIX}"
- def get_table_path(self, identifier: Identifier) -> Path:
- return self.get_database_path(identifier.get_database_name()) / identifier.get_table_name()
-
- @staticmethod
- def _trim_schema(warehouse_url: str) -> Path:
- parsed = urlparse(warehouse_url)
- bucket = parsed.netloc
- warehouse_dir = parsed.path.lstrip('/')
- return Path(f"{bucket}/{warehouse_dir}" if warehouse_dir else bucket)
+ def get_table_path(self, identifier: Identifier) -> str:
+ db_path = self.get_database_path(identifier.get_database_name())
+ return f"{db_path}/{identifier.get_table_name()}"
def commit_snapshot(
self,
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 53db2abbaa..8bbe3cab8d 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -15,12 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
-from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
-from urllib.parse import urlparse
-
-import pyarrow
-from packaging.version import parse
from pypaimon.api.api_response import GetTableResponse, PagedList
from pypaimon.api.options import Options
@@ -225,22 +220,17 @@ class RESTCatalog(Catalog):
catalog_loader=self.catalog_loader(),
supports_version_management=True  # REST catalogs support version management
)
- path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
- path = path_parsed.path if path_parsed.scheme is None else schema.options.get(CoreOptions.PATH)
- if path_parsed.scheme == "file":
- table_path = path_parsed.path
- else:
- table_path = path_parsed.netloc + path_parsed.path \
- if parse(pyarrow.__version__) >= parse("7.0.0") else path_parsed.path[1:]
- table = self.create(data_file_io(path),
- Path(table_path),
+ # Use the path from server response directly (do not trim scheme)
+ table_path = schema.options.get(CoreOptions.PATH)
+ table = self.create(data_file_io(table_path),
+ table_path,
schema,
catalog_env)
return table
@staticmethod
def create(file_io: FileIO,
- table_path: Path,
+ table_path: str,
table_schema: TableSchema,
catalog_environment: CatalogEnvironment
) -> FileStoreTable:
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index a65e96695c..8feb617c49 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -18,7 +18,6 @@ limitations under the License.
import logging
import threading
import time
-from pathlib import Path
from typing import Optional
from pyarrow._fs import FileSystem
@@ -46,8 +45,9 @@ class RESTTokenFileIO(FileIO):
self.properties.update(self.token.token)
return super()._initialize_oss_fs(path)
- def new_output_stream(self, path: Path):
- return self.filesystem.open_output_stream(str(path))
+ def new_output_stream(self, path: str):
+ # Call parent class method to ensure path conversion and parent directory creation
+ return super().new_output_stream(path)
def try_to_refresh_token(self):
if self.should_refresh():
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f881ba77bc..15e1d8d8d9 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -150,71 +150,81 @@ class FileIO:
return LocalFileSystem()
- def new_input_stream(self, path: Path):
- return self.filesystem.open_input_file(str(path))
+ def new_input_stream(self, path: str):
+ path_str = self.to_filesystem_path(path)
+ return self.filesystem.open_input_file(path_str)
- def new_output_stream(self, path: Path):
- parent_dir = path.parent
- if str(parent_dir) and not self.exists(parent_dir):
- self.mkdirs(parent_dir)
+ def new_output_stream(self, path: str):
+ path_str = self.to_filesystem_path(path)
+ parent_dir = Path(path_str).parent
+ if str(parent_dir) and not self.exists(str(parent_dir)):
+ self.mkdirs(str(parent_dir))
- return self.filesystem.open_output_stream(str(path))
+ return self.filesystem.open_output_stream(path_str)
- def get_file_status(self, path: Path):
- file_infos = self.filesystem.get_file_info([str(path)])
+ def get_file_status(self, path: str):
+ path_str = self.to_filesystem_path(path)
+ file_infos = self.filesystem.get_file_info([path_str])
return file_infos[0]
- def list_status(self, path: Path):
- selector = pyarrow.fs.FileSelector(str(path), recursive=False, allow_not_found=True)
+ def list_status(self, path: str):
+ path_str = self.to_filesystem_path(path)
+ selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
return self.filesystem.get_file_info(selector)
- def list_directories(self, path: Path):
+ def list_directories(self, path: str):
file_infos = self.list_status(path)
return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
- def exists(self, path: Path) -> bool:
+ def exists(self, path: str) -> bool:
try:
- file_info = self.filesystem.get_file_info([str(path)])[0]
- return file_info.type != pyarrow.fs.FileType.NotFound
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
+ result = file_info.type != pyarrow.fs.FileType.NotFound
+ return result
except Exception:
return False
- def delete(self, path: Path, recursive: bool = False) -> bool:
+ def delete(self, path: str, recursive: bool = False) -> bool:
try:
- file_info = self.filesystem.get_file_info([str(path)])[0]
+ path_str = self.to_filesystem_path(path)
+ file_info = self.filesystem.get_file_info([path_str])[0]
if file_info.type == pyarrow.fs.FileType.Directory:
if recursive:
- self.filesystem.delete_dir_contents(str(path))
+ self.filesystem.delete_dir_contents(path_str)
else:
- self.filesystem.delete_dir(str(path))
+ self.filesystem.delete_dir(path_str)
else:
- self.filesystem.delete_file(str(path))
+ self.filesystem.delete_file(path_str)
return True
except Exception as e:
self.logger.warning(f"Failed to delete {path}: {e}")
return False
- def mkdirs(self, path: Path) -> bool:
+ def mkdirs(self, path: str) -> bool:
try:
- self.filesystem.create_dir(str(path), recursive=True)
+ path_str = self.to_filesystem_path(path)
+ self.filesystem.create_dir(path_str, recursive=True)
return True
except Exception as e:
self.logger.warning(f"Failed to create directory {path}: {e}")
return False
- def rename(self, src: Path, dst: Path) -> bool:
+ def rename(self, src: str, dst: str) -> bool:
try:
- dst_parent = dst.parent
- if str(dst_parent) and not self.exists(dst_parent):
- self.mkdirs(dst_parent)
+ dst_str = self.to_filesystem_path(dst)
+ dst_parent = Path(dst_str).parent
+ if str(dst_parent) and not self.exists(str(dst_parent)):
+ self.mkdirs(str(dst_parent))
- self.filesystem.move(str(src), str(dst))
+ src_str = self.to_filesystem_path(src)
+ self.filesystem.move(src_str, dst_str)
return True
except Exception as e:
self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
return False
- def delete_quietly(self, path: Path):
+ def delete_quietly(self, path: str):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"Ready to delete {path}")
@@ -224,11 +234,11 @@ class FileIO:
except Exception:
self.logger.warning(f"Exception occurs when deleting file {path}",
exc_info=True)
- def delete_files_quietly(self, files: List[Path]):
+ def delete_files_quietly(self, files: List[str]):
for file_path in files:
self.delete_quietly(file_path)
- def delete_directory_quietly(self, directory: Path):
+ def delete_directory_quietly(self, directory: str):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"Ready to delete {directory}")
@@ -238,29 +248,29 @@ class FileIO:
except Exception:
self.logger.warning(f"Exception occurs when deleting directory
{directory}", exc_info=True)
- def get_file_size(self, path: Path) -> int:
+ def get_file_size(self, path: str) -> int:
file_info = self.get_file_status(path)
if file_info.size is None:
raise ValueError(f"File size not available for {path}")
return file_info.size
- def is_dir(self, path: Path) -> bool:
+ def is_dir(self, path: str) -> bool:
file_info = self.get_file_status(path)
return file_info.type == pyarrow.fs.FileType.Directory
- def check_or_mkdirs(self, path: Path):
+ def check_or_mkdirs(self, path: str):
if self.exists(path):
if not self.is_dir(path):
raise ValueError(f"The path '{path}' should be a directory.")
else:
self.mkdirs(path)
- def read_file_utf8(self, path: Path) -> str:
+ def read_file_utf8(self, path: str) -> str:
with self.new_input_stream(path) as input_stream:
return input_stream.read().decode('utf-8')
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- temp_path = path.with_suffix(path.suffix + ".tmp") if path.suffix else
Path(str(path) + ".tmp")
+ def try_to_write_atomic(self, path: str, content: str) -> bool:
+ temp_path = path + ".tmp"
success = False
try:
self.write_file(temp_path, content, False)
@@ -270,29 +280,32 @@ class FileIO:
self.delete_quietly(temp_path)
return success
- def write_file(self, path: Path, content: str, overwrite: bool = False):
+ def write_file(self, path: str, content: str, overwrite: bool = False):
with self.new_output_stream(path) as output_stream:
output_stream.write(content.encode('utf-8'))
- def overwrite_file_utf8(self, path: Path, content: str):
+ def overwrite_file_utf8(self, path: str, content: str):
with self.new_output_stream(path) as output_stream:
output_stream.write(content.encode('utf-8'))
- def copy_file(self, source_path: Path, target_path: Path, overwrite: bool = False):
+ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False):
if not overwrite and self.exists(target_path):
raise FileExistsError(f"Target file {target_path} already exists
and overwrite=False")
- self.filesystem.copy_file(str(source_path), str(target_path))
+ source_str = self.to_filesystem_path(source_path)
+ target_str = self.to_filesystem_path(target_path)
+ self.filesystem.copy_file(source_str, target_str)
- def copy_files(self, source_directory: Path, target_directory: Path, overwrite: bool = False):
+ def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
file_infos = self.list_status(source_directory)
for file_info in file_infos:
if file_info.type == pyarrow.fs.FileType.File:
- source_file = Path(file_info.path)
- target_file = target_directory / source_file.name
+ source_file = file_info.path
+ file_name = source_file.split('/')[-1]
+ target_file = f"{target_directory.rstrip('/')}/{file_name}" if
target_directory else file_name
self.copy_file(source_file, target_file, overwrite)
- def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
+ def read_overwritten_file_utf8(self, path: str) -> Optional[str]:
retry_number = 0
exception = None
while retry_number < 5:
@@ -319,7 +332,7 @@ class FileIO:
return None
- def write_parquet(self, path: Path, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
+ def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
try:
import pyarrow.parquet as pq
@@ -330,7 +343,7 @@ class FileIO:
self.delete_quietly(path)
raise RuntimeError(f"Failed to write Parquet file {path}: {e}")
from e
- def write_orc(self, path: Path, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
+ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
try:
"""Write ORC file using PyArrow ORC writer."""
import sys
@@ -352,7 +365,7 @@ class FileIO:
self.delete_quietly(path)
raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
- def write_avro(self, path: Path, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
+ def write_avro(self, path: str, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
import fastavro
if avro_schema is None:
from pypaimon.schema.data_types import PyarrowFieldParser
@@ -370,7 +383,7 @@ class FileIO:
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)
- def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
+ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
# Validate input constraints
if data.num_columns != 1:
@@ -423,3 +436,41 @@ class FileIO:
except Exception as e:
self.delete_quietly(path)
raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
+
+ def to_filesystem_path(self, path: str) -> str:
+ from pyarrow.fs import S3FileSystem
+ import re
+
+ parsed = urlparse(path)
+ normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''
+
+ if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
+ # This is likely a Windows drive letter, not a URI scheme
+ return str(path)
+
+ if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'):
+ # file://C:/path format - netloc is 'C:', need to reconstruct path with drive letter
+ drive_letter = parsed.netloc.rstrip(':')
+ path_part = normalized_path.lstrip('/')
+ return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:"
+
+ if isinstance(self.filesystem, S3FileSystem):
+ # For S3, return "bucket/path" format
+ if parsed.scheme:
+ if parsed.netloc:
+ # Has netloc (bucket): return "bucket/path" format
+ path_part = normalized_path.lstrip('/')
+ return f"{parsed.netloc}/{path_part}" if path_part else
parsed.netloc
+ else:
+ # Has scheme but no netloc: return path without scheme
+ result = normalized_path.lstrip('/')
+ return result if result else '.'
+ return str(path)
+
+ if parsed.scheme:
+ # Handle empty path: return '.' for current directory
+ if not normalized_path:
+ return '.'
+ return normalized_path
+
+ return str(path)
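The intent of the new to_filesystem_path helper is easiest to see as a few input/output pairs; the sketch below mirrors assertions from the file_io_test.py added later in this patch (bucket and directory names are placeholders):

    from pypaimon.common.file_io import FileIO

    # Object-store filesystem (pyarrow S3FileSystem): the scheme is dropped and
    # the bucket stays as the leading path component.
    oss_io = FileIO("s3://bucket/warehouse", {})
    assert oss_io.to_filesystem_path("oss://my-bucket/path/to/file.txt") == "my-bucket/path/to/file.txt"

    # Local filesystem: a file:// URI collapses to a plain local path, and an
    # already-converted path passes through unchanged (the conversion is idempotent).
    local_io = FileIO("file:///tmp/warehouse", {})
    assert local_io.to_filesystem_path("file:///tmp/path/to/file.txt") == "/tmp/path/to/file.txt"
    assert local_io.to_filesystem_path("/tmp/path/to/file.txt") == "/tmp/path/to/file.txt"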
diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py
index 823020caaf..e55b924d47 100644
--- a/paimon-python/pypaimon/common/uri_reader.py
+++ b/paimon-python/pypaimon/common/uri_reader.py
@@ -18,7 +18,6 @@
import io
from abc import ABC, abstractmethod
-from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse, ParseResult
@@ -42,12 +41,11 @@ class UriReader(ABC):
def get_file_path(cls, uri: str):
parsed_uri = urlparse(uri)
if parsed_uri.scheme == 'file':
- path = Path(parsed_uri.path)
+ return parsed_uri.path
elif parsed_uri.scheme and parsed_uri.scheme != '':
- path = Path(parsed_uri.netloc + parsed_uri.path)
+ return f"{parsed_uri.netloc}{parsed_uri.path}"
else:
- path = Path(uri)
- return path
+ return uri
@abstractmethod
def new_input_stream(self, uri: str):
@@ -61,8 +59,7 @@ class FileUriReader(UriReader):
def new_input_stream(self, uri: str):
try:
- path = self.get_file_path(uri)
- return self._file_io.new_input_stream(path)
+ return self._file_io.new_input_stream(uri)
except Exception as e:
raise IOError(f"Failed to read file {uri}: {e}")
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 567bba1cbd..01c7094f8f 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -38,7 +38,8 @@ class ManifestFileManager:
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
- self.manifest_path = table.table_path / "manifest"
+ manifest_path = table.table_path.rstrip('/')
+ self.manifest_path = f"{manifest_path}/manifest"
self.file_io = table.file_io
self.partition_keys_fields = self.table.partition_keys_fields
self.primary_keys_fields = self.table.primary_keys_fields
@@ -69,7 +70,7 @@ class ManifestFileManager:
return final_entries
def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
- manifest_file_path = self.manifest_path / manifest_file_name
+ manifest_file_path = f"{self.manifest_path}/{manifest_file_name}"
entries = []
with self.file_io.new_input_stream(manifest_file_path) as input_stream:
@@ -188,7 +189,7 @@ class ManifestFileManager:
}
avro_records.append(avro_record)
- manifest_path = self.manifest_path / file_name
+ manifest_path = f"{self.manifest_path}/{file_name}"
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 367f802de5..a1897fbee7 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -36,7 +36,8 @@ class ManifestListManager:
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
- self.manifest_path = self.table.table_path / "manifest"
+ manifest_path = table.table_path.rstrip('/')
+ self.manifest_path = f"{manifest_path}/manifest"
self.file_io = self.table.file_io
def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
@@ -53,7 +54,7 @@ class ManifestListManager:
def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
manifest_files = []
- manifest_list_path = self.manifest_path / manifest_list_name
+ manifest_list_path = f"{self.manifest_path}/{manifest_list_name}"
with self.file_io.new_input_stream(manifest_list_path) as input_stream:
avro_bytes = input_stream.read()
buffer = BytesIO(avro_bytes)
@@ -101,7 +102,7 @@ class ManifestListManager:
}
avro_records.append(avro_record)
- list_path = self.manifest_path / file_name
+ list_path = f"{self.manifest_path}/{file_name}"
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 405c2e3483..d7942d1e9f 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -18,7 +18,6 @@
from dataclasses import dataclass
from datetime import datetime
-from pathlib import Path
from typing import List, Optional
from pypaimon.manifest.schema.simple_stats import (KEY_STATS_SCHEMA,
VALUE_STATS_SCHEMA,
@@ -53,13 +52,13 @@ class DataFileMeta:
# not a schema field, just for internal usage
file_path: str = None
- def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int):
- path_builder = table_path
+ def set_file_path(self, table_path: str, partition: GenericRow, bucket: int):
+ path_builder = table_path.rstrip('/')
partition_dict = partition.to_dict()
for field_name, field_value in partition_dict.items():
- path_builder = path_builder / (field_name + "=" + str(field_value))
- path_builder = path_builder / ("bucket-" + str(bucket)) /
self.file_name
- self.file_path = str(path_builder)
+ path_builder = f"{path_builder}/{field_name}={str(field_value)}"
+ path_builder = f"{path_builder}/bucket-{str(bucket)}/{self.file_name}"
+ self.file_path = path_builder
def copy_without_stats(self) -> 'DataFileMeta':
"""Create a new DataFileMeta without value statistics."""
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index a9ca01a6e5..4114d8e93b 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -36,7 +36,8 @@ class FormatAvroReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
push_down_predicate: Any, batch_size: int = 4096):
- self._file = file_io.filesystem.open_input_file(file_path)
+ file_path_for_io = file_io.to_filesystem_path(file_path)
+ self._file = file_io.filesystem.open_input_file(file_path_for_io)
self._avro_reader = fastavro.reader(self._file)
self._batch_size = batch_size
self._push_down_predicate = push_down_predicate
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index e6846dc568..b993dc9221 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -16,7 +16,6 @@
# limitations under the License.
################################################################################
import struct
-from pathlib import Path
from typing import List, Optional, Any, Iterator
import pyarrow as pa
@@ -42,7 +41,7 @@ class FormatBlobReader(RecordBatchReader):
self._blob_as_descriptor = blob_as_descriptor
# Get file size
- self._file_size = file_io.get_file_size(Path(file_path))
+ self._file_size = file_io.get_file_size(file_path)
# Initialize the low-level blob format reader
self.file_path = file_path
@@ -124,7 +123,7 @@ class FormatBlobReader(RecordBatchReader):
self._blob_iterator = None
def _read_index(self) -> None:
- with self._file_io.new_input_stream(Path(self.file_path)) as f:
+ with self._file_io.new_input_stream(self.file_path) as f:
# Seek to header: last 5 bytes
f.seek(self._file_size - 5)
header = f.read(5)
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index d6c403b0f9..4d2f1f4c0f 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -33,7 +33,8 @@ class FormatPyArrowReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
push_down_predicate: Any, batch_size: int = 4096):
- self.dataset = ds.dataset(file_path, format=file_format, filesystem=file_io.filesystem)
+ file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
+ self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem)
self.reader = self.dataset.scanner(
columns=read_fields,
filter=push_down_predicate,
diff --git a/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py b/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
new file mode 100644
index 0000000000..16696f298a
--- /dev/null
+++ b/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
@@ -0,0 +1,111 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import pyarrow as pa
+
+from pypaimon.catalog.catalog_factory import CatalogFactory
+
+# Enable debug logging for catalog operations
+logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(name)s: %(message)s')
+from pypaimon.schema.schema import Schema
+from pypaimon.table.row.blob import BlobDescriptor, Blob
+
+
+def oss_blob_as_descriptor():
+ warehouse = 'oss://<your-bucket>/<warehouse-path>'
+ catalog = CatalogFactory.create({
+ 'warehouse': warehouse,
+ 'fs.oss.endpoint': 'oss-<your-region>.aliyuncs.com',
+ 'fs.oss.accessKeyId': '<your-ak>',
+ 'fs.oss.accessKeySecret': '<your-sk>',
+ 'fs.oss.region': '<your-region>'
+ })
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'true',
+ 'target-file-size': '100MB'
+ }
+ )
+
+ catalog.create_database("test_db", True)
+ catalog.create_table("test_db.blob_uri_scheme_test", schema, True)
+ table = catalog.get_table("test_db.blob_uri_scheme_test")
+
+ # Create external blob file in OSS
+ external_blob_uri = f"{warehouse.rstrip('/')}/external_blob_scheme_test.bin"
+ blob_content = b'This is external blob data'
+
+ with table.file_io.new_output_stream(external_blob_uri) as out_stream:
+ out_stream.write(blob_content)
+
+ # Create BlobDescriptor with OSS scheme
+ blob_descriptor = BlobDescriptor(external_blob_uri, 0, len(blob_content))
+ descriptor_bytes = blob_descriptor.serialize()
+
+ # Write the descriptor bytes to the table
+ test_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'blob_data': [descriptor_bytes]
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ picture_bytes = result.column('blob_data').to_pylist()[0]
+ new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
+
+ print(f"Original URI: {external_blob_uri}")
+ print(f"Read URI: {new_blob_descriptor.uri}")
+ assert new_blob_descriptor.uri.startswith('oss://'), \
+ f"URI scheme should be preserved. Got: {new_blob_descriptor.uri}"
+
+ from pypaimon.common.uri_reader import FileUriReader
+ uri_reader = FileUriReader(table.file_io)
+ blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
+
+ blob_descriptor_from_blob = blob.to_descriptor()
+ print(f"Blob descriptor URI from Blob.from_descriptor:
{blob_descriptor_from_blob.uri}")
+
+ read_data = blob.to_data()
+ assert read_data == blob_content, "Blob data should match original content"
+
+ print("✅ All assertions passed!")
+
+
+if __name__ == '__main__':
+ oss_blob_as_descriptor()
diff --git a/paimon-python/pypaimon/sample/rest_catalog_read_write_sample.py b/paimon-python/pypaimon/sample/rest_catalog_read_write_sample.py
new file mode 100644
index 0000000000..543e63dd09
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_read_write_sample.py
@@ -0,0 +1,108 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Example: REST Catalog Read and Write
+
+Demonstrates:
+1. REST catalog basic read/write operations
+2. Table paths include URI schemes (file://, oss://, s3://)
+3. Paths with schemes can be used directly with FileIO
+"""
+
+import tempfile
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+
+
+def main():
+ """REST catalog read/write example with path scheme demonstration."""
+
+ # Setup mock REST server
+ temp_dir = tempfile.mkdtemp()
+ token = str(uuid.uuid4())
+ server = RESTCatalogServer(
+ data_path=temp_dir,
+ auth_provider=BearTokenAuthProvider(token),
+ config=ConfigResponse(defaults={"prefix": "mock-test"}),
+ warehouse="warehouse"
+ )
+ server.start()
+ print(f"REST server started at: {server.get_url()}")
+
+ try:
+ # Note: warehouse must match server's warehouse parameter for config endpoint
+ catalog = CatalogFactory.create({
+ 'metastore': 'rest',
+ 'uri': f"http://localhost:{server.port}",
+ 'warehouse': "warehouse", # Must match server's warehouse
parameter
+ 'token.provider': 'bear',
+ 'token': token,
+ })
+ catalog.create_database("default", False)
+
+ schema = Schema.from_pyarrow_schema(pa.schema([
+ ('id', pa.int64()),
+ ('name', pa.string()),
+ ('value', pa.float64()),
+ ]))
+ table_name = 'default.example_table'
+ catalog.create_table(table_name, schema, False)
+ table = catalog.get_table(table_name)
+
+ table_path = table.table_path
+ print(f"\nTable path: {table_path}")
+
+ print("\nWriting data...")
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = pd.DataFrame({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie'],
+ 'value': [10.5, 20.3, 30.7]
+ })
+ table_write.write_pandas(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ print("Data written successfully!")
+
+ # Read data
+ print("\nReading data...")
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+ result = table_read.to_pandas(splits)
+ print(result)
+
+ finally:
+ server.shutdown()
+ print("\nServer stopped")
+
+
+if __name__ == '__main__':
+ main()
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 33c6d4af8b..86f0f00c35 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pathlib import Path
from typing import Optional, List
from pypaimon.common.file_io import FileIO
@@ -26,11 +25,11 @@ from pypaimon.schema.table_schema import TableSchema
class SchemaManager:
- def __init__(self, file_io: FileIO, table_path: Path):
+ def __init__(self, file_io: FileIO, table_path: str):
self.schema_prefix = "schema-"
self.file_io = file_io
self.table_path = table_path
- self.schema_path = table_path / "schema"
+ self.schema_path = f"{table_path.rstrip('/')}/schema"
self.schema_cache = {}
def latest(self) -> Optional['TableSchema']:
@@ -65,8 +64,8 @@ class SchemaManager:
except Exception as e:
raise RuntimeError(f"Failed to commit schema: {e}") from e
- def _to_schema_path(self, schema_id: int) -> Path:
- return self.schema_path / f"{self.schema_prefix}{schema_id}"
+ def _to_schema_path(self, schema_id: int) -> str:
+ return f"{self.schema_path.rstrip('/')}/{self.schema_prefix}{schema_id}"
def get_schema(self, schema_id: int) -> Optional[TableSchema]:
if schema_id not in self.schema_cache:
@@ -87,7 +86,7 @@ class SchemaManager:
versions = []
for status in statuses:
- name = Path(status.path).name
+ name = status.path.split('/')[-1]
if name.startswith(self.schema_prefix):
try:
version = int(name[len(self.schema_prefix):])
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 87b42c6f35..0d96563057 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pathlib import Path
from typing import Optional
from pypaimon.common.file_io import FileIO
@@ -31,8 +30,9 @@ class SnapshotManager:
self.table: FileStoreTable = table
self.file_io: FileIO = self.table.file_io
- self.snapshot_dir = self.table.table_path / "snapshot"
- self.latest_file = self.snapshot_dir / "LATEST"
+ snapshot_path = self.table.table_path.rstrip('/')
+ self.snapshot_dir = f"{snapshot_path}/snapshot"
+ self.latest_file = f"{self.snapshot_dir}/LATEST"
def get_latest_snapshot(self) -> Optional[Snapshot]:
if not self.file_io.exists(self.latest_file):
@@ -41,14 +41,14 @@ class SnapshotManager:
latest_content = self.file_io.read_file_utf8(self.latest_file)
latest_snapshot_id = int(latest_content.strip())
- snapshot_file = self.snapshot_dir / f"snapshot-{latest_snapshot_id}"
+ snapshot_file = f"{self.snapshot_dir}/snapshot-{latest_snapshot_id}"
if not self.file_io.exists(snapshot_file):
return None
snapshot_content = self.file_io.read_file_utf8(snapshot_file)
return JSON.from_json(snapshot_content, Snapshot)
- def get_snapshot_path(self, snapshot_id: int) -> Path:
+ def get_snapshot_path(self, snapshot_id: int) -> str:
"""
Get the path for a snapshot file.
@@ -58,11 +58,12 @@ class SnapshotManager:
Returns:
Path to the snapshot file
"""
- return self.snapshot_dir / f"snapshot-{snapshot_id}"
+ return f"{self.snapshot_dir}/snapshot-{snapshot_id}"
def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
- if self.file_io.exists(self.snapshot_dir / "EARLIEST"):
- earliest_content = self.file_io.read_file_utf8(self.snapshot_dir / "EARLIEST")
+ earliest_file = f"{self.snapshot_dir}/EARLIEST"
+ if self.file_io.exists(earliest_file):
+ earliest_content = self.file_io.read_file_utf8(earliest_file)
earliest_snapshot_id = int(earliest_content.strip())
return self.get_snapshot_by_id(earliest_snapshot_id)
else:
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 6132c9bd69..b8e08b6f52 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -16,7 +16,6 @@
# limitations under the License.
################################################################################
-from pathlib import Path
from typing import Optional
from pypaimon.catalog.catalog_environment import CatalogEnvironment
@@ -37,7 +36,7 @@ from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
class FileStoreTable(Table):
- def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,
+ def __init__(self, file_io: FileIO, identifier: Identifier, table_path: str,
table_schema: TableSchema, catalog_environment: Optional[CatalogEnvironment] = None):
self.file_io = file_io
self.identifier = identifier
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 3a2eee98fb..2b79cf5079 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1059,6 +1059,13 @@ class DataBlobWriterTest(unittest.TestCase):
uri_reader = uri_reader_factory.create(new_blob_descriptor.uri)
blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
+ blob_descriptor_from_blob = blob.to_descriptor()
+ self.assertEqual(
+ blob_descriptor_from_blob.uri,
+ new_blob_descriptor.uri,
+ f"URI should be preserved. Expected: {new_blob_descriptor.uri},
Got: {blob_descriptor_from_blob.uri}"
+ )
+
# Verify the blob data matches the original
self.assertEqual(blob.to_data(), blob_data, "Blob data should match
original")
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index b66c1dec9a..f833ab1646 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -40,16 +40,31 @@ class MockFileIO:
def get_file_size(self, path: str) -> int:
"""Get file size."""
- return self._file_io.get_file_size(Path(path))
+ return self._file_io.get_file_size(path)
- def new_input_stream(self, path: Path):
+ def new_input_stream(self, path):
"""Create new input stream for reading."""
+ if not isinstance(path, (str, type(None))):
+ path = str(path)
return self._file_io.new_input_stream(path)
-class BlobTest(unittest.TestCase):
- """Tests for Blob interface following org.apache.paimon.data.BlobTest."""
+def _to_url(path):
+ """Convert Path to file:// URI string."""
+ if isinstance(path, Path):
+ path_str = str(path)
+ is_absolute = os.path.isabs(path_str) or (len(path_str) >= 2 and path_str[1] == ':')
+ if is_absolute:
+ if path_str.startswith('/'):
+ return f"file://{path_str}"
+ else:
+ return f"file:///{path_str}"
+ else:
+ return f"file://{path_str}"
+ return str(path) if path else path
+
+class BlobTest(unittest.TestCase):
def setUp(self):
"""Set up test environment with temporary file."""
# Create a temporary directory and file
@@ -562,9 +577,11 @@ class BlobEndToEndTest(unittest.TestCase):
blob_data = [test_data[blob_field_name].to_data()]
schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
table = pa.table([blob_data], schema=schema)
- blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob")
- file_io.write_blob(blob_files[blob_field_name], table, False)
- self.assertTrue(file_io.exists(blob_files[blob_field_name]))
+ blob_file_path = Path(self.temp_dir) / (blob_field_name + ".blob")
+ blob_file_url = _to_url(blob_file_path)
+ blob_files[blob_field_name] = blob_file_url
+ file_io.write_blob(blob_file_url, table, False)
+ self.assertTrue(file_io.exists(blob_file_url))
# ========== Step 3: Read Data and Check Data ==========
for field_name, file_path in blob_files.items():
@@ -597,7 +614,6 @@ class BlobEndToEndTest(unittest.TestCase):
from pypaimon.common.file_io import FileIO
from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
from pypaimon.table.row.row_kind import RowKind
- from pathlib import Path
# Set up file I/O
file_io = FileIO(self.temp_dir, {})
@@ -678,10 +694,11 @@ class BlobEndToEndTest(unittest.TestCase):
], schema=multi_column_schema)
multi_column_file = Path(self.temp_dir) / "multi_column.blob"
+ multi_column_url = _to_url(multi_column_file)
# Should throw RuntimeError for multiple columns
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(multi_column_file, multi_column_table, False)
+ file_io.write_blob(multi_column_url, multi_column_table, False)
self.assertIn("single column", str(context.exception))
# Test that FileIO.write_blob rejects null values
@@ -689,10 +706,11 @@ class BlobEndToEndTest(unittest.TestCase):
null_table = pa.table([[b"data", None]], schema=null_schema)
null_file = Path(self.temp_dir) / "null_data.blob"
+ null_file_url = _to_url(null_file)
# Should throw RuntimeError for null values
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(null_file, null_table, False)
+ file_io.write_blob(null_file_url, null_table, False)
self.assertIn("null values", str(context.exception))
# ========== Test FormatBlobReader with complex type schema ==========
@@ -702,7 +720,8 @@ class BlobEndToEndTest(unittest.TestCase):
valid_table = pa.table([valid_blob_data], schema=valid_schema)
valid_blob_file = Path(self.temp_dir) / "valid_blob.blob"
- file_io.write_blob(valid_blob_file, valid_table, False)
+ valid_blob_url = _to_url(valid_blob_file)
+ file_io.write_blob(valid_blob_url, valid_table, False)
# Try to read with complex type field definition - this should fail
# because FormatBlobReader tries to create PyArrow schema with complex types
@@ -737,7 +756,6 @@ class BlobEndToEndTest(unittest.TestCase):
from pypaimon.schema.data_types import DataField, AtomicType
from pypaimon.common.file_io import FileIO
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
- from pathlib import Path
# Set up file I/O
file_io = FileIO(self.temp_dir, {})
@@ -750,7 +768,8 @@ class BlobEndToEndTest(unittest.TestCase):
valid_table = pa.table([valid_blob_data], schema=valid_schema)
header_test_file = Path(self.temp_dir) / "header_test.blob"
- file_io.write_blob(header_test_file, valid_table, False)
+ header_test_url = _to_url(header_test_file)
+ file_io.write_blob(header_test_url, valid_table, False)
# Read the file and corrupt the header (last 5 bytes: index_length + version)
with open(header_test_file, 'rb') as f:
@@ -788,7 +807,8 @@ class BlobEndToEndTest(unittest.TestCase):
large_table = pa.table([large_blob_data], schema=large_schema)
full_blob_file = Path(self.temp_dir) / "full_blob.blob"
- file_io.write_blob(full_blob_file, large_table, False)
+ full_blob_url = _to_url(full_blob_file)
+ file_io.write_blob(full_blob_url, large_table, False)
# Read the full file and truncate it in the middle
with open(full_blob_file, 'rb') as f:
@@ -826,11 +846,12 @@ class BlobEndToEndTest(unittest.TestCase):
zero_table = pa.table([zero_blob_data], schema=zero_schema)
zero_blob_file = Path(self.temp_dir) / "zero_length.blob"
- file_io.write_blob(zero_blob_file, zero_table, False)
+ zero_blob_url = _to_url(zero_blob_file)
+ file_io.write_blob(zero_blob_url, zero_table, False)
# Verify file was created
- self.assertTrue(file_io.exists(zero_blob_file))
- file_size = file_io.get_file_size(zero_blob_file)
+ self.assertTrue(file_io.exists(zero_blob_url))
+ file_size = file_io.get_file_size(zero_blob_url)
self.assertGreater(file_size, 0)  # File should have headers even with empty blob
# Read zero-length blob
@@ -868,10 +889,11 @@ class BlobEndToEndTest(unittest.TestCase):
large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema)
large_sim_file = Path(self.temp_dir) / "large_simulation.blob"
- file_io.write_blob(large_sim_file, large_sim_table, False)
+ large_sim_url = _to_url(large_sim_file)
+ file_io.write_blob(large_sim_url, large_sim_table, False)
# Verify large file was written
- large_sim_size = file_io.get_file_size(large_sim_file)
+ large_sim_size = file_io.get_file_size(large_sim_url)
self.assertGreater(large_sim_size, 10 * 1024 * 1024)  # Should be > 10MB
# Read large blob in memory-safe manner
@@ -937,10 +959,11 @@ class BlobEndToEndTest(unittest.TestCase):
], schema=multi_field_schema)
multi_field_file = Path(self.temp_dir) / "multi_field.blob"
+ multi_field_url = _to_url(multi_field_file)
# Should reject multi-field table
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(multi_field_file, multi_field_table, False)
+ file_io.write_blob(multi_field_url, multi_field_table, False)
self.assertIn("single column", str(context.exception))
# Test that blob format rejects non-binary field types
@@ -948,10 +971,11 @@ class BlobEndToEndTest(unittest.TestCase):
non_binary_table = pa.table([["not_binary_data"]],
schema=non_binary_schema)
non_binary_file = Path(self.temp_dir) / "non_binary.blob"
+ non_binary_url = _to_url(non_binary_file)
# Should reject non-binary field
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(non_binary_file, non_binary_table, False)
+ file_io.write_blob(non_binary_url, non_binary_table, False)
# Should fail due to type conversion issues (non-binary field can't be converted to BLOB)
self.assertTrue(
"large_binary" in str(context.exception) or
@@ -965,10 +989,11 @@ class BlobEndToEndTest(unittest.TestCase):
null_table = pa.table([[b"data", None, b"more_data"]],
schema=null_schema)
null_file = Path(self.temp_dir) / "with_nulls.blob"
+ null_file_url = _to_url(null_file)
# Should reject null values
with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(null_file, null_table, False)
+ file_io.write_blob(null_file_url, null_table, False)
self.assertIn("null values", str(context.exception))
def test_blob_end_to_end_with_descriptor(self):
@@ -1007,10 +1032,11 @@ class BlobEndToEndTest(unittest.TestCase):
# Write the blob file with blob_as_descriptor=True
blob_file_path = Path(self.temp_dir) / "descriptor_blob.blob"
- file_io.write_blob(blob_file_path, table, blob_as_descriptor=True)
+ blob_file_url = _to_url(blob_file_path)
+ file_io.write_blob(blob_file_url, table, blob_as_descriptor=True)
# Verify the blob file was created
- self.assertTrue(file_io.exists(blob_file_path))
- file_size = file_io.get_file_size(blob_file_path)
+ self.assertTrue(file_io.exists(blob_file_url))
+ file_size = file_io.get_file_size(blob_file_url)
self.assertGreater(file_size, 0)
# ========== Step 3: Read data and check ==========
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
new file mode 100644
index 0000000000..1d6776058e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -0,0 +1,127 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+from pathlib import Path
+
+from pyarrow.fs import S3FileSystem, LocalFileSystem
+
+from pypaimon.common.file_io import FileIO
+
+
+class FileIOTest(unittest.TestCase):
+ """Test cases for FileIO.to_filesystem_path method."""
+
+ def test_s3_filesystem_path_conversion(self):
+ """Test S3FileSystem path conversion with various formats."""
+ file_io = FileIO("s3://bucket/warehouse", {})
+ self.assertIsInstance(file_io.filesystem, S3FileSystem)
+
+ # Test bucket and path
+ self.assertEqual(file_io.to_filesystem_path("s3://my-bucket/path/to/file.txt"),
+ "my-bucket/path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("oss://my-bucket/path/to/file.txt"),
+ "my-bucket/path/to/file.txt")
+
+ # Test bucket only
+ self.assertEqual(file_io.to_filesystem_path("s3://my-bucket"),
"my-bucket")
+ self.assertEqual(file_io.to_filesystem_path("oss://my-bucket"),
"my-bucket")
+
+ # Test scheme but no netloc
+ self.assertEqual(file_io.to_filesystem_path("oss:///path/to/file.txt"), "path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("s3:///path/to/file.txt"), "path/to/file.txt")
+
+ # Test empty path
+ self.assertEqual(file_io.to_filesystem_path("oss:///"), ".")
+
+ # Test path without scheme
+ self.assertEqual(file_io.to_filesystem_path("bucket/path/to/file.txt"),
+ "bucket/path/to/file.txt")
+
+ # Test idempotency
+ converted_path = "my-bucket/path/to/file.txt"
+ self.assertEqual(file_io.to_filesystem_path(converted_path), converted_path)
+ parent_str = str(Path(converted_path).parent)
+ self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
+
+ def test_local_filesystem_path_conversion(self):
+ """Test LocalFileSystem path conversion with various formats."""
+ file_io = FileIO("file:///tmp/warehouse", {})
+ self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+
+ # Test file:// scheme
+ self.assertEqual(file_io.to_filesystem_path("file:///tmp/path/to/file.txt"),
+ "/tmp/path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("file:///path/to/file.txt"),
+ "/path/to/file.txt")
+
+ # Test empty paths
+ self.assertEqual(file_io.to_filesystem_path("file://"), ".")
+ self.assertEqual(file_io.to_filesystem_path("file:///"), "/")
+
+ # Test paths without scheme
+ self.assertEqual(file_io.to_filesystem_path("/tmp/path/to/file.txt"),
+ "/tmp/path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("relative/path/to/file.txt"),
+ "relative/path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("./relative/path/to/file.txt"),
+ "./relative/path/to/file.txt")
+
+ # Test idempotency
+ converted_path = "/tmp/path/to/file.txt"
+ self.assertEqual(file_io.to_filesystem_path(converted_path), converted_path)
+ parent_str = str(Path(converted_path).parent)
+ self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
+
+ def test_windows_path_handling(self):
+ """Test Windows path handling (drive letters, file:// scheme)."""
+ file_io = FileIO("file:///tmp/warehouse", {})
+ self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+
+ # Windows absolute paths
+ self.assertEqual(file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
+ "C:\\path\\to\\file.txt")
+ self.assertEqual(file_io.to_filesystem_path("C:/path/to/file.txt"),
+ "C:/path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("C:"), "C:")
+
+ # file:// scheme with Windows drive
+ self.assertEqual(file_io.to_filesystem_path("file://C:/path/to/file.txt"),
+ "C:/path/to/file.txt")
+ self.assertEqual(file_io.to_filesystem_path("file://C:/path"), "C:/path")
+ self.assertEqual(file_io.to_filesystem_path("file://C:"), "C:")
+ self.assertEqual(file_io.to_filesystem_path("file:///C:/path/to/file.txt"),
+ "/C:/path/to/file.txt")
+
+ # Windows path with S3FileSystem (should preserve)
+ s3_file_io = FileIO("s3://bucket/warehouse", {})
+ self.assertEqual(s3_file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
+ "C:\\path\\to\\file.txt")
+
+ def test_path_normalization(self):
+ """Test path normalization (multiple slashes)."""
+ file_io = FileIO("file:///tmp/warehouse", {})
+ self.assertEqual(file_io.to_filesystem_path("file://///tmp///path///file.txt"),
+ "/tmp/path/file.txt")
+
+ s3_file_io = FileIO("s3://bucket/warehouse", {})
+ self.assertEqual(s3_file_io.to_filesystem_path("s3://my-bucket///path///to///file.txt"),
+ "my-bucket/path/to/file.txt")
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
index ab566c3e52..f1fe4de3d2 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -18,7 +18,6 @@
import unittest
from datetime import datetime
-from pathlib import Path
from unittest.mock import Mock, patch
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -41,7 +40,7 @@ class TestFileStoreCommit(unittest.TestCase):
self.mock_table = Mock()
self.mock_table.partition_keys = ['dt', 'region']
self.mock_table.current_branch.return_value = 'main'
- self.mock_table.table_path = Path('/test/table/path')
+ self.mock_table.table_path = '/test/table/path'
self.mock_table.file_io = Mock()
# Mock snapshot commit
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
new file mode 100644
index 0000000000..810e001341
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -0,0 +1,115 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import tempfile
+import unittest
+from unittest.mock import patch
+
+from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+
+
+class RESTTokenFileIOTest(unittest.TestCase):
+ """Test cases for RESTTokenFileIO."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ self.temp_dir = tempfile.mkdtemp(prefix="rest_token_file_io_test_")
+ self.warehouse_path = f"file://{self.temp_dir}"
+ self.identifier = Identifier.from_string("default.test_table")
+ self.catalog_options = {}
+
+ def tearDown(self):
+ """Clean up test fixtures."""
+ import shutil
+ if os.path.exists(self.temp_dir):
+ shutil.rmtree(self.temp_dir)
+
+ def test_new_output_stream_path_conversion_and_parent_creation(self):
+ """Test new_output_stream correctly handles URI paths and creates
parent directories."""
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ self.catalog_options
+ )
+
+ # Test with file:// URI path - should convert and create parent directory
+ test_file_path = f"file://{self.temp_dir}/subdir/test.txt"
+ test_content = b"test content"
+ expected_path = f"{self.temp_dir}/subdir/test.txt"
+
+ with file_io.new_output_stream(test_file_path) as stream:
+ stream.write(test_content)
+
+ self.assertTrue(os.path.exists(expected_path),
+ f"File should be created at {expected_path}")
+ with open(expected_path, 'rb') as f:
+ self.assertEqual(f.read(), test_content)
+
+ # Test nested path - should create multiple parent directories
+ nested_path = f"file://{self.temp_dir}/level1/level2/level3/nested.txt"
+ parent_dir = f"{self.temp_dir}/level1/level2/level3"
+ self.assertFalse(os.path.exists(parent_dir))
+
+ with file_io.new_output_stream(nested_path) as stream:
+ stream.write(b"nested content")
+
+ self.assertTrue(os.path.exists(parent_dir),
+ f"Parent directory should be created at
{parent_dir}")
+ self.assertTrue(os.path.exists(f"{parent_dir}/nested.txt"))
+
+ # Test relative path
+ original_cwd = os.getcwd()
+ try:
+ os.chdir(self.temp_dir)
+ relative_path = "relative_test.txt"
+ with file_io.new_output_stream(relative_path) as stream:
+ stream.write(b"relative content")
+ expected_relative = os.path.join(self.temp_dir, relative_path)
+ self.assertTrue(os.path.exists(expected_relative))
+ finally:
+ os.chdir(original_cwd)
+
+ def test_new_output_stream_behavior_matches_parent(self):
+ """Test that RESTTokenFileIO.new_output_stream behaves like
FileIO.new_output_stream."""
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ rest_file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ self.catalog_options
+ )
+ regular_file_io = FileIO(self.warehouse_path, self.catalog_options)
+
+ test_file_path = f"file://{self.temp_dir}/comparison/test.txt"
+ test_content = b"comparison content"
+
+ with rest_file_io.new_output_stream(test_file_path) as stream:
+ stream.write(test_content)
+
+ expected_path = f"{self.temp_dir}/comparison/test.txt"
+ self.assertTrue(os.path.exists(expected_path))
+
+ with regular_file_io.new_input_stream(test_file_path) as stream:
+ read_content = stream.read()
+ self.assertEqual(read_content, test_content)
+
+
+if __name__ == '__main__':
+ unittest.main()
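
The behavior exercised by this test, converting a file:// URI to a local path and creating missing parent directories before opening the stream, can be sketched as follows. The helper and the /tmp path are hypothetical and only illustrate the contract the test asserts, not the RESTTokenFileIO code itself:

    import os
    from urllib.parse import urlsplit

    def open_local_output_stream(uri: str):
        # Convert a file:// URI to a plain local path; pass other strings through.
        parts = urlsplit(uri)
        local_path = parts.path if parts.scheme == "file" else uri
        # Create any missing parent directories, then open a binary writer.
        os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)
        return open(local_path, "wb")

    with open_local_output_stream("file:///tmp/rest_io_demo/subdir/test.txt") as stream:
        stream.write(b"test content")
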
diff --git a/paimon-python/pypaimon/tests/uri_reader_factory_test.py
b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
index 12088d746c..c111a4612e 100644
--- a/paimon-python/pypaimon/tests/uri_reader_factory_test.py
+++ b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
@@ -18,7 +18,6 @@ limitations under the License.
import os
import tempfile
import unittest
-from pathlib import Path
from pypaimon.common.file_io import FileIO
from pypaimon.common.uri_reader import UriReaderFactory, HttpUriReader, FileUriReader, UriReader
@@ -31,10 +30,12 @@ class MockFileIO:
def get_file_size(self, path: str) -> int:
"""Get file size."""
- return self._file_io.get_file_size(Path(path))
+ return self._file_io.get_file_size(path)
- def new_input_stream(self, path: Path):
+ def new_input_stream(self, path):
"""Create new input stream for reading."""
+ if not isinstance(path, (str, type(None))):
+ path = str(path)
return self._file_io.new_input_stream(path)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 224e4a82c9..ba4cee9e90 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -166,9 +166,10 @@ class FileStoreCommit:
if not all(count == 0 for count in partition_null_counts):
raise RuntimeError("Partition value should not be null")
+ manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
new_manifest_list = ManifestFileMeta(
file_name=new_manifest_file,
- file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file),
+ file_size=self.table.file_io.get_file_size(manifest_file_path),
num_added_files=added_file_count,
num_deleted_files=deleted_file_count,
partition_stats=SimpleStats(
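
For background on why the join above moved from pathlib to an f-string: pathlib normalizes away the double slash that follows a URI scheme, which is exactly the "scheme missing" problem in the commit title. A quick illustration with a made-up manifest path (behavior shown on a POSIX system):

    from pathlib import Path

    manifest_path = "oss://bucket/warehouse/db.db/t/manifest"

    # pathlib collapses the double slash after the scheme, mangling the URI:
    print(str(Path(manifest_path) / "manifest-0.avro"))
    # oss:/bucket/warehouse/db.db/t/manifest/manifest-0.avro

    # Plain string formatting keeps the object-store URI intact:
    print(f"{manifest_path}/manifest-0.avro")
    # oss://bucket/warehouse/db.db/t/manifest/manifest-0.avro
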
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py
b/paimon-python/pypaimon/write/writer/blob_writer.py
index 09bce43a3f..f22577deac 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -19,7 +19,6 @@
import logging
import uuid
import pyarrow as pa
-from pathlib import Path
from typing import Optional, Tuple
from pypaimon.common.core_options import CoreOptions
@@ -46,7 +45,7 @@ class BlobWriter(AppendOnlyDataWriter):
self.blob_target_file_size = CoreOptions.get_blob_target_file_size(options)
self.current_writer: Optional[BlobFileWriter] = None
- self.current_file_path: Optional[Path] = None
+ self.current_file_path: Optional[str] = None
self.record_count = 0
self.file_uuid = str(uuid.uuid4())
@@ -115,7 +114,7 @@ class BlobWriter(AppendOnlyDataWriter):
return
file_size = self.current_writer.close()
- file_name = self.current_file_path.name
+ file_name = self.current_file_path.split('/')[-1]
row_count = self.current_writer.row_count
self._add_file_metadata(file_name, self.current_file_path, row_count, file_size)
@@ -147,7 +146,7 @@ class BlobWriter(AppendOnlyDataWriter):
# Reuse _add_file_metadata for consistency (blob table is append-only, no primary keys)
self._add_file_metadata(file_name, file_path, data, file_size)
- def _add_file_metadata(self, file_name: str, file_path: Path, data_or_row_count, file_size: int):
+ def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count, file_size: int):
"""Add file metadata to committed_files."""
from datetime import datetime
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -204,7 +203,7 @@ class BlobWriter(AppendOnlyDataWriter):
external_path=None,
first_row_id=None,
write_cols=self.write_cols,
- file_path=str(file_path),
+ file_path=file_path,
))
def prepare_commit(self):
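
Since file paths are now plain strings (often full URIs), the file name is taken with a simple split; for example, with a made-up blob URI:

    blob_path = "oss://bucket/warehouse/db.db/t/bucket-0/blob-1234.blob"
    print(blob_path.split('/')[-1])  # blob-1234.blob
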
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 9d2e0982a4..b3bcc9219c 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -19,7 +19,6 @@
import logging
import uuid
from datetime import datetime
-from pathlib import Path
from typing import List, Optional, Tuple
import pyarrow as pa
@@ -262,7 +261,7 @@ class DataBlobWriter(DataWriter):
# Generate metadata
return self._create_data_file_meta(file_name, file_path, data)
- def _create_data_file_meta(self, file_name: str, file_path: Path, data: pa.Table) -> DataFileMeta:
+ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table) -> DataFileMeta:
# Column stats (only for normal columns)
column_stats = {
field.name: self._get_column_stats(data, field.name)
@@ -303,7 +302,7 @@ class DataBlobWriter(DataWriter):
delete_row_count=0,
file_source=0,
value_stats_cols=self.normal_column_names,
- file_path=str(file_path),
+ file_path=file_path,
write_cols=self.write_cols)
def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index 9cbb686441..fdc74d6f64 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -20,7 +20,6 @@ import pyarrow.compute as pc
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
-from pathlib import Path
from typing import Dict, List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
@@ -216,19 +215,21 @@ class DataWriter(ABC):
first_row_id=None,
write_cols=self.write_cols,
# None means all columns in the table have been written
- file_path=str(file_path),
+ file_path=file_path,
))
- def _generate_file_path(self, file_name: str) -> Path:
- path_builder = self.table.table_path
+ def _generate_file_path(self, file_name: str) -> str:
+ path_builder = str(self.table.table_path)
for i, field_name in enumerate(self.table.partition_keys):
- path_builder = path_builder / (field_name + "=" + str(self.partition[i]))
+ path_builder = f"{path_builder.rstrip('/')}/{field_name}={str(self.partition[i])}"
+
if self.bucket == BucketMode.POSTPONE_BUCKET.value:
bucket_name = "postpone"
else:
bucket_name = str(self.bucket)
- path_builder = path_builder / ("bucket-" + bucket_name) / file_name
+
+ path_builder = f"{path_builder.rstrip('/')}/bucket-{bucket_name}/{file_name}"
return path_builder
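
Taken together, the string-based builder yields paths like the following. This is a standalone rendition with made-up values that mirrors the logic above:

    def generate_file_path(table_path: str, partition_keys, partition,
                           bucket_name: str, file_name: str) -> str:
        # Append partition directories (field=value) and the bucket directory,
        # trimming trailing slashes so separators are never doubled.
        path_builder = str(table_path)
        for i, field_name in enumerate(partition_keys):
            path_builder = f"{path_builder.rstrip('/')}/{field_name}={partition[i]}"
        return f"{path_builder.rstrip('/')}/bucket-{bucket_name}/{file_name}"

    print(generate_file_path("s3://bucket/warehouse/db.db/orders",
                             ["dt", "region"], ["20240101", "cn"],
                             "0", "data-0.parquet"))
    # s3://bucket/warehouse/db.db/orders/dt=20240101/region=cn/bucket-0/data-0.parquet
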