This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aa702b246 [python] refactor file_io to support polymorphism (#7040)
3aa702b246 is described below

commit 3aa702b246f7df8ca2c67a337362f2ad0b96d39d
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jan 15 21:10:58 2026 +0800

    [python] refactor file_io to support polymorphism (#7040)
---
 .../pypaimon/catalog/filesystem_catalog.py         |   2 +-
 .../pypaimon/catalog/rest/rest_catalog.py          |   2 +-
 .../pypaimon/catalog/rest/rest_token_file_io.py    | 147 +++++-
 paimon-python/pypaimon/common/file_io.py           | 545 ++++-----------------
 paimon-python/pypaimon/common/uri_reader.py        |   2 +-
 paimon-python/pypaimon/filesystem/local_file_io.py | 439 +++++++++++++++++
 .../file_io.py => filesystem/pyarrow_file_io.py}   | 103 +---
 paimon-python/pypaimon/read/reader/lance_utils.py  |   3 +
 .../rest_catalog_blob_as_descriptor_sample.py      |   4 +-
 paimon-python/pypaimon/table/row/blob.py           |   2 +-
 paimon-python/pypaimon/tests/blob_test.py          |  15 +-
 paimon-python/pypaimon/tests/file_io_test.py       | 129 ++---
 .../pypaimon/tests/filesystem_catalog_test.py      |  17 +-
 paimon-python/pypaimon/tests/lance_utils_test.py   |   6 +-
 .../pypaimon/tests/py36/ao_simple_test.py          |   8 +-
 paimon-python/pypaimon/tests/rest/rest_server.py   |   2 +-
 .../pypaimon/tests/rest/rest_token_file_io_test.py |  86 +++-
 17 files changed, 847 insertions(+), 665 deletions(-)

diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py 
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 8d1b485b74..65a4538966 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -46,7 +46,7 @@ class FileSystemCatalog(Catalog):
             raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path 
must be set")
         self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
         self.catalog_options = catalog_options
-        self.file_io = FileIO(self.warehouse, self.catalog_options)
+        self.file_io = FileIO.get(self.warehouse, self.catalog_options)
 
     def get_database(self, name: str) -> Database:
         if self.file_io.exists(self.get_database_path(name)):
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 5d9462f6c3..41a3061fb9 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -252,7 +252,7 @@ class RESTCatalog(Catalog):
         )
 
     def file_io_from_options(self, table_path: str) -> FileIO:
-        return FileIO(table_path, self.context.options)
+        return FileIO.get(table_path, self.context.options)
 
     def file_io_for_data(self, table_path: str, identifier: Identifier):
         return RESTTokenFileIO(identifier, table_path, self.context.options) \
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py 
b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 2cec5df721..f686dc66ea 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -20,70 +20,164 @@ import threading
 import time
 from typing import Optional
 
-from pyarrow._fs import FileSystem
+from cachetools import TTLCache
 
 from pypaimon.api.rest_api import RESTApi
 from pypaimon.api.rest_util import RESTUtil
 from pypaimon.catalog.rest.rest_token import RESTToken
 from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import CatalogOptions, OssOptions
+from pypaimon.common.uri_reader import UriReaderFactory
 
 
 class RESTTokenFileIO(FileIO):
-
+    """
+    A FileIO to support getting token from REST Server.
+    """
+    
+    _FILE_IO_CACHE_MAXSIZE = 1000
+    _FILE_IO_CACHE_TTL = 36000  # 10 hours in seconds
+    
     def __init__(self, identifier: Identifier, path: str,
                  catalog_options: Optional[Options] = None):
         self.identifier = identifier
         self.path = path
+        self.catalog_options = catalog_options
+        self.properties = catalog_options or Options({})  # For compatibility 
with refresh_token()
         self.token: Optional[RESTToken] = None
         self.api_instance: Optional[RESTApi] = None
         self.lock = threading.Lock()
         self.log = logging.getLogger(__name__)
-        super().__init__(path, catalog_options)
+        self._uri_reader_factory_cache: Optional[UriReaderFactory] = None
+        self._file_io_cache: TTLCache = TTLCache(
+            maxsize=self._FILE_IO_CACHE_MAXSIZE,
+            ttl=self._FILE_IO_CACHE_TTL
+        )
 
     def __getstate__(self):
         state = self.__dict__.copy()
         # Remove non-serializable objects
         state.pop('lock', None)
         state.pop('api_instance', None)
+        state.pop('_file_io_cache', None)
+        state.pop('_uri_reader_factory_cache', None)
         # token can be serialized, but we'll refresh it on deserialization
         return state
 
     def __setstate__(self, state):
         self.__dict__.update(state)
-        # Recreate lock after deserialization
+        # Recreate lock and cache after deserialization
         self.lock = threading.Lock()
+        self._file_io_cache = TTLCache(
+            maxsize=self._FILE_IO_CACHE_MAXSIZE,
+            ttl=self._FILE_IO_CACHE_TTL
+        )
+        self._uri_reader_factory_cache = None
         # api_instance will be recreated when needed
         self.api_instance = None
 
-    def _initialize_oss_fs(self, path) -> FileSystem:
+    def file_io(self) -> FileIO:
         self.try_to_refresh_token()
-        merged_token = self._merge_token_with_catalog_options(self.token.token)
-        merged_properties = RESTUtil.merge(
-            self.properties.to_map() if self.properties else {},
-            merged_token
-        )
-        merged_options = Options(merged_properties)
-        original_properties = self.properties
-        self.properties = merged_options
-        try:
-            return super()._initialize_oss_fs(path)
-        finally:
-            self.properties = original_properties
+
+        if self.token is None:
+            return FileIO.get(self.path, self.catalog_options or Options({}))
+        
+        cache_key = self.token
+        
+        file_io = self._file_io_cache.get(cache_key)
+        if file_io is not None:
+            return file_io
+        
+        with self.lock:
+            file_io = self._file_io_cache.get(cache_key)
+            if file_io is not None:
+                return file_io
+            
+            merged_token = 
self._merge_token_with_catalog_options(self.token.token)
+            merged_properties = RESTUtil.merge(
+                self.catalog_options.to_map() if self.catalog_options else {},
+                merged_token
+            )
+            merged_options = Options(merged_properties)
+            
+            file_io = PyArrowFileIO(self.path, merged_options)
+            self._file_io_cache[cache_key] = file_io
+            return file_io
 
     def _merge_token_with_catalog_options(self, token: dict) -> dict:
         """Merge token with catalog options, DLF OSS endpoint should override 
the standard OSS endpoint."""
         merged_token = dict(token)
-        dlf_oss_endpoint = self.properties.get(CatalogOptions.DLF_OSS_ENDPOINT)
-        if dlf_oss_endpoint and dlf_oss_endpoint.strip():
-            merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint
+        if self.catalog_options:
+            dlf_oss_endpoint = 
self.catalog_options.get(CatalogOptions.DLF_OSS_ENDPOINT)
+            if dlf_oss_endpoint and dlf_oss_endpoint.strip():
+                merged_token[OssOptions.OSS_ENDPOINT.key()] = dlf_oss_endpoint
         return merged_token
 
+    def new_input_stream(self, path: str):
+        return self.file_io().new_input_stream(path)
+
     def new_output_stream(self, path: str):
-        # Call parent class method to ensure path conversion and parent 
directory creation
-        return super().new_output_stream(path)
+        return self.file_io().new_output_stream(path)
+
+    def get_file_status(self, path: str):
+        return self.file_io().get_file_status(path)
+
+    def list_status(self, path: str):
+        return self.file_io().list_status(path)
+
+    def exists(self, path: str) -> bool:
+        return self.file_io().exists(path)
+
+    def delete(self, path: str, recursive: bool = False) -> bool:
+        return self.file_io().delete(path, recursive)
+
+    def mkdirs(self, path: str) -> bool:
+        return self.file_io().mkdirs(path)
+
+    def rename(self, src: str, dst: str) -> bool:
+        return self.file_io().rename(src, dst)
+
+    def copy_file(self, source_path: str, target_path: str, overwrite: bool = 
False):
+        return self.file_io().copy_file(source_path, target_path, overwrite)
+
+    def to_filesystem_path(self, path: str) -> str:
+        return self.file_io().to_filesystem_path(path)
+
+    def try_to_write_atomic(self, path: str, content: str) -> bool:
+        return self.file_io().try_to_write_atomic(path, content)
+
+    def write_parquet(self, path: str, data, compression: str = 'zstd',
+                      zstd_level: int = 1, **kwargs):
+        return self.file_io().write_parquet(path, data, compression, 
zstd_level, **kwargs)
+
+    def write_orc(self, path: str, data, compression: str = 'zstd',
+                  zstd_level: int = 1, **kwargs):
+        return self.file_io().write_orc(path, data, compression, zstd_level, 
**kwargs)
+
+    def write_avro(self, path: str, data, avro_schema=None,
+                   compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        return self.file_io().write_avro(path, data, avro_schema, compression, 
zstd_level, **kwargs)
+
+    def write_lance(self, path: str, data, **kwargs):
+        return self.file_io().write_lance(path, data, **kwargs)
+
+    def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs):
+        return self.file_io().write_blob(path, data, blob_as_descriptor, 
**kwargs)
+
+    @property
+    def uri_reader_factory(self):
+        if self._uri_reader_factory_cache is None:
+            catalog_options = self.catalog_options or Options({})
+            self._uri_reader_factory_cache = UriReaderFactory(catalog_options)
+        
+        return self._uri_reader_factory_cache
+
+    @property
+    def filesystem(self):
+        return self.file_io().filesystem
 
     def try_to_refresh_token(self):
         if self.should_refresh():
@@ -111,3 +205,12 @@ class RESTTokenFileIO(FileIO):
     def valid_token(self):
         self.try_to_refresh_token()
         return self.token
+
+    def close(self):
+        with self.lock:
+            for file_io in self._file_io_cache.values():
+                try:
+                    file_io.close()
+                except Exception as e:
+                    self.log.warning(f"Error closing cached FileIO: {e}")
+            self._file_io_cache.clear()
diff --git a/paimon-python/pypaimon/common/file_io.py 
b/paimon-python/pypaimon/common/file_io.py
index 3e039248b7..536e06c0b1 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -16,298 +16,78 @@
 # limitations under the License.
 
################################################################################
 import logging
-import os
-import subprocess
 import uuid
+from abc import ABC, abstractmethod
 from pathlib import Path
-from typing import Any, Dict, List, Optional
-from urllib.parse import splitport, urlparse
+from typing import List, Optional
 
 import pyarrow
-from packaging.version import parse
-from pyarrow._fs import FileSystem
 
 from pypaimon.common.options import Options
-from pypaimon.common.options.config import OssOptions, S3Options
-from pypaimon.common.uri_reader import UriReaderFactory
-from pypaimon.filesystem.local import PaimonLocalFileSystem
-from pypaimon.schema.data_types import DataField, AtomicType, 
PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
-from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.table.row.row_kind import RowKind
-from pypaimon.write.blob_format_writer import BlobFormatWriter
-
-
-class FileIO:
-    def __init__(self, path: str, catalog_options: Options):
-        self.properties = catalog_options
-        self.logger = logging.getLogger(__name__)
-        scheme, netloc, _ = self.parse_location(path)
-        self.uri_reader_factory = UriReaderFactory(catalog_options)
-        if scheme in {"oss"}:
-            self.filesystem = self._initialize_oss_fs(path)
-        elif scheme in {"s3", "s3a", "s3n"}:
-            self.filesystem = self._initialize_s3_fs()
-        elif scheme in {"hdfs", "viewfs"}:
-            self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
-        elif scheme in {"file"}:
-            self.filesystem = self._initialize_local_fs()
-        else:
-            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
-
-    @staticmethod
-    def parse_location(location: str):
-        uri = urlparse(location)
-        if not uri.scheme:
-            return "file", uri.netloc, os.path.abspath(location)
-        elif uri.scheme in ("hdfs", "viewfs"):
-            return uri.scheme, uri.netloc, uri.path
-        else:
-            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
-
-    @staticmethod
-    def _create_s3_retry_config(
-            max_attempts: int = 10,
-            request_timeout: int = 60,
-            connect_timeout: int = 60
-    ) -> Dict[str, Any]:
-        """
-        AwsStandardS3RetryStrategy and timeout parameters are only available
-        in PyArrow >= 8.0.0.
-        """
-        if parse(pyarrow.__version__) >= parse("8.0.0"):
-            config = {
-                'request_timeout': request_timeout,
-                'connect_timeout': connect_timeout
-            }
-            try:
-                from pyarrow.fs import AwsStandardS3RetryStrategy
-                retry_strategy = 
AwsStandardS3RetryStrategy(max_attempts=max_attempts)
-                config['retry_strategy'] = retry_strategy
-            except ImportError:
-                pass
-            return config
-        else:
-            return {}
-
-    def _extract_oss_bucket(self, location) -> str:
-        uri = urlparse(location)
-        if uri.scheme and uri.scheme != "oss":
-            raise ValueError("Not an OSS URI: {}".format(location))
-
-        netloc = uri.netloc or ""
-        # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
-        if (getattr(uri, "username", None) or getattr(uri, "password", None)) 
or ("@" in netloc):
-            first_segment = uri.path.lstrip("/").split("/", 1)[0]
-            if not first_segment:
-                raise ValueError("Invalid OSS URI without bucket: 
{}".format(location))
-            return first_segment
-
-        # parse oss://bucket/... or oss://bucket.endpoint/...
-        host = getattr(uri, "hostname", None) or netloc
-        if not host:
-            raise ValueError("Invalid OSS URI without host: 
{}".format(location))
-        bucket = host.split(".", 1)[0]
-        if not bucket:
-            raise ValueError("Invalid OSS URI without bucket: 
{}".format(location))
-        return bucket
-
-    def _initialize_oss_fs(self, path) -> FileSystem:
-        from pyarrow.fs import S3FileSystem
-
-        client_kwargs = {
-            "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
-            "secret_key": 
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
-            "session_token": 
self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
-            "region": self.properties.get(OssOptions.OSS_REGION),
-        }
-
-        # Based on https://github.com/apache/arrow/issues/40506
-        if parse(pyarrow.__version__) >= parse("7.0.0"):
-            client_kwargs['force_virtual_addressing'] = True
-            client_kwargs['endpoint_override'] = 
self.properties.get(OssOptions.OSS_ENDPOINT)
-        else:
-            oss_bucket = self._extract_oss_bucket(path)
-            client_kwargs['endpoint_override'] = (oss_bucket + "." +
-                                                  
self.properties.get(OssOptions.OSS_ENDPOINT))
-
-        retry_config = self._create_s3_retry_config()
-        client_kwargs.update(retry_config)
 
-        return S3FileSystem(**client_kwargs)
-
-    def _initialize_s3_fs(self) -> FileSystem:
-        from pyarrow.fs import S3FileSystem
-
-        client_kwargs = {
-            "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
-            "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
-            "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET),
-            "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN),
-            "region": self.properties.get(S3Options.S3_REGION),
-            "force_virtual_addressing": True,
-        }
-
-        retry_config = self._create_s3_retry_config()
-        client_kwargs.update(retry_config)
-
-        return S3FileSystem(**client_kwargs)
-
-    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> 
FileSystem:
-        from pyarrow.fs import HadoopFileSystem
-
-        if 'HADOOP_HOME' not in os.environ:
-            raise RuntimeError("HADOOP_HOME environment variable is not set.")
-        if 'HADOOP_CONF_DIR' not in os.environ:
-            raise RuntimeError("HADOOP_CONF_DIR environment variable is not 
set.")
-
-        hadoop_home = os.environ.get("HADOOP_HOME")
-        native_lib_path = f"{hadoop_home}/lib/native"
-        os.environ['LD_LIBRARY_PATH'] = 
f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}"
-
-        class_paths = subprocess.run(
-            [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'],
-            capture_output=True,
-            text=True,
-            check=True
-        )
-        os.environ['CLASSPATH'] = class_paths.stdout.strip()
-
-        host, port_str = splitport(netloc)
-        return HadoopFileSystem(
-            host=host,
-            port=int(port_str),
-            user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
-        )
-
-    def _initialize_local_fs(self) -> FileSystem:
-
-        return PaimonLocalFileSystem()
 
+class FileIO(ABC):
+    """
+    File IO interface to read and write files.
+    """
+    
+    @abstractmethod
     def new_input_stream(self, path: str):
-        path_str = self.to_filesystem_path(path)
-        return self.filesystem.open_input_file(path_str)
+        pass
 
+    @abstractmethod
     def new_output_stream(self, path: str):
-        path_str = self.to_filesystem_path(path)
-        parent_dir = Path(path_str).parent
-        if str(parent_dir) and not self.exists(str(parent_dir)):
-            self.mkdirs(str(parent_dir))
-
-        return self.filesystem.open_output_stream(path_str)
-
+        pass
+    
+    @abstractmethod
     def get_file_status(self, path: str):
-        path_str = self.to_filesystem_path(path)
-        file_infos = self.filesystem.get_file_info([path_str])
-        file_info = file_infos[0]
-        
-        if file_info.type == pyarrow.fs.FileType.NotFound:
-            raise FileNotFoundError(f"File {path} (resolved as {path_str}) 
does not exist")
-        
-        return file_info
-
+        pass
+    
+    @abstractmethod
     def list_status(self, path: str):
-        path_str = self.to_filesystem_path(path)
-        selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
-        return self.filesystem.get_file_info(selector)
-
-    def list_directories(self, path: str):
-        file_infos = self.list_status(path)
-        return [info for info in file_infos if info.type == 
pyarrow.fs.FileType.Directory]
+        pass
 
+    @abstractmethod
     def exists(self, path: str) -> bool:
-        path_str = self.to_filesystem_path(path)
-        file_info = self.filesystem.get_file_info([path_str])[0]
-        return file_info.type != pyarrow.fs.FileType.NotFound
+        pass
 
+    @abstractmethod
     def delete(self, path: str, recursive: bool = False) -> bool:
-        path_str = self.to_filesystem_path(path)
-        file_info = self.filesystem.get_file_info([path_str])[0]
-        
-        if file_info.type == pyarrow.fs.FileType.NotFound:
-            return False
-        
-        if file_info.type == pyarrow.fs.FileType.Directory:
-            if not recursive:
-                selector = pyarrow.fs.FileSelector(path_str, recursive=False, 
allow_not_found=True)
-                dir_contents = self.filesystem.get_file_info(selector)
-                if len(dir_contents) > 0:
-                    raise OSError(f"Directory {path} is not empty")
-            if recursive:
-                self.filesystem.delete_dir_contents(path_str)
-                self.filesystem.delete_dir(path_str)
-            else:
-                self.filesystem.delete_dir(path_str)
-        else:
-            self.filesystem.delete_file(path_str)
-        return True
-
+        pass
+    
+    @abstractmethod
     def mkdirs(self, path: str) -> bool:
-        path_str = self.to_filesystem_path(path)
-        file_info = self.filesystem.get_file_info([path_str])[0]
-        
-        if file_info.type == pyarrow.fs.FileType.Directory:
-            return True
-        elif file_info.type == pyarrow.fs.FileType.File:
-            raise FileExistsError(f"Path exists but is not a directory: 
{path}")
-        
-        self.filesystem.create_dir(path_str, recursive=True)
-        return True
-
+        pass
+    
+    @abstractmethod
     def rename(self, src: str, dst: str) -> bool:
-        dst_str = self.to_filesystem_path(dst)
-        dst_parent = Path(dst_str).parent
-        if str(dst_parent) and not self.exists(str(dst_parent)):
-            self.mkdirs(str(dst_parent))
-        
-        src_str = self.to_filesystem_path(src)
-        
-        try:
-            if hasattr(self.filesystem, 'rename'):
-                return self.filesystem.rename(src_str, dst_str)
-            
-            dst_file_info = self.filesystem.get_file_info([dst_str])[0]
-            if dst_file_info.type != pyarrow.fs.FileType.NotFound:
-                if dst_file_info.type == pyarrow.fs.FileType.File:
-                    return False
-                # Make it compatible with HadoopFileIO: if dst is an existing 
directory,
-                # dst=dst/srcFileName
-                src_name = Path(src_str).name
-                dst_str = str(Path(dst_str) / src_name)
-                final_dst_info = self.filesystem.get_file_info([dst_str])[0]
-                if final_dst_info.type != pyarrow.fs.FileType.NotFound:
-                    return False
-            
-            self.filesystem.move(src_str, dst_str)
-            return True
-        except FileNotFoundError:
-            return False
-        except (PermissionError, OSError):
-            return False
-
+        pass
+    
     def delete_quietly(self, path: str):
-        if self.logger.isEnabledFor(logging.DEBUG):
-            self.logger.debug(f"Ready to delete {path}")
+        logger = logging.getLogger(__name__)
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Ready to delete {path}")
 
         try:
             if not self.delete(path, False) and self.exists(path):
-                self.logger.warning(f"Failed to delete file {path}")
+                logger.warning(f"Failed to delete file {path}")
         except Exception:
-            self.logger.warning(f"Exception occurs when deleting file {path}", 
exc_info=True)
+            logger.warning(f"Exception occurs when deleting file {path}", 
exc_info=True)
 
     def delete_files_quietly(self, files: List[str]):
         for file_path in files:
             self.delete_quietly(file_path)
 
     def delete_directory_quietly(self, directory: str):
-        if self.logger.isEnabledFor(logging.DEBUG):
-            self.logger.debug(f"Ready to delete {directory}")
+        logger = logging.getLogger(__name__)
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Ready to delete {directory}")
 
         try:
             if not self.delete(directory, True) and self.exists(directory):
-                self.logger.warning(f"Failed to delete directory {directory}")
+                logger.warning(f"Failed to delete directory {directory}")
         except Exception:
-            self.logger.warning(f"Exception occurs when deleting directory 
{directory}", exc_info=True)
+            logger.warning(f"Exception occurs when deleting directory 
{directory}", exc_info=True)
 
     def get_file_size(self, path: str) -> int:
         file_info = self.get_file_status(path)
@@ -332,9 +112,7 @@ class FileIO:
 
     def try_to_write_atomic(self, path: str, content: str) -> bool:
         if self.exists(path):
-            path_str = self.to_filesystem_path(path)
-            file_info = self.filesystem.get_file_info([path_str])[0]
-            if file_info.type == pyarrow.fs.FileType.Directory:
+            if self.is_dir(path):
                 return False
         
         temp_path = path + str(uuid.uuid4()) + ".tmp"
@@ -345,7 +123,7 @@ class FileIO:
         finally:
             if not success:
                 self.delete_quietly(temp_path)
-            return success
+        return success
 
     def write_file(self, path: str, content: str, overwrite: bool = False):
         if not overwrite and self.exists(path):
@@ -362,14 +140,15 @@ class FileIO:
         if not overwrite and self.exists(target_path):
             raise FileExistsError(f"Target file {target_path} already exists 
and overwrite=False")
 
-        source_str = self.to_filesystem_path(source_path)
         target_str = self.to_filesystem_path(target_path)
         target_parent = Path(target_str).parent
         
         if str(target_parent) and not self.exists(str(target_parent)):
             self.mkdirs(str(target_parent))
 
-        self.filesystem.copy_file(source_str, target_str)
+        with self.new_input_stream(source_path) as input_stream:
+            with self.new_output_stream(target_path) as output_stream:
+                output_stream.write(input_stream.read())
 
     def copy_files(self, source_directory: str, target_directory: str, 
overwrite: bool = False):
         file_infos = self.list_status(source_directory)
@@ -407,196 +186,58 @@ class FileIO:
 
         return None
 
-    def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 
'zstd',
-                      zstd_level: int = 1, **kwargs):
-        try:
-            import pyarrow.parquet as pq
+    def to_filesystem_path(self, path: str) -> str:
+        return path
 
-            with self.new_output_stream(path) as output_stream:
-                if compression.lower() == 'zstd':
-                    kwargs['compression_level'] = zstd_level
-                pq.write_table(data, output_stream, compression=compression, 
**kwargs)
+    def parse_location(self, location: str):
+        from urllib.parse import urlparse
+        import os
 
-        except Exception as e:
-            self.delete_quietly(path)
-            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
+        uri = urlparse(location)
+        if not uri.scheme:
+            return "file", uri.netloc, os.path.abspath(location)
+        elif uri.scheme in ("hdfs", "viewfs"):
+            return uri.scheme, uri.netloc, uri.path
+        else:
+            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
-    def write_orc(self, path: str, data: pyarrow.Table, compression: str = 
'zstd',
+    def write_parquet(self, path: str, data, compression: str = 'zstd',
+                      zstd_level: int = 1, **kwargs):
+        raise NotImplementedError("write_parquet must be implemented by FileIO 
subclasses")
+
+    def write_orc(self, path: str, data, compression: str = 'zstd',
                   zstd_level: int = 1, **kwargs):
-        try:
-            """Write ORC file using PyArrow ORC writer.
-            
-            Note: PyArrow's ORC writer doesn't support compression_level 
parameter.
-            ORC files will use zstd compression with default level
-            (which is 3, see 
https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c)
-            instead of the specified level.
-            """
-            import sys
-            import pyarrow.orc as orc
-
-            with self.new_output_stream(path) as output_stream:
-                # Check Python version - if 3.6, don't use compression 
parameter
-                if sys.version_info[:2] == (3, 6):
-                    orc.write_table(data, output_stream, **kwargs)
-                else:
-                    orc.write_table(
-                        data,
-                        output_stream,
-                        compression=compression,
-                        **kwargs
-                    )
-
-        except Exception as e:
-            self.delete_quietly(path)
-            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
-
-    def write_avro(
-            self, path: str, data: pyarrow.Table,
-            avro_schema: Optional[Dict[str, Any]] = None,
-            compression: str = 'zstd', zstd_level: int = 1, **kwargs):
-        import fastavro
-        if avro_schema is None:
-            from pypaimon.schema.data_types import PyarrowFieldParser
-            avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
-
-        records_dict = data.to_pydict()
-
-        def record_generator():
-            num_rows = len(list(records_dict.values())[0])
-            for i in range(num_rows):
-                yield {col: records_dict[col][i] for col in 
records_dict.keys()}
-
-        records = record_generator()
-
-        codec_map = {
-            'null': 'null',
-            'deflate': 'deflate',
-            'snappy': 'snappy',
-            'bzip2': 'bzip2',
-            'xz': 'xz',
-            'zstandard': 'zstandard',
-            'zstd': 'zstandard',  # zstd is commonly used in Paimon
-        }
-        compression_lower = compression.lower()
+        raise NotImplementedError("write_orc must be implemented by FileIO 
subclasses")
+
+    def write_avro(self, path: str, data, avro_schema=None,
+                   compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        raise NotImplementedError("write_avro must be implemented by FileIO 
subclasses")
+
+    def write_lance(self, path: str, data, **kwargs):
+        raise NotImplementedError("write_lance must be implemented by FileIO 
subclasses")
+
+    def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs):
+        """Write Blob format file. Must be implemented by subclasses."""
+        raise NotImplementedError("write_blob must be implemented by FileIO 
subclasses")
+    
+    def close(self):
+        pass
+    
+    @staticmethod
+    def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
+        """
+        Returns a FileIO instance for accessing the file system identified by 
the given path.
+        - LocalFileIO for local file system (file:// or no scheme)
+        - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
+        """
+        from urllib.parse import urlparse
         
-        codec = codec_map.get(compression_lower)
-        if codec is None:
-            raise ValueError(
-                f"Unsupported compression '{compression}' for Avro format. "
-                f"Supported compressions: {', 
'.join(sorted(codec_map.keys()))}."
-            )
-
-        with self.new_output_stream(path) as output_stream:
-            if codec == 'zstandard':
-                kwargs['codec_compression_level'] = zstd_level
-            fastavro.writer(output_stream, avro_schema, records, codec=codec, 
**kwargs)
-
-    def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
-        try:
-            import lance
-            from pypaimon.read.reader.lance_utils import to_lance_specified
-            file_path_for_lance, storage_options = to_lance_specified(self, 
path)
-
-            writer = lance.file.LanceFileWriter(
-                file_path_for_lance, data.schema, 
storage_options=storage_options, **kwargs)
-            try:
-                # Write all batches
-                for batch in data.to_batches():
-                    writer.write_batch(batch)
-            finally:
-                writer.close()
-        except Exception as e:
-            self.delete_quietly(path)
-            raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
-
-    def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: 
bool, **kwargs):
-        try:
-            # Validate input constraints
-            if data.num_columns != 1:
-                raise RuntimeError(f"Blob format only supports a single 
column, got {data.num_columns} columns")
-            # Check for null values
-            column = data.column(0)
-            if column.null_count > 0:
-                raise RuntimeError("Blob format does not support null values")
-            # Convert PyArrow schema to Paimon DataFields
-            # For blob files, we expect exactly one blob column
-            field = data.schema[0]
-            if pyarrow.types.is_large_binary(field.type):
-                fields = [DataField(0, field.name, AtomicType("BLOB"))]
-            else:
-                # Convert other types as needed
-                paimon_type = PyarrowFieldParser.to_paimon_type(field.type, 
field.nullable)
-                fields = [DataField(0, field.name, paimon_type)]
-            # Convert PyArrow Table to records
-            records_dict = data.to_pydict()
-            num_rows = data.num_rows
-            field_name = fields[0].name
-            with self.new_output_stream(path) as output_stream:
-                writer = BlobFormatWriter(output_stream)
-                # Write each row
-                for i in range(num_rows):
-                    col_data = records_dict[field_name][i]
-                    # Convert to appropriate type based on field type
-                    if hasattr(fields[0].type, 'type') and fields[0].type.type 
== "BLOB":
-                        if blob_as_descriptor:
-                            blob_descriptor = 
BlobDescriptor.deserialize(col_data)
-                            uri_reader = 
self.uri_reader_factory.create(blob_descriptor.uri)
-                            blob_data = Blob.from_descriptor(uri_reader, 
blob_descriptor)
-                        elif isinstance(col_data, bytes):
-                            blob_data = BlobData(col_data)
-                        else:
-                            # Convert to bytes if needed
-                            if hasattr(col_data, 'as_py'):
-                                col_data = col_data.as_py()
-                            if isinstance(col_data, str):
-                                col_data = col_data.encode('utf-8')
-                            blob_data = BlobData(col_data)
-                        row_values = [blob_data]
-                    else:
-                        row_values = [col_data]
-                    # Create GenericRow and write
-                    row = GenericRow(row_values, fields, RowKind.INSERT)
-                    writer.add_element(row)
-                writer.close()
-
-        except Exception as e:
-            self.delete_quietly(path)
-            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
-
-    def to_filesystem_path(self, path: str) -> str:
-        from pyarrow.fs import S3FileSystem
-        import re
-
-        parsed = urlparse(path)
-        normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else 
''
-
-        if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
-            # This is likely a Windows drive letter, not a URI scheme
-            return str(path)
-
-        if parsed.scheme == 'file' and parsed.netloc and 
parsed.netloc.endswith(':'):
-            # file://C:/path format - netloc is 'C:', need to reconstruct path 
with drive letter
-            drive_letter = parsed.netloc.rstrip(':')
-            path_part = normalized_path.lstrip('/')
-            return f"{drive_letter}:/{path_part}" if path_part else 
f"{drive_letter}:"
-
-        if isinstance(self.filesystem, S3FileSystem):
-            # For S3, return "bucket/path" format
-            if parsed.scheme:
-                if parsed.netloc:
-                    # Has netloc (bucket): return "bucket/path" format
-                    path_part = normalized_path.lstrip('/')
-                    return f"{parsed.netloc}/{path_part}" if path_part else 
parsed.netloc
-                else:
-                    # Has scheme but no netloc: return path without scheme
-                    result = normalized_path.lstrip('/')
-                    return result if result else '.'
-            return str(path)
-
-        if parsed.scheme:
-            # Handle empty path: return '.' for current directory
-            if not normalized_path:
-                return '.'
-            return normalized_path
-
-        return str(path)
+        uri = urlparse(path)
+        scheme = uri.scheme
+        
+        if not scheme or scheme == "file":
+            from pypaimon.filesystem.local_file_io import LocalFileIO
+            return LocalFileIO(path, catalog_options)
+        
+        from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+        return PyArrowFileIO(path, catalog_options or Options({}))
diff --git a/paimon-python/pypaimon/common/uri_reader.py 
b/paimon-python/pypaimon/common/uri_reader.py
index b05ce77253..50718f0849 100644
--- a/paimon-python/pypaimon/common/uri_reader.py
+++ b/paimon-python/pypaimon/common/uri_reader.py
@@ -148,7 +148,7 @@ class UriReaderFactory:
             # Import FileIO here to avoid circular imports
             from pypaimon.common.file_io import FileIO
             uri_string = parsed_uri.geturl()
-            file_io = FileIO(uri_string, self.catalog_options)
+            file_io = FileIO.get(uri_string, self.catalog_options)
             return UriReader.from_file(file_io)
         except Exception as e:
             raise RuntimeError(f"Failed to create reader for URI 
{parsed_uri.geturl()}") from e
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py 
b/paimon-python/pypaimon/filesystem/local_file_io.py
new file mode 100644
index 0000000000..183eb0ea13
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -0,0 +1,439 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import logging
+import os
+import shutil
+import threading
+import uuid
+from pathlib import Path
+from typing import Any, Dict, Optional
+from urllib.parse import urlparse
+
+import pyarrow
+import pyarrow.fs
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.local import PaimonLocalFileSystem
+from pypaimon.schema.data_types import DataField, AtomicType, 
PyarrowFieldParser
+from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+
+class LocalFileIO(FileIO):
+    """
+    Local file system implementation of FileIO.
+    """
+    
+    RENAME_LOCK = threading.Lock()
+    
+    def __init__(self, path: str = None, catalog_options: Optional[Options] = 
None):
+        self.logger = logging.getLogger(__name__)
+        self.path = path
+        self.properties = catalog_options or Options({})
+        self.filesystem = PaimonLocalFileSystem()
+        self.uri_reader_factory = UriReaderFactory(self.properties)
+    
+    @staticmethod
+    def create():
+        return LocalFileIO()
+    
+    def _to_file(self, path: str) -> Path:
+        parsed = urlparse(path)
+        
+        if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
+            return Path(path)
+        
+        if parsed.scheme == 'file' and parsed.netloc and 
parsed.netloc.endswith(':'):
+            drive_letter = parsed.netloc.rstrip(':')
+            path_part = parsed.path.lstrip('/') if parsed.path else ''
+            if path_part:
+                return Path(f"{drive_letter}:/{path_part}")
+            else:
+                return Path(f"{drive_letter}:")
+        
+        local_path = parsed.path if parsed.scheme else path
+        
+        if not local_path:
+            return Path(".")
+        
+        return Path(local_path)
+    
+    def new_input_stream(self, path: str):
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking new_input_stream for {path}")
+        
+        file_path = self._to_file(path)
+        if not file_path.exists():
+            raise FileNotFoundError(f"File {path} does not exist")
+        
+        return open(file_path, 'rb')
+    
+    def new_output_stream(self, path: str):
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking new_output_stream for {path}")
+        
+        file_path = self._to_file(path)
+        # Create parent directories if needed
+        parent = file_path.parent
+        if parent and not parent.exists():
+            parent.mkdir(parents=True, exist_ok=True)
+        
+        return open(file_path, 'wb')
+    
+    def get_file_status(self, path: str):
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking get_file_status for {path}")
+        
+        file_path = self._to_file(path)
+        if not file_path.exists():
+            import getpass
+            user = getpass.getuser()
+            raise FileNotFoundError(
+                f"File {path} does not exist or the user running "
+                f"Paimon ('{user}') has insufficient permissions to access it."
+            )
+        
+        class LocalFileStatus:
+            def __init__(self, file_path: Path, original_path: str):
+                stat_info = file_path.stat()
+                self.path = str(file_path.absolute())
+                self.original_path = original_path
+                self.size = stat_info.st_size if file_path.is_file() else None
+                self.type = (
+                    pyarrow.fs.FileType.Directory if file_path.is_dir()
+                    else pyarrow.fs.FileType.File if file_path.is_file()
+                    else pyarrow.fs.FileType.NotFound
+                )
+                self.mtime = stat_info.st_mtime
+        
+        return LocalFileStatus(file_path, path)
+    
+    def list_status(self, path: str):
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking list_status for {path}")
+        
+        file_path = self._to_file(path)
+        results = []
+        
+        if not file_path.exists():
+            return results
+        
+        if file_path.is_file():
+            results.append(self.get_file_status(path))
+        elif file_path.is_dir():
+            try:
+                for item in file_path.iterdir():
+                    try:
+                        if path.startswith('file://'):
+                            item_path = f"file://{item}"
+                        else:
+                            item_path = str(item)
+                        results.append(self.get_file_status(item_path))
+                    except FileNotFoundError:
+                        pass
+            except PermissionError:
+                pass
+        
+        return results
+    
+    def exists(self, path: str) -> bool:
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking exists for {path}")
+        
+        file_path = self._to_file(path)
+        return file_path.exists()
+    
+    def delete(self, path: str, recursive: bool = False) -> bool:
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking delete for {path}")
+        
+        file_path = self._to_file(path)
+        
+        if not file_path.exists():
+            return False
+        
+        if file_path.is_file():
+            file_path.unlink()
+            return True
+        elif file_path.is_dir():
+            if not recursive:
+                try:
+                    items = list(file_path.iterdir())
+                    if items:
+                        raise OSError(f"Directory {path} is not empty")
+                except PermissionError:
+                    raise OSError(
+                        f"Directory {path} does not exist or an I/O error 
occurred"
+                    )
+                file_path.rmdir()
+            else:
+                shutil.rmtree(file_path)
+            return True
+        
+        return False
+    
+    def mkdirs(self, path: str) -> bool:
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking mkdirs for {path}")
+        
+        file_path = self._to_file(path)
+        
+        if file_path.is_dir():
+            return True
+        elif file_path.exists() and not file_path.is_dir():
+            raise FileExistsError(f"Path exists but is not a directory: 
{path}")
+        
+        file_path.mkdir(parents=True, exist_ok=True)
+        return True
+    
+    def rename(self, src: str, dst: str) -> bool:
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Invoking rename for {src} to {dst}")
+        
+        src_file = self._to_file(src)
+        dst_file = self._to_file(dst)
+        
+        dst_parent = dst_file.parent
+        if dst_parent and not dst_parent.exists():
+            dst_parent.mkdir(parents=True, exist_ok=True)
+        
+        try:
+            with LocalFileIO.RENAME_LOCK:
+                if dst_file.exists():
+                    if dst_file.is_file():
+                        return False
+                    # Make it compatible with HadoopFileIO: if dst is an 
existing directory,
+                    # dst=dst/srcFileName
+                    dst_file = dst_file / src_file.name
+                    if dst_file.exists():
+                        return False
+                
+                # Perform atomic move
+                src_file.rename(dst_file)
+                return True
+        except FileNotFoundError:
+            return False
+        except (PermissionError, OSError):
+            return False
+    
+    def try_to_write_atomic(self, path: str, content: str) -> bool:
+        file_path = self._to_file(path)
+        if file_path.exists() and file_path.is_dir():
+            return False
+        
+        parent = file_path.parent
+        if parent and not parent.exists():
+            parent.mkdir(parents=True, exist_ok=True)
+        
+        temp_path = file_path.parent / f"{file_path.name}.{uuid.uuid4()}.tmp"
+        success = False
+        try:
+            with open(temp_path, 'w', encoding='utf-8') as f:
+                f.write(content)
+            success = self.rename(str(temp_path), path)
+        finally:
+            if not success and temp_path.exists():
+                self.delete_quietly(str(temp_path))
+        return success
+    
+    def copy_file(self, source_path: str, target_path: str, overwrite: bool = 
False):
+        if not overwrite and self.exists(target_path):
+            raise FileExistsError(f"Target file {target_path} already exists 
and overwrite=False")
+        
+        source_file = self._to_file(source_path)
+        target_file = self._to_file(target_path)
+        
+        target_parent = target_file.parent
+        if target_parent and not target_parent.exists():
+            target_parent.mkdir(parents=True, exist_ok=True)
+        
+        shutil.copy2(source_file, target_file)
+    
+    def to_filesystem_path(self, path: str) -> str:
+        file_path = self._to_file(path)
+        result = str(file_path)
+        parsed = urlparse(path)
+        original_path = parsed.path if parsed.scheme else path
+        if original_path.startswith('./') and not result.startswith('./'):
+            result = './' + result
+        return result
+    
+    @staticmethod
+    def parse_location(location: str):
+        uri = urlparse(location)
+        if not uri.scheme:
+            return "file", uri.netloc, os.path.abspath(location)
+        elif uri.scheme == "file":
+            return "file", uri.netloc, uri.path
+        else:
+            raise ValueError(f"LocalFileIO only supports file:// scheme, got 
{uri.scheme}")
+    
+    def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 
'zstd',
+                      zstd_level: int = 1, **kwargs):
+        try:
+            import pyarrow.parquet as pq
+            
+            file_path = self._to_file(path)
+            parent = file_path.parent
+            if parent and not parent.exists():
+                parent.mkdir(parents=True, exist_ok=True)
+            
+            with open(file_path, 'wb') as f:
+                if compression.lower() == 'zstd':
+                    kwargs['compression_level'] = zstd_level
+                pq.write_table(data, f, compression=compression, **kwargs)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
+    
+    def write_orc(self, path: str, data: pyarrow.Table, compression: str = 
'zstd',
+                  zstd_level: int = 1, **kwargs):
+        try:
+            import sys
+            import pyarrow.orc as orc
+            
+            file_path = self._to_file(path)
+            parent = file_path.parent
+            if parent and not parent.exists():
+                parent.mkdir(parents=True, exist_ok=True)
+            
+            with open(file_path, 'wb') as f:
+                if sys.version_info[:2] == (3, 6):
+                    orc.write_table(data, f, **kwargs)
+                else:
+                    orc.write_table(data, f, compression=compression, **kwargs)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
+    
+    def write_avro(self, path: str, data: pyarrow.Table,
+                   avro_schema: Optional[Dict[str, Any]] = None,
+                   compression: str = 'zstd', zstd_level: int = 1, **kwargs):
+        import fastavro
+        if avro_schema is None:
+            avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
+        
+        records_dict = data.to_pydict()
+        
+        def record_generator():
+            num_rows = len(list(records_dict.values())[0])
+            for i in range(num_rows):
+                yield {col: records_dict[col][i] for col in 
records_dict.keys()}
+        
+        records = record_generator()
+        
+        codec_map = {
+            'null': 'null',
+            'deflate': 'deflate',
+            'snappy': 'snappy',
+            'bzip2': 'bzip2',
+            'xz': 'xz',
+            'zstandard': 'zstandard',
+            'zstd': 'zstandard',
+        }
+        compression_lower = compression.lower()
+        
+        codec = codec_map.get(compression_lower)
+        if codec is None:
+            raise ValueError(
+                f"Unsupported compression '{compression}' for Avro format. "
+                f"Supported compressions: {', 
'.join(sorted(codec_map.keys()))}."
+            )
+        
+        file_path = self._to_file(path)
+        parent = file_path.parent
+        if parent and not parent.exists():
+            parent.mkdir(parents=True, exist_ok=True)
+        
+        with open(file_path, 'wb') as output_stream:
+            if codec == 'zstandard':
+                kwargs['codec_compression_level'] = zstd_level
+            fastavro.writer(output_stream, avro_schema, records, codec=codec, 
**kwargs)
+    
+    def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import lance
+            from pypaimon.read.reader.lance_utils import to_lance_specified
+            file_path_for_lance, storage_options = to_lance_specified(self, 
path)
+            
+            writer = lance.file.LanceFileWriter(
+                file_path_for_lance, data.schema, 
storage_options=storage_options, **kwargs)
+            try:
+                for batch in data.to_batches():
+                    writer.write_batch(batch)
+            finally:
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
+    
+    def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: 
bool, **kwargs):
+        try:
+            if data.num_columns != 1:
+                raise RuntimeError(f"Blob format only supports a single 
column, got {data.num_columns} columns")
+            
+            column = data.column(0)
+            if column.null_count > 0:
+                raise RuntimeError("Blob format does not support null values")
+            
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                paimon_type = PyarrowFieldParser.to_paimon_type(field.type, 
field.nullable)
+                fields = [DataField(0, field.name, paimon_type)]
+            
+            records_dict = data.to_pydict()
+            num_rows = data.num_rows
+            field_name = fields[0].name
+            
+            file_path = self._to_file(path)
+            parent = file_path.parent
+            if parent and not parent.exists():
+                parent.mkdir(parents=True, exist_ok=True)
+            
+            with open(file_path, 'wb') as output_stream:
+                writer = BlobFormatWriter(output_stream)
+                for i in range(num_rows):
+                    col_data = records_dict[field_name][i]
+                    if hasattr(fields[0].type, 'type') and fields[0].type.type 
== "BLOB":
+                        if blob_as_descriptor:
+                            blob_descriptor = 
BlobDescriptor.deserialize(col_data)
+                            uri_reader = 
self.uri_reader_factory.create(blob_descriptor.uri)
+                            blob_data = Blob.from_descriptor(uri_reader, 
blob_descriptor)
+                        elif isinstance(col_data, bytes):
+                            blob_data = BlobData(col_data)
+                        else:
+                            if hasattr(col_data, 'as_py'):
+                                col_data = col_data.as_py()
+                            if isinstance(col_data, str):
+                                col_data = col_data.encode('utf-8')
+                            blob_data = BlobData(col_data)
+                        row_values = [blob_data]
+                    else:
+                        row_values = [col_data]
+                    row = GenericRow(row_values, fields, RowKind.INSERT)
+                    writer.add_element(row)
+                writer.close()
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
diff --git a/paimon-python/pypaimon/common/file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
similarity index 83%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 3e039248b7..a2b83ed2cc 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -27,10 +27,10 @@ import pyarrow
 from packaging.version import parse
 from pyarrow._fs import FileSystem
 
+from pypaimon.common.file_io import FileIO
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions, S3Options
 from pypaimon.common.uri_reader import UriReaderFactory
-from pypaimon.filesystem.local import PaimonLocalFileSystem
 from pypaimon.schema.data_types import DataField, AtomicType, 
PyarrowFieldParser
 from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
 from pypaimon.table.row.generic_row import GenericRow
@@ -38,7 +38,7 @@ from pypaimon.table.row.row_kind import RowKind
 from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
-class FileIO:
+class PyArrowFileIO(FileIO):
     def __init__(self, path: str, catalog_options: Options):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
@@ -50,8 +50,6 @@ class FileIO:
             self.filesystem = self._initialize_s3_fs()
         elif scheme in {"hdfs", "viewfs"}:
             self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
-        elif scheme in {"file"}:
-            self.filesystem = self._initialize_local_fs()
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
@@ -71,10 +69,6 @@ class FileIO:
             request_timeout: int = 60,
             connect_timeout: int = 60
     ) -> Dict[str, Any]:
-        """
-        AwsStandardS3RetryStrategy and timeout parameters are only available
-        in PyArrow >= 8.0.0.
-        """
         if parse(pyarrow.__version__) >= parse("8.0.0"):
             config = {
                 'request_timeout': request_timeout,
@@ -96,14 +90,12 @@ class FileIO:
             raise ValueError("Not an OSS URI: {}".format(location))
 
         netloc = uri.netloc or ""
-        # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
         if (getattr(uri, "username", None) or getattr(uri, "password", None)) 
or ("@" in netloc):
             first_segment = uri.path.lstrip("/").split("/", 1)[0]
             if not first_segment:
                 raise ValueError("Invalid OSS URI without bucket: 
{}".format(location))
             return first_segment
 
-        # parse oss://bucket/... or oss://bucket.endpoint/...
         host = getattr(uri, "hostname", None) or netloc
         if not host:
             raise ValueError("Invalid OSS URI without host: 
{}".format(location))
@@ -122,7 +114,6 @@ class FileIO:
             "region": self.properties.get(OssOptions.OSS_REGION),
         }
 
-        # Based on https://github.com/apache/arrow/issues/40506
         if parse(pyarrow.__version__) >= parse("7.0.0"):
             client_kwargs['force_virtual_addressing'] = True
             client_kwargs['endpoint_override'] = 
self.properties.get(OssOptions.OSS_ENDPOINT)
@@ -180,10 +171,6 @@ class FileIO:
             user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
         )
 
-    def _initialize_local_fs(self) -> FileSystem:
-
-        return PaimonLocalFileSystem()
-
     def new_input_stream(self, path: str):
         path_str = self.to_filesystem_path(path)
         return self.filesystem.open_input_file(path_str)
@@ -309,27 +296,6 @@ class FileIO:
         except Exception:
             self.logger.warning(f"Exception occurs when deleting directory 
{directory}", exc_info=True)
 
-    def get_file_size(self, path: str) -> int:
-        file_info = self.get_file_status(path)
-        if file_info.size is None:
-            raise ValueError(f"File size not available for {path}")
-        return file_info.size
-
-    def is_dir(self, path: str) -> bool:
-        file_info = self.get_file_status(path)
-        return file_info.type == pyarrow.fs.FileType.Directory
-
-    def check_or_mkdirs(self, path: str):
-        if self.exists(path):
-            if not self.is_dir(path):
-                raise ValueError(f"The path '{path}' should be a directory.")
-        else:
-            self.mkdirs(path)
-
-    def read_file_utf8(self, path: str) -> str:
-        with self.new_input_stream(path) as input_stream:
-            return input_stream.read().decode('utf-8')
-
     def try_to_write_atomic(self, path: str, content: str) -> bool:
         if self.exists(path):
             path_str = self.to_filesystem_path(path)
@@ -345,18 +311,7 @@ class FileIO:
         finally:
             if not success:
                 self.delete_quietly(temp_path)
-            return success
-
-    def write_file(self, path: str, content: str, overwrite: bool = False):
-        if not overwrite and self.exists(path):
-            raise FileExistsError(f"File {path} already exists and 
overwrite=False")
-
-        with self.new_output_stream(path) as output_stream:
-            output_stream.write(content.encode('utf-8'))
-
-    def overwrite_file_utf8(self, path: str, content: str):
-        with self.new_output_stream(path) as output_stream:
-            output_stream.write(content.encode('utf-8'))
+        return success
 
     def copy_file(self, source_path: str, target_path: str, overwrite: bool = 
False):
         if not overwrite and self.exists(target_path):
@@ -371,42 +326,6 @@ class FileIO:
 
         self.filesystem.copy_file(source_str, target_str)
 
-    def copy_files(self, source_directory: str, target_directory: str, 
overwrite: bool = False):
-        file_infos = self.list_status(source_directory)
-        for file_info in file_infos:
-            if file_info.type == pyarrow.fs.FileType.File:
-                source_file = file_info.path
-                file_name = source_file.split('/')[-1]
-                target_file = f"{target_directory.rstrip('/')}/{file_name}" if 
target_directory else file_name
-                self.copy_file(source_file, target_file, overwrite)
-
-    def read_overwritten_file_utf8(self, path: str) -> Optional[str]:
-        retry_number = 0
-        exception = None
-        while retry_number < 5:
-            try:
-                return self.read_file_utf8(path)
-            except FileNotFoundError:
-                return None
-            except Exception as e:
-                if not self.exists(path):
-                    return None
-
-                if 
(str(type(e).__name__).endswith("RemoteFileChangedException") or
-                        (str(e) and "Blocklist for" in str(e) and "has 
changed" in str(e))):
-                    exception = e
-                    retry_number += 1
-                else:
-                    raise e
-
-        if exception:
-            if isinstance(exception, Exception):
-                raise exception
-            else:
-                raise RuntimeError(exception)
-
-        return None
-
     def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 
'zstd',
                       zstd_level: int = 1, **kwargs):
         try:
@@ -511,32 +430,24 @@ class FileIO:
 
     def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: 
bool, **kwargs):
         try:
-            # Validate input constraints
             if data.num_columns != 1:
                 raise RuntimeError(f"Blob format only supports a single 
column, got {data.num_columns} columns")
-            # Check for null values
             column = data.column(0)
             if column.null_count > 0:
                 raise RuntimeError("Blob format does not support null values")
-            # Convert PyArrow schema to Paimon DataFields
-            # For blob files, we expect exactly one blob column
             field = data.schema[0]
             if pyarrow.types.is_large_binary(field.type):
                 fields = [DataField(0, field.name, AtomicType("BLOB"))]
             else:
-                # Convert other types as needed
                 paimon_type = PyarrowFieldParser.to_paimon_type(field.type, 
field.nullable)
                 fields = [DataField(0, field.name, paimon_type)]
-            # Convert PyArrow Table to records
             records_dict = data.to_pydict()
             num_rows = data.num_rows
             field_name = fields[0].name
             with self.new_output_stream(path) as output_stream:
                 writer = BlobFormatWriter(output_stream)
-                # Write each row
                 for i in range(num_rows):
                     col_data = records_dict[field_name][i]
-                    # Convert to appropriate type based on field type
                     if hasattr(fields[0].type, 'type') and fields[0].type.type 
== "BLOB":
                         if blob_as_descriptor:
                             blob_descriptor = 
BlobDescriptor.deserialize(col_data)
@@ -545,7 +456,6 @@ class FileIO:
                         elif isinstance(col_data, bytes):
                             blob_data = BlobData(col_data)
                         else:
-                            # Convert to bytes if needed
                             if hasattr(col_data, 'as_py'):
                                 col_data = col_data.as_py()
                             if isinstance(col_data, str):
@@ -554,7 +464,6 @@ class FileIO:
                         row_values = [blob_data]
                     else:
                         row_values = [col_data]
-                    # Create GenericRow and write
                     row = GenericRow(row_values, fields, RowKind.INSERT)
                     writer.add_element(row)
                 writer.close()
@@ -571,30 +480,24 @@ class FileIO:
         normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else 
''
 
         if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc:
-            # This is likely a Windows drive letter, not a URI scheme
             return str(path)
 
         if parsed.scheme == 'file' and parsed.netloc and 
parsed.netloc.endswith(':'):
-            # file://C:/path format - netloc is 'C:', need to reconstruct path 
with drive letter
             drive_letter = parsed.netloc.rstrip(':')
             path_part = normalized_path.lstrip('/')
             return f"{drive_letter}:/{path_part}" if path_part else 
f"{drive_letter}:"
 
         if isinstance(self.filesystem, S3FileSystem):
-            # For S3, return "bucket/path" format
             if parsed.scheme:
                 if parsed.netloc:
-                    # Has netloc (bucket): return "bucket/path" format
                     path_part = normalized_path.lstrip('/')
                     return f"{parsed.netloc}/{path_part}" if path_part else 
parsed.netloc
                 else:
-                    # Has scheme but no netloc: return path without scheme
                     result = normalized_path.lstrip('/')
                     return result if result else '.'
             return str(path)
 
         if parsed.scheme:
-            # Handle empty path: return '.' for current directory
             if not normalized_path:
                 return '.'
             return normalized_path
diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py 
b/paimon-python/pypaimon/read/reader/lance_utils.py
index 60c7763aa3..c219dc6704 100644
--- a/paimon-python/pypaimon/read/reader/lance_utils.py
+++ b/paimon-python/pypaimon/read/reader/lance_utils.py
@@ -26,6 +26,9 @@ from pypaimon.common.options.config import OssOptions
 
 def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, 
Optional[Dict[str, str]]]:
     """Convert path and extract storage options for Lance format."""
+    if hasattr(file_io, 'file_io'):
+        file_io = file_io.file_io()
+    
     scheme, _, _ = file_io.parse_location(file_path)
     storage_options = None
     file_path_for_lance = file_io.to_filesystem_path(file_path)
diff --git 
a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py 
b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
index 802d1def10..2770fad136 100644
--- a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
+++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
@@ -23,8 +23,8 @@ import pyarrow as pa
 
 from pypaimon import Schema
 from pypaimon.table.row.blob import BlobDescriptor, Blob
-from pypaimon.common.file_io import FileIO
 from pypaimon.common.options import Options
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
 
 
 def write_table_with_blob(catalog, video_file_path: str, external_oss_options: 
dict):
@@ -66,7 +66,7 @@ def write_table_with_blob(catalog, video_file_path: str, 
external_oss_options: d
 
     # Access external OSS file to get file size
     try:
-        external_file_io = FileIO(video_file_path, 
Options(external_oss_options))
+        external_file_io = PyArrowFileIO(video_file_path, 
Options(external_oss_options))
         video_file_size = external_file_io.get_file_size(video_file_path)
     except Exception as e:
         raise FileNotFoundError(
diff --git a/paimon-python/pypaimon/table/row/blob.py 
b/paimon-python/pypaimon/table/row/blob.py
index b92ddb3e82..a9a2e2fd49 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -155,7 +155,7 @@ class Blob(ABC):
             file_uri = file
         else:
             file_uri = f"file://{file}"
-        file_io = FileIO(file_uri, {})
+        file_io = FileIO.get(file_uri, {})
         uri_reader = FileUriReader(file_io)
         descriptor = BlobDescriptor(file, 0, -1)
         return Blob.from_descriptor(uri_reader, descriptor)
diff --git a/paimon-python/pypaimon/tests/blob_test.py 
b/paimon-python/pypaimon/tests/blob_test.py
index e12d031c7a..91808bc014 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory
 from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.local_file_io import LocalFileIO
 from pypaimon.common.options import Options
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.schema.data_types import AtomicType, DataField
@@ -108,7 +109,7 @@ class BlobTest(unittest.TestCase):
 
     def test_from_file_with_offset_and_length(self):
         """Test Blob.from_file() method with offset and length."""
-        file_io = FileIO(self.file if self.file.startswith('file://') else 
f"file://{self.file}", Options({}))
+        file_io = LocalFileIO(self.file if self.file.startswith('file://') 
else f"file://{self.file}", Options({}))
         blob = Blob.from_file(file_io, self.file, 0, 4)
 
         # Verify it returns a BlobRef instance
@@ -204,7 +205,7 @@ class BlobTest(unittest.TestCase):
         self.assertIsInstance(blob_ref, Blob)
 
         # from_file should return BlobRef
-        file_io = FileIO(self.file if self.file.startswith('file://') else 
f"file://{self.file}", Options({}))
+        file_io = LocalFileIO(self.file if self.file.startswith('file://') 
else f"file://{self.file}", Options({}))
         blob_file = Blob.from_file(file_io, self.file, 0, 
os.path.getsize(self.file))
         self.assertIsInstance(blob_file, BlobRef)
         self.assertIsInstance(blob_file, Blob)
@@ -563,7 +564,7 @@ class BlobEndToEndTest(unittest.TestCase):
 
     def test_blob_end_to_end(self):
         # Set up file I/O
-        file_io = FileIO(self.temp_dir, Options({}))
+        file_io = LocalFileIO(self.temp_dir, Options({}))
 
         blob_field_name = "blob_field"
         # ========== Step 1: Check Type Validation ==========
@@ -612,12 +613,11 @@ class BlobEndToEndTest(unittest.TestCase):
         """Test that complex types containing BLOB elements throw exceptions 
during read/write operations."""
         from pypaimon.schema.data_types import DataField, AtomicType, 
ArrayType, MultisetType, MapType
         from pypaimon.table.row.blob import BlobData
-        from pypaimon.common.file_io import FileIO
         from pypaimon.table.row.generic_row import GenericRow, 
GenericRowSerializer
         from pypaimon.table.row.row_kind import RowKind
 
         # Set up file I/O
-        file_io = FileIO(self.temp_dir, Options({}))
+        file_io = LocalFileIO(self.temp_dir, Options({}))
 
         # ========== Test ArrayType(nullable=True, 
element_type=AtomicType("BLOB")) ==========
         array_fields = [
@@ -755,11 +755,10 @@ class BlobEndToEndTest(unittest.TestCase):
     def test_blob_advanced_scenarios(self):
         """Test advanced blob scenarios: corruption, truncation, zero-length, 
large blobs, compression, cross-format."""
         from pypaimon.schema.data_types import DataField, AtomicType
-        from pypaimon.common.file_io import FileIO
         from pypaimon.common.delta_varint_compressor import 
DeltaVarintCompressor
 
         # Set up file I/O
-        file_io = FileIO(self.temp_dir, Options({}))
+        file_io = LocalFileIO(self.temp_dir, Options({}))
 
         # ========== Test 1: Corrupted file header test ==========
 
@@ -999,7 +998,7 @@ class BlobEndToEndTest(unittest.TestCase):
 
     def test_blob_end_to_end_with_descriptor(self):
         # Set up file I/O
-        file_io = FileIO(self.temp_dir, Options({}))
+        file_io = LocalFileIO(self.temp_dir, Options({}))
 
         # ========== Step 1: Write data to local file ==========
         # Create test data and write it to a local file
diff --git a/paimon-python/pypaimon/tests/file_io_test.py 
b/paimon-python/pypaimon/tests/file_io_test.py
index ec5d6c4e82..6cdf713b0c 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -23,9 +23,11 @@ from pathlib import Path
 from unittest.mock import MagicMock, patch
 
 import pyarrow
-from pyarrow.fs import S3FileSystem, LocalFileSystem
+from pyarrow.fs import S3FileSystem
 
-from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+from pypaimon.filesystem.local_file_io import LocalFileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
 
 
 class FileIOTest(unittest.TestCase):
@@ -33,7 +35,7 @@ class FileIOTest(unittest.TestCase):
 
     def test_s3_filesystem_path_conversion(self):
         """Test S3FileSystem path conversion with various formats."""
-        file_io = FileIO("s3://bucket/warehouse", {})
+        file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
         self.assertIsInstance(file_io.filesystem, S3FileSystem)
 
         # Test bucket and path
@@ -64,9 +66,8 @@ class FileIOTest(unittest.TestCase):
         self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
 
     def test_local_filesystem_path_conversion(self):
-        """Test LocalFileSystem path conversion with various formats."""
-        file_io = FileIO("file:///tmp/warehouse", {})
-        self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+        file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
+        self.assertIsInstance(file_io, LocalFileIO)
 
         # Test file:// scheme
         
self.assertEqual(file_io.to_filesystem_path("file:///tmp/path/to/file.txt"),
@@ -93,11 +94,9 @@ class FileIOTest(unittest.TestCase):
         self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)
 
     def test_windows_path_handling(self):
-        """Test Windows path handling (drive letters, file:// scheme)."""
-        file_io = FileIO("file:///tmp/warehouse", {})
-        self.assertIsInstance(file_io.filesystem, LocalFileSystem)
+        file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
+        self.assertIsInstance(file_io, LocalFileIO)
 
-        # Windows absolute paths
         self.assertEqual(file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
                          "C:\\path\\to\\file.txt")
         self.assertEqual(file_io.to_filesystem_path("C:/path/to/file.txt"),
@@ -113,17 +112,17 @@ class FileIOTest(unittest.TestCase):
                          "/C:/path/to/file.txt")
 
         # Windows path with S3FileSystem (should preserve)
-        s3_file_io = FileIO("s3://bucket/warehouse", {})
+        s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
         
self.assertEqual(s3_file_io.to_filesystem_path("C:\\path\\to\\file.txt"),
                          "C:\\path\\to\\file.txt")
 
     def test_path_normalization(self):
         """Test path normalization (multiple slashes)."""
-        file_io = FileIO("file:///tmp/warehouse", {})
+        file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
         
self.assertEqual(file_io.to_filesystem_path("file://///tmp///path///file.txt"),
                          "/tmp/path/file.txt")
 
-        s3_file_io = FileIO("s3://bucket/warehouse", {})
+        s3_file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
         
self.assertEqual(s3_file_io.to_filesystem_path("s3://my-bucket///path///to///file.txt"),
                          "my-bucket/path/to/file.txt")
 
@@ -131,7 +130,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_write_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             test_file_uri = f"file://{temp_dir}/overwrite_test.txt"
             expected_path = os.path.join(temp_dir, "overwrite_test.txt")
@@ -161,7 +160,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_exists_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             test_file = os.path.join(temp_dir, "test_file.txt")
             with open(test_file, "w") as f:
@@ -169,35 +168,48 @@ class FileIOTest(unittest.TestCase):
             self.assertTrue(file_io.exists(f"file://{test_file}"))
             
self.assertFalse(file_io.exists(f"file://{temp_dir}/nonexistent.txt"))
 
-            mock_filesystem = MagicMock()
-            mock_filesystem.get_file_info.side_effect = OSError("Permission 
denied")
-            file_io.filesystem = mock_filesystem
+            mock_path = MagicMock(spec=Path)
+            mock_path.exists.side_effect = OSError("Permission denied")
+            with patch.object(file_io, '_to_file', return_value=mock_path):
+                with self.assertRaises(OSError) as context:
+                    file_io.exists("file:///some/path")
+                self.assertIn("Permission denied", str(context.exception))
 
-            with self.assertRaises(OSError) as context:
-                file_io.exists("file:///some/path")
-            self.assertIn("Permission denied", str(context.exception))
-
-            with self.assertRaises(OSError):
-                file_io.new_output_stream("file:///some/path/file.txt")
-
-            with self.assertRaises(OSError):
-                file_io.check_or_mkdirs("file:///some/path")
-
-            with self.assertRaises(OSError):
-                file_io.write_file("file:///some/path", "content", 
overwrite=False)
-
-            with self.assertRaises(OSError):
-                file_io.copy_file("file:///src", "file:///dst", 
overwrite=False)
+            with patch('builtins.open', side_effect=OSError("Permission 
denied")):
+                with self.assertRaises(OSError):
+                    file_io.new_output_stream("file:///some/path/file.txt")
 
-            with patch.object(file_io, 'read_file_utf8', 
side_effect=Exception("Read error")):
+            mock_path = MagicMock(spec=Path)
+            mock_path.is_dir.side_effect = OSError("Permission denied")
+            with patch.object(file_io, '_to_file', return_value=mock_path):
                 with self.assertRaises(OSError):
-                    file_io.read_overwritten_file_utf8("file:///some/path")
+                    file_io.check_or_mkdirs("file:///some/path")
 
-            mock_filesystem.get_file_info.side_effect = OSError("Network 
error")
-            file_io.filesystem = mock_filesystem
+            with patch('builtins.open', side_effect=OSError("Permission 
denied")):
+                with self.assertRaises(OSError):
+                    file_io.write_file("file:///some/path", "content", 
overwrite=False)
 
-            with self.assertRaises(OSError):
-                file_io.rename("file:///src", "file:///dst")
+            with patch('builtins.open', side_effect=OSError("Permission 
denied")):
+                with self.assertRaises(OSError):
+                    file_io.copy_file("file:///src", "file:///dst", 
overwrite=False)
+
+            with patch.object(file_io, 'exists', return_value=True):
+                with patch.object(file_io, 'read_file_utf8', 
side_effect=OSError("Read error")):
+                    with self.assertRaises(OSError) as context:
+                        file_io.read_overwritten_file_utf8("file:///some/path")
+                    self.assertIn("Read error", str(context.exception))
+
+            # rename() catches OSError and returns False (consistent with Java 
implementation)
+            mock_src_path = MagicMock(spec=Path)
+            mock_dst_path = MagicMock(spec=Path)
+            mock_dst_path.parent = MagicMock()
+            mock_dst_path.parent.exists.return_value = True
+            mock_dst_path.exists.return_value = False
+            mock_src_path.rename.side_effect = OSError("Network error")
+            with patch.object(file_io, '_to_file', side_effect=[mock_src_path, 
mock_dst_path]):
+                # rename() catches OSError and returns False, doesn't raise
+                result = file_io.rename("file:///src", "file:///dst")
+                self.assertFalse(result, "rename() should return False when 
OSError occurs")
 
             file_io.delete_quietly("file:///some/path")
             file_io.delete_directory_quietly("file:///some/path")
@@ -208,7 +220,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             test_dir = os.path.join(temp_dir, "test_dir")
             os.makedirs(test_dir)
@@ -226,7 +238,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_delete_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             result = file_io.delete(f"file://{temp_dir}/nonexistent.txt")
             self.assertFalse(result, "delete() should return False when file 
does not exist")
@@ -240,7 +252,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_mkdirs_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             test_file = os.path.join(temp_dir, "test_file.txt")
             with open(test_file, "w") as f:
@@ -256,7 +268,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_rename_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             src_file = os.path.join(temp_dir, "src.txt")
             dst_file = os.path.join(temp_dir, "dst.txt")
@@ -274,7 +286,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_get_file_status_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             with self.assertRaises(FileNotFoundError) as context:
                 file_io.get_file_status(f"file://{temp_dir}/nonexistent.txt")
@@ -302,7 +314,7 @@ class FileIOTest(unittest.TestCase):
         temp_dir = tempfile.mkdtemp(prefix="file_io_copy_test_")
         try:
             warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
+            file_io = LocalFileIO(warehouse_path, Options({}))
 
             source_file = os.path.join(temp_dir, "source.txt")
             target_file = os.path.join(temp_dir, "target.txt")
@@ -338,22 +350,29 @@ class FileIOTest(unittest.TestCase):
     def test_try_to_write_atomic(self):
         temp_dir = tempfile.mkdtemp(prefix="file_io_try_write_atomic_test_")
         try:
-            warehouse_path = f"file://{temp_dir}"
-            file_io = FileIO(warehouse_path, {})
-
             target_dir = os.path.join(temp_dir, "target_dir")
+            normal_file = os.path.join(temp_dir, "normal_file.txt")
+            
+            from pypaimon.filesystem.local_file_io import LocalFileIO
+            local_file_io = LocalFileIO(f"file://{temp_dir}", Options({}))
             os.makedirs(target_dir)
+            self.assertFalse(
+                local_file_io.try_to_write_atomic(f"file://{target_dir}", 
"test content"),
+                "LocalFileIO should return False when target is a directory")
+            self.assertEqual(len(os.listdir(target_dir)), 0, "No file should 
be created inside the directory")
             
-            result = file_io.try_to_write_atomic(f"file://{target_dir}", "test 
content")
-            self.assertFalse(result, "try_to_write_atomic should return False 
when target is a directory")
+            
self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", 
"test content"))
+            with open(normal_file, "r") as f:
+                self.assertEqual(f.read(), "test content")
             
-            self.assertTrue(os.path.isdir(target_dir))
+            os.remove(normal_file)
+            local_file_io = LocalFileIO(f"file://{temp_dir}", Options({}))
+            self.assertFalse(
+                local_file_io.try_to_write_atomic(f"file://{target_dir}", 
"test content"),
+                "LocalFileIO should return False when target is a directory")
             self.assertEqual(len(os.listdir(target_dir)), 0, "No file should 
be created inside the directory")
             
-            normal_file = os.path.join(temp_dir, "normal_file.txt")
-            result = file_io.try_to_write_atomic(f"file://{normal_file}", 
"test content")
-            self.assertTrue(result, "try_to_write_atomic should succeed for a 
normal file path")
-            self.assertTrue(os.path.exists(normal_file))
+            
self.assertTrue(local_file_io.try_to_write_atomic(f"file://{normal_file}", 
"test content"))
             with open(normal_file, "r") as f:
                 self.assertEqual(f.read(), "test content")
         finally:
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py 
b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 6bdba47b81..fe2c19f18a 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -1,5 +1,4 @@
 # Licensed to the Apache Software Foundation (ASF) under one
-# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.  The ASF licenses this file
@@ -181,11 +180,21 @@ class FileSystemCatalogTest(unittest.TestCase):
         with self.assertRaises(DatabaseNotExistException):
             catalog.get_database("nonexistent_db")
 
-        mock_filesystem = MagicMock()
-        mock_filesystem.get_file_info.side_effect = OSError("Permission 
denied")
-        catalog.file_io.filesystem = mock_filesystem
+        catalog.create_database("test_db", False)
+
+        # FileSystemCatalog has file_io attribute
+        from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+        self.assertIsInstance(catalog, FileSystemCatalog)
+        filesystem_catalog = catalog  # type: FileSystemCatalog
+        
+        original_exists = filesystem_catalog.file_io.exists
+        filesystem_catalog.file_io.exists = 
MagicMock(side_effect=OSError("Permission denied"))
 
+        # Now get_database should propagate OSError, not 
DatabaseNotExistException
         with self.assertRaises(OSError) as context:
             catalog.get_database("test_db")
         self.assertIn("Permission denied", str(context.exception))
         self.assertNotIsInstance(context.exception, DatabaseNotExistException)
+        
+        # Restore original method
+        filesystem_catalog.file_io.exists = original_exists
diff --git a/paimon-python/pypaimon/tests/lance_utils_test.py 
b/paimon-python/pypaimon/tests/lance_utils_test.py
index 2676414d8b..71b7b2d571 100644
--- a/paimon-python/pypaimon/tests/lance_utils_test.py
+++ b/paimon-python/pypaimon/tests/lance_utils_test.py
@@ -20,7 +20,7 @@ import unittest
 
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions
-from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
 from pypaimon.read.reader.lance_utils import to_lance_specified
 
 
@@ -35,7 +35,7 @@ class LanceUtilsTest(unittest.TestCase):
             OssOptions.OSS_ACCESS_KEY_SECRET.key(): "test-secret",
         })
 
-        file_io = FileIO(file_path, properties)
+        file_io = PyArrowFileIO(file_path, properties)
         file_path_for_lance, storage_options = to_lance_specified(file_io, 
file_path)
 
         self.assertEqual(
@@ -61,7 +61,7 @@ class LanceUtilsTest(unittest.TestCase):
             OssOptions.OSS_SECURITY_TOKEN.key(): "test-token",
         })
 
-        file_io = FileIO(file_path, properties)
+        file_io = PyArrowFileIO(file_path, properties)
         file_path_for_lance, storage_options = to_lance_specified(file_io, 
file_path)
 
         self.assertEqual(file_path_for_lance, 
"oss://my-bucket/path/to/file.lance")
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py 
b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index 183a3a72a6..f6bcfa9767 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -26,7 +26,7 @@ from pypaimon.catalog.catalog_exception import 
(DatabaseAlreadyExistException,
                                                 TableNotExistException)
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions
-from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 
@@ -404,19 +404,19 @@ class AOSimpleTest(RESTBaseTest):
 
         with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \
                 patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
-            FileIO("oss://oss-bucket/paimon-database/paimon-table", 
Options(props))
+            PyArrowFileIO("oss://oss-bucket/paimon-database/paimon-table", 
Options(props))
             mock_s3fs.assert_called_once_with(access_key="AKID",
                                               secret_key="SECRET",
                                               session_token="TOKEN",
                                               region="cn-hangzhou",
                                               endpoint_override="oss-bucket." 
+ props[OssOptions.OSS_ENDPOINT.key()])
-            FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", 
Options(props))
+            
PyArrowFileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", 
Options(props))
             mock_s3fs.assert_called_with(access_key="AKID",
                                          secret_key="SECRET",
                                          session_token="TOKEN",
                                          region="cn-hangzhou",
                                          endpoint_override="oss-bucket." + 
props[OssOptions.OSS_ENDPOINT.key()])
-            
FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table",
 Options(props))
+            
PyArrowFileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table",
 Options(props))
             mock_s3fs.assert_called_with(access_key="AKID",
                                          secret_key="SECRET",
                                          session_token="TOKEN",
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py 
b/paimon-python/pypaimon/tests/rest/rest_server.py
index e0bedeac1b..d556f7f55c 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -800,7 +800,7 @@ class RESTCatalogServer:
         from pypaimon.common.options import Options
         warehouse_path = str(Path(self.data_path) / self.warehouse)
         options = Options({"warehouse": warehouse_path})
-        return FileIO(warehouse_path, options)
+        return FileIO.get(warehouse_path, options)
 
     def _create_table_metadata(self, identifier: Identifier, schema_id: int,
                                schema: Schema, uuid_str: str, is_external: 
bool) -> TableMetadata:
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py 
b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
index 07e445b12a..47ea8e6cb6 100644
--- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -18,16 +18,15 @@
 import os
 import pickle
 import tempfile
-import time
 import unittest
 from unittest.mock import patch
 
 from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
 
-from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import CatalogOptions, OssOptions
+from pypaimon.filesystem.local_file_io import LocalFileIO
 
 
 class RESTTokenFileIOTest(unittest.TestCase):
@@ -92,6 +91,30 @@ class RESTTokenFileIOTest(unittest.TestCase):
             finally:
                 os.chdir(original_cwd)
 
+    def test_try_to_write_atomic_directory_check(self):
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                self.catalog_options
+            )
+
+            target_dir = os.path.join(self.temp_dir, "target_dir")
+            os.makedirs(target_dir)
+            
+            result = file_io.try_to_write_atomic(f"file://{target_dir}", "test 
content")
+            self.assertFalse(result, "try_to_write_atomic should return False 
when target is a directory")
+            
+            self.assertTrue(os.path.isdir(target_dir))
+            self.assertEqual(len(os.listdir(target_dir)), 0, "No file should 
be created inside the directory")
+            
+            normal_file = os.path.join(self.temp_dir, "normal_file.txt")
+            result = file_io.try_to_write_atomic(f"file://{normal_file}", 
"test content")
+            self.assertTrue(result, "try_to_write_atomic should succeed for a 
normal file path")
+            self.assertTrue(os.path.exists(normal_file))
+            with open(normal_file, "r") as f:
+                self.assertEqual(f.read(), "test content")
+
     def test_new_output_stream_behavior_matches_parent(self):
         """Test that RESTTokenFileIO.new_output_stream behaves like 
FileIO.new_output_stream."""
         with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
@@ -100,7 +123,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
                 self.warehouse_path,
                 self.catalog_options
             )
-            regular_file_io = FileIO(self.warehouse_path, self.catalog_options)
+            regular_file_io = LocalFileIO(self.warehouse_path, 
self.catalog_options)
 
             test_file_path = f"file://{self.temp_dir}/comparison/test.txt"
             test_content = b"comparison content"
@@ -195,9 +218,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
 
     def test_catalog_options_not_modified(self):
         from pypaimon.api.rest_util import RESTUtil
-        from pypaimon.catalog.rest.rest_token import RESTToken
-        from pyarrow.fs import LocalFileSystem
-        
+
         original_catalog_options = Options({
             CatalogOptions.URI.key(): "http://test-uri";,
             "custom.key": "custom.value"
@@ -217,10 +238,8 @@ class RESTTokenFileIOTest(unittest.TestCase):
                 OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key",
                 OssOptions.OSS_ENDPOINT.key(): "token-endpoint"
             }
-            file_io.token = RESTToken(token_dict, int(time.time() * 1000) + 
3600000)
             
-            with patch.object(FileIO, '_initialize_oss_fs', 
return_value=LocalFileSystem()):
-                file_io._initialize_oss_fs("file:///test/path")
+            merged_token = 
file_io._merge_token_with_catalog_options(token_dict)
             
             self.assertEqual(
                 original_catalog_options.to_map(),
@@ -230,7 +249,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
             
             merged_properties = RESTUtil.merge(
                 original_catalog_options.to_map(),
-                file_io._merge_token_with_catalog_options(token_dict)
+                merged_token
             )
             
             self.assertIn("custom.key", merged_properties)
@@ -238,6 +257,53 @@ class RESTTokenFileIOTest(unittest.TestCase):
             self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), 
merged_properties)
             
self.assertEqual(merged_properties[OssOptions.OSS_ACCESS_KEY_ID.key()], 
"token-access-key")
 
+    def test_filesystem_property(self):
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                self.catalog_options
+            )
+            
+            self.assertTrue(hasattr(file_io, 'filesystem'), "RESTTokenFileIO 
should have filesystem property")
+            filesystem = file_io.filesystem
+            self.assertIsNotNone(filesystem, "filesystem should not be None")
+            
+            self.assertTrue(hasattr(filesystem, 'open_input_file'),
+                            "filesystem should support open_input_file method")
+
+    def test_uri_reader_factory_property(self):
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                self.catalog_options
+            )
+            
+            self.assertTrue(hasattr(file_io, 'uri_reader_factory'),
+                            "RESTTokenFileIO should have uri_reader_factory 
property")
+            uri_reader_factory = file_io.uri_reader_factory
+            self.assertIsNotNone(uri_reader_factory, "uri_reader_factory 
should not be None")
+            
+            self.assertTrue(hasattr(uri_reader_factory, 'create'),
+                            "uri_reader_factory should support create method")
+
+    def test_filesystem_and_uri_reader_factory_after_serialization(self):
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            original_file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                self.catalog_options
+            )
+            
+            pickled = pickle.dumps(original_file_io)
+            restored_file_io = pickle.loads(pickled)
+            
+            self.assertIsNotNone(restored_file_io.filesystem,
+                                 "filesystem should work after 
deserialization")
+            self.assertIsNotNone(restored_file_io.uri_reader_factory,
+                                 "uri_reader_factory should work after 
deserialization")
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to