This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d04ca574e7 [python] use str to replace pathlib.Path to avoid scheme missing in pypaimon  (#6604)
d04ca574e7 is described below

commit d04ca574e7016cb5098d535b78919232e28b30ee
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Nov 20 15:19:10 2025 +0800

    [python] use str to replace pathlib.Path to avoid scheme missing in pypaimon  (#6604)
---
 .../org/apache/paimon/flink/BlobTableITCase.java   |   3 +
 .../pypaimon/catalog/filesystem_catalog.py         |  19 +--
 .../pypaimon/catalog/rest/rest_catalog.py          |  20 +--
 .../pypaimon/catalog/rest/rest_token_file_io.py    |   6 +-
 paimon-python/pypaimon/common/file_io.py           | 147 ++++++++++++++-------
 paimon-python/pypaimon/common/uri_reader.py        |  11 +-
 .../pypaimon/manifest/manifest_file_manager.py     |   7 +-
 .../pypaimon/manifest/manifest_list_manager.py     |   7 +-
 .../pypaimon/manifest/schema/data_file_meta.py     |  11 +-
 .../pypaimon/read/reader/format_avro_reader.py     |   3 +-
 .../pypaimon/read/reader/format_blob_reader.py     |   5 +-
 .../pypaimon/read/reader/format_pyarrow_reader.py  |   3 +-
 .../pypaimon/sample/oss_blob_as_descriptor.py      | 111 ++++++++++++++++
 .../sample/rest_catalog_read_write_sample.py       | 108 +++++++++++++++
 paimon-python/pypaimon/schema/schema_manager.py    |  11 +-
 .../pypaimon/snapshot/snapshot_manager.py          |  17 +--
 paimon-python/pypaimon/table/file_store_table.py   |   3 +-
 paimon-python/pypaimon/tests/blob_table_test.py    |   7 +
 paimon-python/pypaimon/tests/blob_test.py          |  76 +++++++----
 paimon-python/pypaimon/tests/file_io_test.py       | 127 ++++++++++++++++++
 .../pypaimon/tests/file_store_commit_test.py       |   3 +-
 .../pypaimon/tests/rest/rest_token_file_io_test.py | 115 ++++++++++++++++
 .../pypaimon/tests/uri_reader_factory_test.py      |   7 +-
 paimon-python/pypaimon/write/file_store_commit.py  |   3 +-
 paimon-python/pypaimon/write/writer/blob_writer.py |   9 +-
 .../pypaimon/write/writer/data_blob_writer.py      |   5 +-
 paimon-python/pypaimon/write/writer/data_writer.py |  13 +-
 27 files changed, 693 insertions(+), 164 deletions(-)
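
Context for reviewers: pathlib.Path normalizes "//" to "/", so a scheme-qualified
location such as "oss://bucket/warehouse" is corrupted as soon as it is joined
through Path, and downstream code ends up without the scheme. A minimal sketch of
the failure mode on a POSIX system (the bucket and table names are hypothetical):

    from pathlib import Path

    # Joining through pathlib.Path drops one slash of the scheme separator:
    print(Path("oss://my-bucket/warehouse/db.db") / "my_table")
    # oss:/my-bucket/warehouse/db.db/my_table

    # Keeping the location as a plain str preserves the scheme:
    print("oss://my-bucket/warehouse/db.db" + "/" + "my_table")
    # oss://my-bucket/warehouse/db.db/my_table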

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 40491f180c..73c11c8f3b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.OutputStream;
+import java.net.URI;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
@@ -90,6 +91,8 @@ public class BlobTableITCase extends CatalogITCaseBase {
                 Blob.fromDescriptor(
                         uriReaderFactory.create(newBlobDescriptor.uri()), blobDescriptor);
         assertThat(blob.toData()).isEqualTo(blobData);
+        URI blobUri = URI.create(blob.toDescriptor().uri());
+        assertThat(blobUri.getScheme()).isNotNull();
         batchSql("ALTER TABLE blob_table_descriptor SET 
('blob-as-descriptor'='false')");
         assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
                 .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 8de2850e4a..63128f185d 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -16,9 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-from pathlib import Path
 from typing import List, Optional, Union
-from urllib.parse import urlparse
 
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
@@ -108,18 +106,13 @@ class FileSystemCatalog(Catalog):
             raise TableNotExistException(identifier)
         return table_schema
 
-    def get_database_path(self, name) -> Path:
-        return self._trim_schema(self.warehouse) / f"{name}{Catalog.DB_SUFFIX}"
+    def get_database_path(self, name) -> str:
+        warehouse = self.warehouse.rstrip('/')
+        return f"{warehouse}/{name}{Catalog.DB_SUFFIX}"
 
-    def get_table_path(self, identifier: Identifier) -> Path:
-        return self.get_database_path(identifier.get_database_name()) / identifier.get_table_name()
-
-    @staticmethod
-    def _trim_schema(warehouse_url: str) -> Path:
-        parsed = urlparse(warehouse_url)
-        bucket = parsed.netloc
-        warehouse_dir = parsed.path.lstrip('/')
-        return Path(f"{bucket}/{warehouse_dir}" if warehouse_dir else bucket)
+    def get_table_path(self, identifier: Identifier) -> str:
+        db_path = self.get_database_path(identifier.get_database_name())
+        return f"{db_path}/{identifier.get_table_name()}"
 
     def commit_snapshot(
             self,
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 53db2abbaa..8bbe3cab8d 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -15,12 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-from pathlib import Path
 from typing import Any, Callable, Dict, List, Optional, Union
-from urllib.parse import urlparse
-
-import pyarrow
-from packaging.version import parse
 
 from pypaimon.api.api_response import GetTableResponse, PagedList
 from pypaimon.api.options import Options
@@ -225,22 +220,17 @@ class RESTCatalog(Catalog):
             catalog_loader=self.catalog_loader(),
             supports_version_management=True  # REST catalogs support version management
         )
-        path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
-        path = path_parsed.path if path_parsed.scheme is None else schema.options.get(CoreOptions.PATH)
-        if path_parsed.scheme == "file":
-            table_path = path_parsed.path
-        else:
-            table_path = path_parsed.netloc + path_parsed.path \
-                if parse(pyarrow.__version__) >= parse("7.0.0") else path_parsed.path[1:]
-        table = self.create(data_file_io(path),
-                            Path(table_path),
+        # Use the path from server response directly (do not trim scheme)
+        table_path = schema.options.get(CoreOptions.PATH)
+        table = self.create(data_file_io(table_path),
+                            table_path,
                             schema,
                             catalog_env)
         return table
 
     @staticmethod
     def create(file_io: FileIO,
-               table_path: Path,
+               table_path: str,
                table_schema: TableSchema,
                catalog_environment: CatalogEnvironment
                ) -> FileStoreTable:
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index a65e96695c..8feb617c49 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -18,7 +18,6 @@ limitations under the License.
 import logging
 import threading
 import time
-from pathlib import Path
 from typing import Optional
 
 from pyarrow._fs import FileSystem
@@ -46,8 +45,9 @@ class RESTTokenFileIO(FileIO):
         self.properties.update(self.token.token)
         return super()._initialize_oss_fs(path)
 
-    def new_output_stream(self, path: Path):
-        return self.filesystem.open_output_stream(str(path))
+    def new_output_stream(self, path: str):
+        # Call parent class method to ensure path conversion and parent directory creation
+        return super().new_output_stream(path)
 
     def try_to_refresh_token(self):
         if self.should_refresh():
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f881ba77bc..15e1d8d8d9 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -150,71 +150,81 @@ class FileIO:
 
         return LocalFileSystem()
 
-    def new_input_stream(self, path: Path):
-        return self.filesystem.open_input_file(str(path))
+    def new_input_stream(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        return self.filesystem.open_input_file(path_str)
 
-    def new_output_stream(self, path: Path):
-        parent_dir = path.parent
-        if str(parent_dir) and not self.exists(parent_dir):
-            self.mkdirs(parent_dir)
+    def new_output_stream(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        parent_dir = Path(path_str).parent
+        if str(parent_dir) and not self.exists(str(parent_dir)):
+            self.mkdirs(str(parent_dir))
 
-        return self.filesystem.open_output_stream(str(path))
+        return self.filesystem.open_output_stream(path_str)
 
-    def get_file_status(self, path: Path):
-        file_infos = self.filesystem.get_file_info([str(path)])
+    def get_file_status(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        file_infos = self.filesystem.get_file_info([path_str])
         return file_infos[0]
 
-    def list_status(self, path: Path):
-        selector = pyarrow.fs.FileSelector(str(path), recursive=False, allow_not_found=True)
+    def list_status(self, path: str):
+        path_str = self.to_filesystem_path(path)
+        selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
         return self.filesystem.get_file_info(selector)
 
-    def list_directories(self, path: Path):
+    def list_directories(self, path: str):
         file_infos = self.list_status(path)
         return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
 
-    def exists(self, path: Path) -> bool:
+    def exists(self, path: str) -> bool:
         try:
-            file_info = self.filesystem.get_file_info([str(path)])[0]
-            return file_info.type != pyarrow.fs.FileType.NotFound
+            path_str = self.to_filesystem_path(path)
+            file_info = self.filesystem.get_file_info([path_str])[0]
+            result = file_info.type != pyarrow.fs.FileType.NotFound
+            return result
         except Exception:
             return False
 
-    def delete(self, path: Path, recursive: bool = False) -> bool:
+    def delete(self, path: str, recursive: bool = False) -> bool:
         try:
-            file_info = self.filesystem.get_file_info([str(path)])[0]
+            path_str = self.to_filesystem_path(path)
+            file_info = self.filesystem.get_file_info([path_str])[0]
             if file_info.type == pyarrow.fs.FileType.Directory:
                 if recursive:
-                    self.filesystem.delete_dir_contents(str(path))
+                    self.filesystem.delete_dir_contents(path_str)
                 else:
-                    self.filesystem.delete_dir(str(path))
+                    self.filesystem.delete_dir(path_str)
             else:
-                self.filesystem.delete_file(str(path))
+                self.filesystem.delete_file(path_str)
             return True
         except Exception as e:
             self.logger.warning(f"Failed to delete {path}: {e}")
             return False
 
-    def mkdirs(self, path: Path) -> bool:
+    def mkdirs(self, path: str) -> bool:
         try:
-            self.filesystem.create_dir(str(path), recursive=True)
+            path_str = self.to_filesystem_path(path)
+            self.filesystem.create_dir(path_str, recursive=True)
             return True
         except Exception as e:
             self.logger.warning(f"Failed to create directory {path}: {e}")
             return False
 
-    def rename(self, src: Path, dst: Path) -> bool:
+    def rename(self, src: str, dst: str) -> bool:
         try:
-            dst_parent = dst.parent
-            if str(dst_parent) and not self.exists(dst_parent):
-                self.mkdirs(dst_parent)
+            dst_str = self.to_filesystem_path(dst)
+            dst_parent = Path(dst_str).parent
+            if str(dst_parent) and not self.exists(str(dst_parent)):
+                self.mkdirs(str(dst_parent))
 
-            self.filesystem.move(str(src), str(dst))
+            src_str = self.to_filesystem_path(src)
+            self.filesystem.move(src_str, dst_str)
             return True
         except Exception as e:
             self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
             return False
 
-    def delete_quietly(self, path: Path):
+    def delete_quietly(self, path: str):
         if self.logger.isEnabledFor(logging.DEBUG):
             self.logger.debug(f"Ready to delete {path}")
 
@@ -224,11 +234,11 @@ class FileIO:
         except Exception:
             self.logger.warning(f"Exception occurs when deleting file {path}", 
exc_info=True)
 
-    def delete_files_quietly(self, files: List[Path]):
+    def delete_files_quietly(self, files: List[str]):
         for file_path in files:
             self.delete_quietly(file_path)
 
-    def delete_directory_quietly(self, directory: Path):
+    def delete_directory_quietly(self, directory: str):
         if self.logger.isEnabledFor(logging.DEBUG):
             self.logger.debug(f"Ready to delete {directory}")
 
@@ -238,29 +248,29 @@ class FileIO:
         except Exception:
             self.logger.warning(f"Exception occurs when deleting directory 
{directory}", exc_info=True)
 
-    def get_file_size(self, path: Path) -> int:
+    def get_file_size(self, path: str) -> int:
         file_info = self.get_file_status(path)
         if file_info.size is None:
             raise ValueError(f"File size not available for {path}")
         return file_info.size
 
-    def is_dir(self, path: Path) -> bool:
+    def is_dir(self, path: str) -> bool:
         file_info = self.get_file_status(path)
         return file_info.type == pyarrow.fs.FileType.Directory
 
-    def check_or_mkdirs(self, path: Path):
+    def check_or_mkdirs(self, path: str):
         if self.exists(path):
             if not self.is_dir(path):
                 raise ValueError(f"The path '{path}' should be a directory.")
         else:
             self.mkdirs(path)
 
-    def read_file_utf8(self, path: Path) -> str:
+    def read_file_utf8(self, path: str) -> str:
         with self.new_input_stream(path) as input_stream:
             return input_stream.read().decode('utf-8')
 
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        temp_path = path.with_suffix(path.suffix + ".tmp") if path.suffix else 
Path(str(path) + ".tmp")
+    def try_to_write_atomic(self, path: str, content: str) -> bool:
+        temp_path = path + ".tmp"
         success = False
         try:
             self.write_file(temp_path, content, False)
@@ -270,29 +280,32 @@ class FileIO:
                 self.delete_quietly(temp_path)
             return success
 
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
+    def write_file(self, path: str, content: str, overwrite: bool = False):
         with self.new_output_stream(path) as output_stream:
             output_stream.write(content.encode('utf-8'))
 
-    def overwrite_file_utf8(self, path: Path, content: str):
+    def overwrite_file_utf8(self, path: str, content: str):
         with self.new_output_stream(path) as output_stream:
             output_stream.write(content.encode('utf-8'))
 
-    def copy_file(self, source_path: Path, target_path: Path, overwrite: bool = False):
+    def copy_file(self, source_path: str, target_path: str, overwrite: bool = False):
         if not overwrite and self.exists(target_path):
             raise FileExistsError(f"Target file {target_path} already exists 
and overwrite=False")
 
-        self.filesystem.copy_file(str(source_path), str(target_path))
+        source_str = self.to_filesystem_path(source_path)
+        target_str = self.to_filesystem_path(target_path)
+        self.filesystem.copy_file(source_str, target_str)
 
-    def copy_files(self, source_directory: Path, target_directory: Path, overwrite: bool = False):
+    def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
         file_infos = self.list_status(source_directory)
         for file_info in file_infos:
             if file_info.type == pyarrow.fs.FileType.File:
-                source_file = Path(file_info.path)
-                target_file = target_directory / source_file.name
+                source_file = file_info.path
+                file_name = source_file.split('/')[-1]
+                target_file = f"{target_directory.rstrip('/')}/{file_name}" if 
target_directory else file_name
                 self.copy_file(source_file, target_file, overwrite)
 
-    def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
+    def read_overwritten_file_utf8(self, path: str) -> Optional[str]:
         retry_number = 0
         exception = None
         while retry_number < 5:
@@ -319,7 +332,7 @@ class FileIO:
 
         return None
 
-    def write_parquet(self, path: Path, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
+    def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
 
@@ -330,7 +343,7 @@ class FileIO:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
 
-    def write_orc(self, path: Path, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
         try:
             """Write ORC file using PyArrow ORC writer."""
             import sys
@@ -352,7 +365,7 @@ class FileIO:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
 
-    def write_avro(self, path: Path, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
+    def write_avro(self, path: str, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
         import fastavro
         if avro_schema is None:
             from pypaimon.schema.data_types import PyarrowFieldParser
@@ -370,7 +383,7 @@ class FileIO:
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
 
-    def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
+    def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
         try:
             # Validate input constraints
             if data.num_columns != 1:
@@ -423,3 +436,41 @@ class FileIO:
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
+
+    def to_filesystem_path(self, path: str) -> str:
+        from pyarrow.fs import S3FileSystem
+        import re
+
+        parsed = urlparse(path)
+        normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''
+
+        if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
+            # This is likely a Windows drive letter, not a URI scheme
+            return str(path)
+
+        if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'):
+            # file://C:/path format - netloc is 'C:', need to reconstruct path with drive letter
+            drive_letter = parsed.netloc.rstrip(':')
+            path_part = normalized_path.lstrip('/')
+            return f"{drive_letter}:/{path_part}" if path_part else 
f"{drive_letter}:"
+
+        if isinstance(self.filesystem, S3FileSystem):
+            # For S3, return "bucket/path" format
+            if parsed.scheme:
+                if parsed.netloc:
+                    # Has netloc (bucket): return "bucket/path" format
+                    path_part = normalized_path.lstrip('/')
+                    return f"{parsed.netloc}/{path_part}" if path_part else 
parsed.netloc
+                else:
+                    # Has scheme but no netloc: return path without scheme
+                    result = normalized_path.lstrip('/')
+                    return result if result else '.'
+            return str(path)
+
+        if parsed.scheme:
+            # Handle empty path: return '.' for current directory
+            if not normalized_path:
+                return '.'
+            return normalized_path
+
+        return str(path)
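
(Illustration, not part of the patch.) The new FileIO.to_filesystem_path helper maps
scheme-qualified URIs to whatever the underlying pyarrow filesystem expects; the
expected values below mirror the assertions added in file_io_test.py further down,
with placeholder bucket and directory names:

    from pypaimon.common.file_io import FileIO

    # Object storage: pyarrow's S3FileSystem wants "bucket/key", so the scheme
    # and "://" are stripped while the bucket stays as the first path segment.
    s3_io = FileIO("s3://my-bucket/warehouse", {})
    s3_io.to_filesystem_path("s3://my-bucket/path/to/file.txt")   # -> "my-bucket/path/to/file.txt"

    # Local filesystem: LocalFileSystem wants plain OS paths, so "file://" is dropped.
    local_io = FileIO("file:///tmp/warehouse", {})
    local_io.to_filesystem_path("file:///tmp/path/to/file.txt")   # -> "/tmp/path/to/file.txt"
    local_io.to_filesystem_path("/tmp/path/to/file.txt")          # -> "/tmp/path/to/file.txt" (already converted)
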
diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py
index 823020caaf..e55b924d47 100644
--- a/paimon-python/pypaimon/common/uri_reader.py
+++ b/paimon-python/pypaimon/common/uri_reader.py
@@ -18,7 +18,6 @@
 
 import io
 from abc import ABC, abstractmethod
-from pathlib import Path
 from typing import Any, Optional
 from urllib.parse import urlparse, ParseResult
 
@@ -42,12 +41,11 @@ class UriReader(ABC):
     def get_file_path(cls, uri: str):
         parsed_uri = urlparse(uri)
         if parsed_uri.scheme == 'file':
-            path = Path(parsed_uri.path)
+            return parsed_uri.path
         elif parsed_uri.scheme and parsed_uri.scheme != '':
-            path = Path(parsed_uri.netloc + parsed_uri.path)
+            return f"{parsed_uri.netloc}{parsed_uri.path}"
         else:
-            path = Path(uri)
-        return path
+            return uri
 
     @abstractmethod
     def new_input_stream(self, uri: str):
@@ -61,8 +59,7 @@ class FileUriReader(UriReader):
 
     def new_input_stream(self, uri: str):
         try:
-            path = self.get_file_path(uri)
-            return self._file_io.new_input_stream(path)
+            return self._file_io.new_input_stream(uri)
         except Exception as e:
             raise IOError(f"Failed to read file {uri}: {e}")
 
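(Illustration, not part of the patch.) With the change above, UriReader.get_file_path
returns plain strings instead of pathlib.Path objects; a quick sketch of the mapping
with hypothetical URIs:

    from pypaimon.common.uri_reader import UriReader

    UriReader.get_file_path("file:///tmp/blobs/b1.bin")      # -> "/tmp/blobs/b1.bin"
    UriReader.get_file_path("oss://my-bucket/blobs/b1.bin")  # -> "my-bucket/blobs/b1.bin"
    UriReader.get_file_path("blobs/b1.bin")                  # -> "blobs/b1.bin" (no scheme, returned as-is)
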
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 567bba1cbd..01c7094f8f 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -38,7 +38,8 @@ class ManifestFileManager:
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
-        self.manifest_path = table.table_path / "manifest"
+        manifest_path = table.table_path.rstrip('/')
+        self.manifest_path = f"{manifest_path}/manifest"
         self.file_io = table.file_io
         self.partition_keys_fields = self.table.partition_keys_fields
         self.primary_keys_fields = self.table.primary_keys_fields
@@ -69,7 +70,7 @@ class ManifestFileManager:
         return final_entries
 
     def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
-        manifest_file_path = self.manifest_path / manifest_file_name
+        manifest_file_path = f"{self.manifest_path}/{manifest_file_name}"
 
         entries = []
         with self.file_io.new_input_stream(manifest_file_path) as input_stream:
@@ -188,7 +189,7 @@ class ManifestFileManager:
             }
             avro_records.append(avro_record)
 
-        manifest_path = self.manifest_path / file_name
+        manifest_path = f"{self.manifest_path}/{file_name}"
         try:
             buffer = BytesIO()
             fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 367f802de5..a1897fbee7 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -36,7 +36,8 @@ class ManifestListManager:
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
-        self.manifest_path = self.table.table_path / "manifest"
+        manifest_path = table.table_path.rstrip('/')
+        self.manifest_path = f"{manifest_path}/manifest"
         self.file_io = self.table.file_io
 
     def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
@@ -53,7 +54,7 @@ class ManifestListManager:
     def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
         manifest_files = []
 
-        manifest_list_path = self.manifest_path / manifest_list_name
+        manifest_list_path = f"{self.manifest_path}/{manifest_list_name}"
         with self.file_io.new_input_stream(manifest_list_path) as input_stream:
             avro_bytes = input_stream.read()
         buffer = BytesIO(avro_bytes)
@@ -101,7 +102,7 @@ class ManifestListManager:
             }
             avro_records.append(avro_record)
 
-        list_path = self.manifest_path / file_name
+        list_path = f"{self.manifest_path}/{file_name}"
         try:
             buffer = BytesIO()
             fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 405c2e3483..d7942d1e9f 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -18,7 +18,6 @@
 
 from dataclasses import dataclass
 from datetime import datetime
-from pathlib import Path
 from typing import List, Optional
 
 from pypaimon.manifest.schema.simple_stats import (KEY_STATS_SCHEMA, VALUE_STATS_SCHEMA,
@@ -53,13 +52,13 @@ class DataFileMeta:
     # not a schema field, just for internal usage
     file_path: str = None
 
-    def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int):
-        path_builder = table_path
+    def set_file_path(self, table_path: str, partition: GenericRow, bucket: int):
+        path_builder = table_path.rstrip('/')
         partition_dict = partition.to_dict()
         for field_name, field_value in partition_dict.items():
-            path_builder = path_builder / (field_name + "=" + str(field_value))
-        path_builder = path_builder / ("bucket-" + str(bucket)) / 
self.file_name
-        self.file_path = str(path_builder)
+            path_builder = f"{path_builder}/{field_name}={str(field_value)}"
+        path_builder = f"{path_builder}/bucket-{str(bucket)}/{self.file_name}"
+        self.file_path = path_builder
 
     def copy_without_stats(self) -> 'DataFileMeta':
         """Create a new DataFileMeta without value statistics."""
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index a9ca01a6e5..4114d8e93b 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -36,7 +36,8 @@ class FormatAvroReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
                  push_down_predicate: Any, batch_size: int = 4096):
-        self._file = file_io.filesystem.open_input_file(file_path)
+        file_path_for_io = file_io.to_filesystem_path(file_path)
+        self._file = file_io.filesystem.open_input_file(file_path_for_io)
         self._avro_reader = fastavro.reader(self._file)
         self._batch_size = batch_size
         self._push_down_predicate = push_down_predicate
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index e6846dc568..b993dc9221 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 ################################################################################
 import struct
-from pathlib import Path
 from typing import List, Optional, Any, Iterator
 
 import pyarrow as pa
@@ -42,7 +41,7 @@ class FormatBlobReader(RecordBatchReader):
         self._blob_as_descriptor = blob_as_descriptor
 
         # Get file size
-        self._file_size = file_io.get_file_size(Path(file_path))
+        self._file_size = file_io.get_file_size(file_path)
 
         # Initialize the low-level blob format reader
         self.file_path = file_path
@@ -124,7 +123,7 @@ class FormatBlobReader(RecordBatchReader):
         self._blob_iterator = None
 
     def _read_index(self) -> None:
-        with self._file_io.new_input_stream(Path(self.file_path)) as f:
+        with self._file_io.new_input_stream(self.file_path) as f:
             # Seek to header: last 5 bytes
             f.seek(self._file_size - 5)
             header = f.read(5)
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index d6c403b0f9..4d2f1f4c0f 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -33,7 +33,8 @@ class FormatPyArrowReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
                  push_down_predicate: Any, batch_size: int = 4096):
-        self.dataset = ds.dataset(file_path, format=file_format, filesystem=file_io.filesystem)
+        file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
+        self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem)
         self.reader = self.dataset.scanner(
             columns=read_fields,
             filter=push_down_predicate,
diff --git a/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py b/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
new file mode 100644
index 0000000000..16696f298a
--- /dev/null
+++ b/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
@@ -0,0 +1,111 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import logging
+import pyarrow as pa
+
+from pypaimon.catalog.catalog_factory import CatalogFactory
+
+# Enable debug logging for catalog operations
+logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(name)s: %(message)s')
+from pypaimon.schema.schema import Schema
+from pypaimon.table.row.blob import BlobDescriptor, Blob
+
+
+def oss_blob_as_descriptor():
+    warehouse = 'oss://<your-bucket>/<warehouse-path>'
+    catalog = CatalogFactory.create({
+        'warehouse': warehouse,
+        'fs.oss.endpoint': 'oss-<your-region>.aliyuncs.com',
+        'fs.oss.accessKeyId': '<your-ak>',
+        'fs.oss.accessKeySecret': '<your-sk>',
+        'fs.oss.region': '<your-region>'
+    })
+
+    pa_schema = pa.schema([
+        ('id', pa.int32()),
+        ('blob_data', pa.large_binary()),
+    ])
+
+    schema = Schema.from_pyarrow_schema(
+        pa_schema,
+        options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-as-descriptor': 'true',
+            'target-file-size': '100MB'
+        }
+    )
+
+    catalog.create_database("test_db", True)
+    catalog.create_table("test_db.blob_uri_scheme_test", schema, True)
+    table = catalog.get_table("test_db.blob_uri_scheme_test")
+
+    # Create external blob file in OSS
+    external_blob_uri = f"{warehouse.rstrip('/')}/external_blob_scheme_test.bin"
+    blob_content = b'This is external blob data'
+
+    with table.file_io.new_output_stream(external_blob_uri) as out_stream:
+        out_stream.write(blob_content)
+
+    # Create BlobDescriptor with OSS scheme
+    blob_descriptor = BlobDescriptor(external_blob_uri, 0, len(blob_content))
+    descriptor_bytes = blob_descriptor.serialize()
+
+    # Write the descriptor bytes to the table
+    test_data = pa.Table.from_pydict({
+        'id': [1],
+        'blob_data': [descriptor_bytes]
+    }, schema=pa_schema)
+
+    write_builder = table.new_batch_write_builder()
+    writer = write_builder.new_write()
+    writer.write_arrow(test_data)
+    commit_messages = writer.prepare_commit()
+    commit = write_builder.new_commit()
+    commit.commit(commit_messages)
+    writer.close()
+
+    read_builder = table.new_read_builder()
+    table_scan = read_builder.new_scan()
+    table_read = read_builder.new_read()
+    result = table_read.to_arrow(table_scan.plan().splits())
+
+    picture_bytes = result.column('blob_data').to_pylist()[0]
+    new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
+
+    print(f"Original URI: {external_blob_uri}")
+    print(f"Read URI: {new_blob_descriptor.uri}")
+    assert new_blob_descriptor.uri.startswith('oss://'), \
+        f"URI scheme should be preserved. Got: {new_blob_descriptor.uri}"
+
+    from pypaimon.common.uri_reader import FileUriReader
+    uri_reader = FileUriReader(table.file_io)
+    blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
+
+    blob_descriptor_from_blob = blob.to_descriptor()
+    print(f"Blob descriptor URI from Blob.from_descriptor: 
{blob_descriptor_from_blob.uri}")
+
+    read_data = blob.to_data()
+    assert read_data == blob_content, "Blob data should match original content"
+
+    print("✅ All assertions passed!")
+
+
+if __name__ == '__main__':
+    oss_blob_as_descriptor()
diff --git a/paimon-python/pypaimon/sample/rest_catalog_read_write_sample.py b/paimon-python/pypaimon/sample/rest_catalog_read_write_sample.py
new file mode 100644
index 0000000000..543e63dd09
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_read_write_sample.py
@@ -0,0 +1,108 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+"""
+Example: REST Catalog Read and Write
+
+Demonstrates:
+1. REST catalog basic read/write operations
+2. Table paths include URI schemes (file://, oss://, s3://)
+3. Paths with schemes can be used directly with FileIO
+"""
+
+import tempfile
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+
+
+def main():
+    """REST catalog read/write example with path scheme demonstration."""
+
+    # Setup mock REST server
+    temp_dir = tempfile.mkdtemp()
+    token = str(uuid.uuid4())
+    server = RESTCatalogServer(
+        data_path=temp_dir,
+        auth_provider=BearTokenAuthProvider(token),
+        config=ConfigResponse(defaults={"prefix": "mock-test"}),
+        warehouse="warehouse"
+    )
+    server.start()
+    print(f"REST server started at: {server.get_url()}")
+
+    try:
+        # Note: warehouse must match server's warehouse parameter for config endpoint
+        catalog = CatalogFactory.create({
+            'metastore': 'rest',
+            'uri': f"http://localhost:{server.port}";,
+            'warehouse': "warehouse",  # Must match server's warehouse 
parameter
+            'token.provider': 'bear',
+            'token': token,
+        })
+        catalog.create_database("default", False)
+
+        schema = Schema.from_pyarrow_schema(pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('value', pa.float64()),
+        ]))
+        table_name = 'default.example_table'
+        catalog.create_table(table_name, schema, False)
+        table = catalog.get_table(table_name)
+
+        table_path = table.table_path
+        print(f"\nTable path: {table_path}")
+
+        print("\nWriting data...")
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        data = pd.DataFrame({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'value': [10.5, 20.3, 30.7]
+        })
+        table_write.write_pandas(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+        print("Data written successfully!")
+
+        # Read data
+        print("\nReading data...")
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_pandas(splits)
+        print(result)
+
+    finally:
+        server.shutdown()
+        print("\nServer stopped")
+
+
+if __name__ == '__main__':
+    main()
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 33c6d4af8b..86f0f00c35 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pathlib import Path
 from typing import Optional, List
 
 from pypaimon.common.file_io import FileIO
@@ -26,11 +25,11 @@ from pypaimon.schema.table_schema import TableSchema
 
 class SchemaManager:
 
-    def __init__(self, file_io: FileIO, table_path: Path):
+    def __init__(self, file_io: FileIO, table_path: str):
         self.schema_prefix = "schema-"
         self.file_io = file_io
         self.table_path = table_path
-        self.schema_path = table_path / "schema"
+        self.schema_path = f"{table_path.rstrip('/')}/schema"
         self.schema_cache = {}
 
     def latest(self) -> Optional['TableSchema']:
@@ -65,8 +64,8 @@ class SchemaManager:
         except Exception as e:
             raise RuntimeError(f"Failed to commit schema: {e}") from e
 
-    def _to_schema_path(self, schema_id: int) -> Path:
-        return self.schema_path / f"{self.schema_prefix}{schema_id}"
+    def _to_schema_path(self, schema_id: int) -> str:
+        return f"{self.schema_path.rstrip('/')}/{self.schema_prefix}{schema_id}"
 
     def get_schema(self, schema_id: int) -> Optional[TableSchema]:
         if schema_id not in self.schema_cache:
@@ -87,7 +86,7 @@ class SchemaManager:
 
         versions = []
         for status in statuses:
-            name = Path(status.path).name
+            name = status.path.split('/')[-1]
             if name.startswith(self.schema_prefix):
                 try:
                     version = int(name[len(self.schema_prefix):])
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 87b42c6f35..0d96563057 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pathlib import Path
 from typing import Optional
 
 from pypaimon.common.file_io import FileIO
@@ -31,8 +30,9 @@ class SnapshotManager:
 
         self.table: FileStoreTable = table
         self.file_io: FileIO = self.table.file_io
-        self.snapshot_dir = self.table.table_path / "snapshot"
-        self.latest_file = self.snapshot_dir / "LATEST"
+        snapshot_path = self.table.table_path.rstrip('/')
+        self.snapshot_dir = f"{snapshot_path}/snapshot"
+        self.latest_file = f"{self.snapshot_dir}/LATEST"
 
     def get_latest_snapshot(self) -> Optional[Snapshot]:
         if not self.file_io.exists(self.latest_file):
@@ -41,14 +41,14 @@ class SnapshotManager:
         latest_content = self.file_io.read_file_utf8(self.latest_file)
         latest_snapshot_id = int(latest_content.strip())
 
-        snapshot_file = self.snapshot_dir / f"snapshot-{latest_snapshot_id}"
+        snapshot_file = f"{self.snapshot_dir}/snapshot-{latest_snapshot_id}"
         if not self.file_io.exists(snapshot_file):
             return None
 
         snapshot_content = self.file_io.read_file_utf8(snapshot_file)
         return JSON.from_json(snapshot_content, Snapshot)
 
-    def get_snapshot_path(self, snapshot_id: int) -> Path:
+    def get_snapshot_path(self, snapshot_id: int) -> str:
         """
         Get the path for a snapshot file.
 
@@ -58,11 +58,12 @@ class SnapshotManager:
         Returns:
             Path to the snapshot file
         """
-        return self.snapshot_dir / f"snapshot-{snapshot_id}"
+        return f"{self.snapshot_dir}/snapshot-{snapshot_id}"
 
     def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
-        if self.file_io.exists(self.snapshot_dir / "EARLIEST"):
-            earliest_content = self.file_io.read_file_utf8(self.snapshot_dir / "EARLIEST")
+        earliest_file = f"{self.snapshot_dir}/EARLIEST"
+        if self.file_io.exists(earliest_file):
+            earliest_content = self.file_io.read_file_utf8(earliest_file)
             earliest_snapshot_id = int(earliest_content.strip())
             return self.get_snapshot_by_id(earliest_snapshot_id)
         else:
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 6132c9bd69..b8e08b6f52 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 ################################################################################
 
-from pathlib import Path
 from typing import Optional
 
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
@@ -37,7 +36,7 @@ from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
 
 
 class FileStoreTable(Table):
-    def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,
+    def __init__(self, file_io: FileIO, identifier: Identifier, table_path: str,
                  table_schema: TableSchema, catalog_environment: Optional[CatalogEnvironment] = None):
         self.file_io = file_io
         self.identifier = identifier
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 3a2eee98fb..2b79cf5079 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1059,6 +1059,13 @@ class DataBlobWriterTest(unittest.TestCase):
         uri_reader = uri_reader_factory.create(new_blob_descriptor.uri)
         blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
 
+        blob_descriptor_from_blob = blob.to_descriptor()
+        self.assertEqual(
+            blob_descriptor_from_blob.uri,
+            new_blob_descriptor.uri,
+            f"URI should be preserved. Expected: {new_blob_descriptor.uri}, 
Got: {blob_descriptor_from_blob.uri}"
+        )
+
         # Verify the blob data matches the original
         self.assertEqual(blob.to_data(), blob_data, "Blob data should match 
original")
 
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index b66c1dec9a..f833ab1646 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -40,16 +40,31 @@ class MockFileIO:
 
     def get_file_size(self, path: str) -> int:
         """Get file size."""
-        return self._file_io.get_file_size(Path(path))
+        return self._file_io.get_file_size(path)
 
-    def new_input_stream(self, path: Path):
+    def new_input_stream(self, path):
         """Create new input stream for reading."""
+        if not isinstance(path, (str, type(None))):
+            path = str(path)
         return self._file_io.new_input_stream(path)
 
 
-class BlobTest(unittest.TestCase):
-    """Tests for Blob interface following org.apache.paimon.data.BlobTest."""
+def _to_url(path):
+    """Convert Path to file:// URI string."""
+    if isinstance(path, Path):
+        path_str = str(path)
+        is_absolute = os.path.isabs(path_str) or (len(path_str) >= 2 and path_str[1] == ':')
+        if is_absolute:
+            if path_str.startswith('/'):
+                return f"file://{path_str}"
+            else:
+                return f"file:///{path_str}"
+        else:
+            return f"file://{path_str}"
+    return str(path) if path else path
+
 
+class BlobTest(unittest.TestCase):
     def setUp(self):
         """Set up test environment with temporary file."""
         # Create a temporary directory and file
@@ -562,9 +577,11 @@ class BlobEndToEndTest(unittest.TestCase):
         blob_data = [test_data[blob_field_name].to_data()]
         schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
         table = pa.table([blob_data], schema=schema)
-        blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob")
-        file_io.write_blob(blob_files[blob_field_name], table, False)
-        self.assertTrue(file_io.exists(blob_files[blob_field_name]))
+        blob_file_path = Path(self.temp_dir) / (blob_field_name + ".blob")
+        blob_file_url = _to_url(blob_file_path)
+        blob_files[blob_field_name] = blob_file_url
+        file_io.write_blob(blob_file_url, table, False)
+        self.assertTrue(file_io.exists(blob_file_url))
 
         # ========== Step 3: Read Data and Check Data ==========
         for field_name, file_path in blob_files.items():
@@ -597,7 +614,6 @@ class BlobEndToEndTest(unittest.TestCase):
         from pypaimon.common.file_io import FileIO
         from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
         from pypaimon.table.row.row_kind import RowKind
-        from pathlib import Path
 
         # Set up file I/O
         file_io = FileIO(self.temp_dir, {})
@@ -678,10 +694,11 @@ class BlobEndToEndTest(unittest.TestCase):
         ], schema=multi_column_schema)
 
         multi_column_file = Path(self.temp_dir) / "multi_column.blob"
+        multi_column_url = _to_url(multi_column_file)
 
         # Should throw RuntimeError for multiple columns
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(multi_column_file, multi_column_table, False)
+            file_io.write_blob(multi_column_url, multi_column_table, False)
         self.assertIn("single column", str(context.exception))
 
         # Test that FileIO.write_blob rejects null values
@@ -689,10 +706,11 @@ class BlobEndToEndTest(unittest.TestCase):
         null_table = pa.table([[b"data", None]], schema=null_schema)
 
         null_file = Path(self.temp_dir) / "null_data.blob"
+        null_file_url = _to_url(null_file)
 
         # Should throw RuntimeError for null values
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(null_file, null_table, False)
+            file_io.write_blob(null_file_url, null_table, False)
         self.assertIn("null values", str(context.exception))
 
         # ========== Test FormatBlobReader with complex type schema ==========
@@ -702,7 +720,8 @@ class BlobEndToEndTest(unittest.TestCase):
         valid_table = pa.table([valid_blob_data], schema=valid_schema)
 
         valid_blob_file = Path(self.temp_dir) / "valid_blob.blob"
-        file_io.write_blob(valid_blob_file, valid_table, False)
+        valid_blob_url = _to_url(valid_blob_file)
+        file_io.write_blob(valid_blob_url, valid_table, False)
 
         # Try to read with complex type field definition - this should fail
         # because FormatBlobReader tries to create PyArrow schema with complex types
@@ -737,7 +756,6 @@ class BlobEndToEndTest(unittest.TestCase):
         from pypaimon.schema.data_types import DataField, AtomicType
         from pypaimon.common.file_io import FileIO
         from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
-        from pathlib import Path
 
         # Set up file I/O
         file_io = FileIO(self.temp_dir, {})
@@ -750,7 +768,8 @@ class BlobEndToEndTest(unittest.TestCase):
         valid_table = pa.table([valid_blob_data], schema=valid_schema)
 
         header_test_file = Path(self.temp_dir) / "header_test.blob"
-        file_io.write_blob(header_test_file, valid_table, False)
+        header_test_url = _to_url(header_test_file)
+        file_io.write_blob(header_test_url, valid_table, False)
 
         # Read the file and corrupt the header (last 5 bytes: index_length + version)
         with open(header_test_file, 'rb') as f:
@@ -788,7 +807,8 @@ class BlobEndToEndTest(unittest.TestCase):
         large_table = pa.table([large_blob_data], schema=large_schema)
 
         full_blob_file = Path(self.temp_dir) / "full_blob.blob"
-        file_io.write_blob(full_blob_file, large_table, False)
+        full_blob_url = _to_url(full_blob_file)
+        file_io.write_blob(full_blob_url, large_table, False)
 
         # Read the full file and truncate it in the middle
         with open(full_blob_file, 'rb') as f:
@@ -826,11 +846,12 @@ class BlobEndToEndTest(unittest.TestCase):
         zero_table = pa.table([zero_blob_data], schema=zero_schema)
 
         zero_blob_file = Path(self.temp_dir) / "zero_length.blob"
-        file_io.write_blob(zero_blob_file, zero_table, False)
+        zero_blob_url = _to_url(zero_blob_file)
+        file_io.write_blob(zero_blob_url, zero_table, False)
 
         # Verify file was created
-        self.assertTrue(file_io.exists(zero_blob_file))
-        file_size = file_io.get_file_size(zero_blob_file)
+        self.assertTrue(file_io.exists(zero_blob_url))
+        file_size = file_io.get_file_size(zero_blob_url)
         self.assertGreater(file_size, 0)  # File should have headers even with empty blob
 
         # Read zero-length blob
@@ -868,10 +889,11 @@ class BlobEndToEndTest(unittest.TestCase):
         large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema)
 
         large_sim_file = Path(self.temp_dir) / "large_simulation.blob"
-        file_io.write_blob(large_sim_file, large_sim_table, False)
+        large_sim_url = _to_url(large_sim_file)
+        file_io.write_blob(large_sim_url, large_sim_table, False)
 
         # Verify large file was written
-        large_sim_size = file_io.get_file_size(large_sim_file)
+        large_sim_size = file_io.get_file_size(large_sim_url)
         self.assertGreater(large_sim_size, 10 * 1024 * 1024)  # Should be > 10MB
 
         # Read large blob in memory-safe manner
@@ -937,10 +959,11 @@ class BlobEndToEndTest(unittest.TestCase):
         ], schema=multi_field_schema)
 
         multi_field_file = Path(self.temp_dir) / "multi_field.blob"
+        multi_field_url = _to_url(multi_field_file)
 
         # Should reject multi-field table
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(multi_field_file, multi_field_table, False)
+            file_io.write_blob(multi_field_url, multi_field_table, False)
         self.assertIn("single column", str(context.exception))
 
         # Test that blob format rejects non-binary field types
@@ -948,10 +971,11 @@ class BlobEndToEndTest(unittest.TestCase):
         non_binary_table = pa.table([["not_binary_data"]], 
schema=non_binary_schema)
 
         non_binary_file = Path(self.temp_dir) / "non_binary.blob"
+        non_binary_url = _to_url(non_binary_file)
 
         # Should reject non-binary field
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(non_binary_file, non_binary_table, False)
+            file_io.write_blob(non_binary_url, non_binary_table, False)
         # Should fail due to type conversion issues (non-binary field can't be converted to BLOB)
         self.assertTrue(
             "large_binary" in str(context.exception) or
@@ -965,10 +989,11 @@ class BlobEndToEndTest(unittest.TestCase):
         null_table = pa.table([[b"data", None, b"more_data"]], 
schema=null_schema)
 
         null_file = Path(self.temp_dir) / "with_nulls.blob"
+        null_file_url = _to_url(null_file)
 
         # Should reject null values
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(null_file, null_table, False)
+            file_io.write_blob(null_file_url, null_table, False)
         self.assertIn("null values", str(context.exception))
 
     def test_blob_end_to_end_with_descriptor(self):
@@ -1007,10 +1032,11 @@ class BlobEndToEndTest(unittest.TestCase):
 
         # Write the blob file with blob_as_descriptor=True
         blob_file_path = Path(self.temp_dir) / "descriptor_blob.blob"
-        file_io.write_blob(blob_file_path, table, blob_as_descriptor=True)
+        blob_file_url = _to_url(blob_file_path)
+        file_io.write_blob(blob_file_url, table, blob_as_descriptor=True)
         # Verify the blob file was created
-        self.assertTrue(file_io.exists(blob_file_path))
-        file_size = file_io.get_file_size(blob_file_path)
+        self.assertTrue(file_io.exists(blob_file_url))
+        file_size = file_io.get_file_size(blob_file_url)
         self.assertGreater(file_size, 0)
 
         # ========== Step 3: Read data and check ==========
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
new file mode 100644
index 0000000000..1d6776058e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -0,0 +1,127 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import unittest
+from pathlib import Path
+
+from pyarrow.fs import S3FileSystem, LocalFileSystem
+
+from pypaimon.common.file_io import FileIO
+
+
+class FileIOTest(unittest.TestCase):
+    """Test cases for FileIO.to_filesystem_path method."""
+
+    def test_s3_filesystem_path_conversion(self):
+        """Test S3FileSystem path conversion with various formats."""
+        file_io = FileIO("s3://bucket/warehouse", {})
+        self.assertIsInstance(file_io.filesystem, S3FileSystem)
+
+        # Test bucket and path
+        self.assertEqual(file_io.to_filesystem_path("s3://my-bucket/path/to/file.txt"),
+                         "my-bucket/path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("oss://my-bucket/path/to/file.txt"),
+                         "my-bucket/path/to/file.txt")
+
+        # Test bucket only
+        self.assertEqual(file_io.to_filesystem_path("s3://my-bucket"), 
"my-bucket")
+        self.assertEqual(file_io.to_filesystem_path("oss://my-bucket"), 
"my-bucket")
+
+        # Test scheme but no netloc
+        self.assertEqual(file_io.to_filesystem_path("oss:///path/to/file.txt"), "path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("s3:///path/to/file.txt"), "path/to/file.txt")
+
+        # Test empty path
+        self.assertEqual(file_io.to_filesystem_path("oss:///"), ".")
+
+        # Test path without scheme
+        self.assertEqual(file_io.to_filesystem_path("bucket/path/to/file.txt"),
+                         "bucket/path/to/file.txt")
+
+        # Test idempotency
+        converted_path = "my-bucket/path/to/file.txt"
+        self.assertEqual(file_io.to_filesystem_path(converted_path), converted_path)
+        parent_str = str(Path(converted_path).parent)
+        self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
+
+    def test_local_filesystem_path_conversion(self):
+        """Test LocalFileSystem path conversion with various formats."""
+        file_io = FileIO("file:///tmp/warehouse", {})
+        self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+
+        # Test file:// scheme
+        self.assertEqual(file_io.to_filesystem_path("file:///tmp/path/to/file.txt"),
+                         "/tmp/path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("file:///path/to/file.txt"),
+                         "/path/to/file.txt")
+
+        # Test empty paths
+        self.assertEqual(file_io.to_filesystem_path("file://"), ".")
+        self.assertEqual(file_io.to_filesystem_path("file:///"), "/")
+
+        # Test paths without scheme
+        self.assertEqual(file_io.to_filesystem_path("/tmp/path/to/file.txt"),
+                         "/tmp/path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("relative/path/to/file.txt"),
+                         "relative/path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("./relative/path/to/file.txt"),
+                         "./relative/path/to/file.txt")
+
+        # Test idempotency
+        converted_path = "/tmp/path/to/file.txt"
+        self.assertEqual(file_io.to_filesystem_path(converted_path), converted_path)
+        parent_str = str(Path(converted_path).parent)
+        self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
+
+    def test_windows_path_handling(self):
+        """Test Windows path handling (drive letters, file:// scheme)."""
+        file_io = FileIO("file:///tmp/warehouse", {})
+        self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+
+        # Windows absolute paths
+        self.assertEqual(file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
+                         "C:\\path\\to\\file.txt")
+        self.assertEqual(file_io.to_filesystem_path("C:/path/to/file.txt"),
+                         "C:/path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("C:"), "C:")
+
+        # file:// scheme with Windows drive
+        self.assertEqual(file_io.to_filesystem_path("file://C:/path/to/file.txt"),
+                         "C:/path/to/file.txt")
+        self.assertEqual(file_io.to_filesystem_path("file://C:/path"), "C:/path")
+        self.assertEqual(file_io.to_filesystem_path("file://C:"), "C:")
+        self.assertEqual(file_io.to_filesystem_path("file:///C:/path/to/file.txt"),
+                         "/C:/path/to/file.txt")
+
+        # Windows path with S3FileSystem (should preserve)
+        s3_file_io = FileIO("s3://bucket/warehouse", {})
+        self.assertEqual(s3_file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
+                         "C:\\path\\to\\file.txt")
+
+    def test_path_normalization(self):
+        """Test path normalization (multiple slashes)."""
+        file_io = FileIO("file:///tmp/warehouse", {})
+        self.assertEqual(file_io.to_filesystem_path("file://///tmp///path///file.txt"),
+                         "/tmp/path/file.txt")
+
+        s3_file_io = FileIO("s3://bucket/warehouse", {})
+        self.assertEqual(s3_file_io.to_filesystem_path("s3://my-bucket///path///to///file.txt"),
+                         "my-bucket/path/to/file.txt")
+
+
+if __name__ == '__main__':
+    unittest.main()
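
    The new tests above pin down the conversion contract rather than its implementation. A
    simplified sketch of a scheme-stripping conversion that satisfies the non-Windows cases
    (the shipped FileIO.to_filesystem_path in paimon-python/pypaimon/common/file_io.py
    additionally special-cases bare drive paths such as C:\path and may differ in detail) is:

        import posixpath
        from urllib.parse import urlparse

        def to_filesystem_path_sketch(path: str, is_object_store: bool) -> str:
            # Illustrative only; not the committed implementation.
            parsed = urlparse(path)
            if not parsed.scheme:
                return path  # already a plain filesystem path; keep it unchanged (idempotent)
            combined = f"{parsed.netloc}{parsed.path}"
            if is_object_store:
                combined = combined.lstrip("/")  # s3://bucket/key -> bucket/key
            return posixpath.normpath(combined) if combined else "."

    For example, to_filesystem_path_sketch("oss://my-bucket/path/to/file.txt", True) returns
    "my-bucket/path/to/file.txt", matching the S3FileSystem expectation exercised above.
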
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
index ab566c3e52..f1fe4de3d2 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -18,7 +18,6 @@
 
 import unittest
 from datetime import datetime
-from pathlib import Path
 from unittest.mock import Mock, patch
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -41,7 +40,7 @@ class TestFileStoreCommit(unittest.TestCase):
         self.mock_table = Mock()
         self.mock_table.partition_keys = ['dt', 'region']
         self.mock_table.current_branch.return_value = 'main'
-        self.mock_table.table_path = Path('/test/table/path')
+        self.mock_table.table_path = '/test/table/path'
         self.mock_table.file_io = Mock()
 
         # Mock snapshot commit
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
new file mode 100644
index 0000000000..810e001341
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -0,0 +1,115 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import os
+import tempfile
+import unittest
+from unittest.mock import patch
+
+from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+
+
+class RESTTokenFileIOTest(unittest.TestCase):
+    """Test cases for RESTTokenFileIO."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        self.temp_dir = tempfile.mkdtemp(prefix="rest_token_file_io_test_")
+        self.warehouse_path = f"file://{self.temp_dir}"
+        self.identifier = Identifier.from_string("default.test_table")
+        self.catalog_options = {}
+
+    def tearDown(self):
+        """Clean up test fixtures."""
+        import shutil
+        if os.path.exists(self.temp_dir):
+            shutil.rmtree(self.temp_dir)
+
+    def test_new_output_stream_path_conversion_and_parent_creation(self):
+        """Test new_output_stream correctly handles URI paths and creates 
parent directories."""
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                self.catalog_options
+            )
+
+            # Test with file:// URI path - should convert and create parent directory
+            test_file_path = f"file://{self.temp_dir}/subdir/test.txt"
+            test_content = b"test content"
+            expected_path = f"{self.temp_dir}/subdir/test.txt"
+
+            with file_io.new_output_stream(test_file_path) as stream:
+                stream.write(test_content)
+
+            self.assertTrue(os.path.exists(expected_path),
+                            f"File should be created at {expected_path}")
+            with open(expected_path, 'rb') as f:
+                self.assertEqual(f.read(), test_content)
+
+            # Test nested path - should create multiple parent directories
+            nested_path = f"file://{self.temp_dir}/level1/level2/level3/nested.txt"
+            parent_dir = f"{self.temp_dir}/level1/level2/level3"
+            self.assertFalse(os.path.exists(parent_dir))
+
+            with file_io.new_output_stream(nested_path) as stream:
+                stream.write(b"nested content")
+
+            self.assertTrue(os.path.exists(parent_dir),
+                            f"Parent directory should be created at {parent_dir}")
+            self.assertTrue(os.path.exists(f"{parent_dir}/nested.txt"))
+
+            # Test relative path
+            original_cwd = os.getcwd()
+            try:
+                os.chdir(self.temp_dir)
+                relative_path = "relative_test.txt"
+                with file_io.new_output_stream(relative_path) as stream:
+                    stream.write(b"relative content")
+                expected_relative = os.path.join(self.temp_dir, relative_path)
+                self.assertTrue(os.path.exists(expected_relative))
+            finally:
+                os.chdir(original_cwd)
+
+    def test_new_output_stream_behavior_matches_parent(self):
+        """Test that RESTTokenFileIO.new_output_stream behaves like 
FileIO.new_output_stream."""
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            rest_file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                self.catalog_options
+            )
+            regular_file_io = FileIO(self.warehouse_path, self.catalog_options)
+
+            test_file_path = f"file://{self.temp_dir}/comparison/test.txt"
+            test_content = b"comparison content"
+
+            with rest_file_io.new_output_stream(test_file_path) as stream:
+                stream.write(test_content)
+
+            expected_path = f"{self.temp_dir}/comparison/test.txt"
+            self.assertTrue(os.path.exists(expected_path))
+
+            with regular_file_io.new_input_stream(test_file_path) as stream:
+                read_content = stream.read()
+                self.assertEqual(read_content, test_content)
+
+
+if __name__ == '__main__':
+    unittest.main()
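
    The parent-directory behaviour these tests rely on boils down to a small pattern. A
    self-contained sketch of it against pyarrow's filesystem API (using a plain
    LocalFileSystem and a made-up path, whereas RESTTokenFileIO resolves its filesystem from
    the REST token) is:

        import posixpath
        from pyarrow.fs import LocalFileSystem

        def open_output_stream_sketch(fs, fs_path: str):
            # Ensure the parent directory exists before opening the stream,
            # so writes into not-yet-existing sub-directories succeed.
            parent = posixpath.dirname(fs_path)
            if parent:
                fs.create_dir(parent, recursive=True)
            return fs.open_output_stream(fs_path)

        with open_output_stream_sketch(LocalFileSystem(), "/tmp/rest_demo/sub/test.txt") as out:
            out.write(b"test content")

    The sketch assumes the file:// URI has already been converted to a plain filesystem path,
    which is exactly what the tests above verify.
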
diff --git a/paimon-python/pypaimon/tests/uri_reader_factory_test.py b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
index 12088d746c..c111a4612e 100644
--- a/paimon-python/pypaimon/tests/uri_reader_factory_test.py
+++ b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
@@ -18,7 +18,6 @@ limitations under the License.
 import os
 import tempfile
 import unittest
-from pathlib import Path
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.uri_reader import UriReaderFactory, HttpUriReader, FileUriReader, UriReader
 
@@ -31,10 +30,12 @@ class MockFileIO:
 
     def get_file_size(self, path: str) -> int:
         """Get file size."""
-        return self._file_io.get_file_size(Path(path))
+        return self._file_io.get_file_size(path)
 
-    def new_input_stream(self, path: Path):
+    def new_input_stream(self, path):
         """Create new input stream for reading."""
+        if not isinstance(path, (str, type(None))):
+            path = str(path)
         return self._file_io.new_input_stream(path)
 
 
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 224e4a82c9..ba4cee9e90 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -166,9 +166,10 @@ class FileStoreCommit:
         if not all(count == 0 for count in partition_null_counts):
             raise RuntimeError("Partition value should not be null")
 
+        manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
         new_manifest_list = ManifestFileMeta(
             file_name=new_manifest_file,
-            file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file),
+            file_size=self.table.file_io.get_file_size(manifest_file_path),
             num_added_files=added_file_count,
             num_deleted_files=deleted_file_count,
             partition_stats=SimpleStats(
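
    The f-string is used instead of pathlib joining because pathlib collapses the double slash
    after a URI scheme. A quick illustration with a made-up OSS manifest path:

        from pathlib import PurePosixPath

        manifest_path = "oss://bucket/warehouse/db.db/tbl/manifest"   # made-up example
        file_name = "manifest-0-0"

        print(str(PurePosixPath(manifest_path) / file_name))
        # -> oss:/bucket/warehouse/db.db/tbl/manifest/manifest-0-0   (scheme broken)

        print(f"{manifest_path}/{file_name}")
        # -> oss://bucket/warehouse/db.db/tbl/manifest/manifest-0-0  (scheme preserved)
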
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index 09bce43a3f..f22577deac 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -19,7 +19,6 @@
 import logging
 import uuid
 import pyarrow as pa
-from pathlib import Path
 from typing import Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
@@ -46,7 +45,7 @@ class BlobWriter(AppendOnlyDataWriter):
         self.blob_target_file_size = CoreOptions.get_blob_target_file_size(options)
 
         self.current_writer: Optional[BlobFileWriter] = None
-        self.current_file_path: Optional[Path] = None
+        self.current_file_path: Optional[str] = None
         self.record_count = 0
 
         self.file_uuid = str(uuid.uuid4())
@@ -115,7 +114,7 @@ class BlobWriter(AppendOnlyDataWriter):
             return
 
         file_size = self.current_writer.close()
-        file_name = self.current_file_path.name
+        file_name = self.current_file_path.split('/')[-1]
         row_count = self.current_writer.row_count
 
         self._add_file_metadata(file_name, self.current_file_path, row_count, file_size)
@@ -147,7 +146,7 @@ class BlobWriter(AppendOnlyDataWriter):
         # Reuse _add_file_metadata for consistency (blob table is append-only, no primary keys)
         self._add_file_metadata(file_name, file_path, data, file_size)
 
-    def _add_file_metadata(self, file_name: str, file_path: Path, data_or_row_count, file_size: int):
+    def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count, file_size: int):
         """Add file metadata to committed_files."""
         from datetime import datetime
         from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -204,7 +203,7 @@ class BlobWriter(AppendOnlyDataWriter):
             external_path=None,
             first_row_id=None,
             write_cols=self.write_cols,
-            file_path=str(file_path),
+            file_path=file_path,
         ))
 
     def prepare_commit(self):
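
    Since current_file_path is now a plain string, the file name is taken with split('/')
    instead of Path.name. For the slash-separated paths the writer produces, the two are
    equivalent (made-up example path):

        blob_path = "oss://bucket/warehouse/db.db/tbl/bucket-0/blob-0.blob"
        assert blob_path.split('/')[-1] == "blob-0.blob"
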
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 9d2e0982a4..b3bcc9219c 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -19,7 +19,6 @@
 import logging
 import uuid
 from datetime import datetime
-from pathlib import Path
 from typing import List, Optional, Tuple
 
 import pyarrow as pa
@@ -262,7 +261,7 @@ class DataBlobWriter(DataWriter):
         # Generate metadata
         return self._create_data_file_meta(file_name, file_path, data)
 
-    def _create_data_file_meta(self, file_name: str, file_path: Path, data: pa.Table) -> DataFileMeta:
+    def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table) -> DataFileMeta:
         # Column stats (only for normal columns)
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
@@ -303,7 +302,7 @@ class DataBlobWriter(DataWriter):
             delete_row_count=0,
             file_source=0,
             value_stats_cols=self.normal_column_names,
-            file_path=str(file_path),
+            file_path=file_path,
             write_cols=self.write_cols)
 
     def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 9cbb686441..fdc74d6f64 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -20,7 +20,6 @@ import pyarrow.compute as pc
 import uuid
 from abc import ABC, abstractmethod
 from datetime import datetime
-from pathlib import Path
 from typing import Dict, List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
@@ -216,19 +215,21 @@ class DataWriter(ABC):
             first_row_id=None,
             write_cols=self.write_cols,
             # None means all columns in the table have been written
-            file_path=str(file_path),
+            file_path=file_path,
         ))
 
-    def _generate_file_path(self, file_name: str) -> Path:
-        path_builder = self.table.table_path
+    def _generate_file_path(self, file_name: str) -> str:
+        path_builder = str(self.table.table_path)
 
         for i, field_name in enumerate(self.table.partition_keys):
-            path_builder = path_builder / (field_name + "=" + str(self.partition[i]))
+            path_builder = f"{path_builder.rstrip('/')}/{field_name}={str(self.partition[i])}"
+
         if self.bucket == BucketMode.POSTPONE_BUCKET.value:
             bucket_name = "postpone"
         else:
             bucket_name = str(self.bucket)
-        path_builder = path_builder / ("bucket-" + bucket_name) / file_name
+
+        path_builder = f"{path_builder.rstrip('/')}/bucket-{bucket_name}/{file_name}"
 
         return path_builder
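
    For reference, a standalone rendition of the string-based builder above (omitting the
    postpone-bucket branch) with made-up table and partition values shows the URI-preserving
    layout it produces:

        def generate_file_path_sketch(table_path, partition_keys, partition, bucket, file_name):
            # Mirrors _generate_file_path: table path, then key=value partition dirs, then bucket dir.
            builder = str(table_path)
            for key, value in zip(partition_keys, partition):
                builder = f"{builder.rstrip('/')}/{key}={value}"
            return f"{builder.rstrip('/')}/bucket-{bucket}/{file_name}"

        print(generate_file_path_sketch("oss://bucket/warehouse/db.db/tbl",
                                        ["dt", "region"], ["2025-11-20", "cn"],
                                        0, "data-0.parquet"))
        # -> oss://bucket/warehouse/db.db/tbl/dt=2025-11-20/region=cn/bucket-0/data-0.parquet
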
 
