This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 866dd0bd03 [Python] Support filesystem catalog for PyPaimon (#5986)
866dd0bd03 is described below

commit 866dd0bd039e4ad3a8da3725f5b01e86f9b64078
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Jul 30 14:51:37 2025 +0800

    [Python] Support filesystem catalog for PyPaimon (#5986)
---
 paimon-python/pypaimon/__init__.py                 |  12 +
 paimon-python/pypaimon/api/__init__.py             |  10 +-
 paimon-python/pypaimon/api/api_response.py         |   4 +-
 paimon-python/pypaimon/api/api_resquest.py         |   2 +-
 paimon-python/pypaimon/api/auth.py                 |   8 +-
 paimon-python/pypaimon/api/client.py               |   2 +-
 paimon-python/pypaimon/api/config.py               |   3 +-
 paimon-python/pypaimon/api/core_options.py         |   2 +
 paimon-python/pypaimon/api/identifier.py           |   2 +-
 paimon-python/pypaimon/api/token_loader.py         |  22 +-
 paimon-python/pypaimon/catalog/catalog.py          |  20 +-
 .../pypaimon/catalog/catalog_exception.py          | 156 +++++++
 .../file_io.py => catalog/catalog_factory.py}      |  33 +-
 paimon-python/pypaimon/catalog/catalog_utils.py    |   1 -
 .../pypaimon/catalog/filesystem_catalog.py         | 103 +++++
 paimon-python/pypaimon/common/file_io.py           |   4 +
 .../pypaimon/{api => common}/rest_json.py          |   0
 paimon-python/pypaimon/pvfs/__init__.py            |   8 +-
 paimon-python/pypaimon/rest/rest_catalog.py        |  18 +-
 .../pypaimon/{api => schema}/data_types.py         |   0
 paimon-python/pypaimon/schema/schema.py            |   4 +-
 paimon-python/pypaimon/schema/schema_manager.py    |   2 +-
 paimon-python/pypaimon/schema/table_schema.py      |   6 +-
 .../{common/file_io.py => table/bucket_mode.py}    |  24 +-
 paimon-python/pypaimon/table/file_store_table.py   |  18 +-
 .../{common/file_io.py => table/row/__init__.py}   |  20 -
 paimon-python/pypaimon/table/row/binary_row.py     | 469 +++++++++++++++++++++
 .../file_io.py => table/row/internal_row.py}       |  43 +-
 paimon-python/pypaimon/table/row/key_value.py      |  57 +++
 paimon-python/pypaimon/table/row/offset_row.py     |  58 +++
 paimon-python/pypaimon/table/row/row_kind.py       |  61 +++
 paimon-python/pypaimon/tests/api_test.py           |  14 +-
 paimon-python/pypaimon/tests/pvfs_test.py          |   2 +-
 paimon-python/pypaimon/tests/rest_server.py        | 141 +------
 34 files changed, 1074 insertions(+), 255 deletions(-)

diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py
index a67d5ea255..f5a9106115 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -14,3 +14,15 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
+
+from .schema.schema import Schema
+from .catalog.catalog import Catalog
+from .catalog.database import Database
+from .table.table import Table
+
+__all__ = [
+    'Schema',
+    'Catalog',
+    'Database',
+    'Table',
+]
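
With these exports in place, the public entry points can be imported directly from the package root rather than from their defining modules. A minimal sketch (the interactive session is illustrative only):

    >>> import pypaimon
    >>> pypaimon.__all__
    ['Schema', 'Catalog', 'Database', 'Table']
    >>> from pypaimon import Catalog, Schema
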
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 3c8d1fb2ef..2b35cd12df 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -31,7 +31,7 @@ from pypaimon.api.api_response import (
 )
from pypaimon.api.api_resquest import CreateDatabaseRequest, AlterDatabaseRequest, RenameTableRequest, \
     CreateTableRequest
-from pypaimon.api.config import RESTCatalogOptions
+from pypaimon.api.config import CatalogOptions
 from pypaimon.api.client import HttpClient
 from pypaimon.api.identifier import Identifier
 from pypaimon.api.typedef import T
@@ -89,7 +89,7 @@ class ResourcePaths:
     @classmethod
     def for_catalog_properties(
             cls, options: dict[str, str]) -> "ResourcePaths":
-        prefix = options.get(RESTCatalogOptions.PREFIX, "")
+        prefix = options.get(CatalogOptions.PREFIX, "")
         return cls(prefix)
 
     @staticmethod
@@ -131,15 +131,15 @@ class RESTApi:
 
     def __init__(self, options: Dict[str, str], config_required: bool = True):
         self.logger = logging.getLogger(self.__class__.__name__)
-        self.client = HttpClient(options.get(RESTCatalogOptions.URI))
+        self.client = HttpClient(options.get(CatalogOptions.URI))
         auth_provider = AuthProviderFactory.create_auth_provider(options)
         base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX)
 
         if config_required:
-            warehouse = options.get(RESTCatalogOptions.WAREHOUSE)
+            warehouse = options.get(CatalogOptions.WAREHOUSE)
             query_params = {}
             if warehouse:
-                query_params[RESTCatalogOptions.WAREHOUSE] = RESTUtil.encode_string(
+                query_params[CatalogOptions.WAREHOUSE] = RESTUtil.encode_string(
                     warehouse)
 
             config_response = self.client.get_with_params(
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index b3ec148e94..b50754b3cf 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -20,9 +20,9 @@ from abc import ABC, abstractmethod
 from typing import Dict, Optional, Generic, List
 from dataclasses import dataclass
 
-from .rest_json import json_field
-from pypaimon.schema.schema import Schema
+from pypaimon.common.rest_json import json_field
 from .typedef import T
+from .. import Schema
 
 
 @dataclass
diff --git a/paimon-python/pypaimon/api/api_resquest.py b/paimon-python/pypaimon/api/api_resquest.py
index 3ec7c12edc..1ddc3ebe5a 100644
--- a/paimon-python/pypaimon/api/api_resquest.py
+++ b/paimon-python/pypaimon/api/api_resquest.py
@@ -22,7 +22,7 @@ from typing import Dict, List
 
 from .api_response import Schema
 from .identifier import Identifier
-from .rest_json import json_field
+from pypaimon.common.rest_json import json_field
 
 
 class RESTRequest(ABC):
diff --git a/paimon-python/pypaimon/api/auth.py b/paimon-python/pypaimon/api/auth.py
index dc51df0740..393fa15920 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -27,7 +27,7 @@ from typing import Optional, Dict
 
 from .token_loader import DLFTokenLoader, DLFToken
 from .typedef import RESTAuthParameter
-from .config import RESTCatalogOptions
+from .config import CatalogOptions
 
 
 class AuthProvider(ABC):
@@ -61,13 +61,13 @@ class AuthProviderFactory:
 
     @staticmethod
     def create_auth_provider(options: Dict[str, str]) -> AuthProvider:
-        provider = options.get(RESTCatalogOptions.TOKEN_PROVIDER)
+        provider = options.get(CatalogOptions.TOKEN_PROVIDER)
         if provider == 'bear':
-            token = options.get(RESTCatalogOptions.TOKEN)
+            token = options.get(CatalogOptions.TOKEN)
             return BearTokenAuthProvider(token)
         elif provider == 'dlf':
             return DLFAuthProvider(
-                options.get(RESTCatalogOptions.DLF_REGION),
+                options.get(CatalogOptions.DLF_REGION),
                 DLFToken.from_options(options)
             )
         raise ValueError('Unknown auth provider')
diff --git a/paimon-python/pypaimon/api/client.py b/paimon-python/pypaimon/api/client.py
index 7f7203a1c5..c318809a73 100644
--- a/paimon-python/pypaimon/api/client.py
+++ b/paimon-python/pypaimon/api/client.py
@@ -30,7 +30,7 @@ from urllib3 import Retry
 
 from .typedef import RESTAuthParameter
 from .api_response import ErrorResponse
-from .rest_json import JSON
+from pypaimon.common.rest_json import JSON
 
 T = TypeVar('T', bound='RESTResponse')
 
diff --git a/paimon-python/pypaimon/api/config.py b/paimon-python/pypaimon/api/config.py
index 54f7a8bc49..2a33182993 100644
--- a/paimon-python/pypaimon/api/config.py
+++ b/paimon-python/pypaimon/api/config.py
@@ -22,8 +22,9 @@ class OssOptions:
     OSS_ENDPOINT = "fs.oss.endpoint"
 
 
-class RESTCatalogOptions:
+class CatalogOptions:
     URI = "uri"
+    METASTORE = "metastore"
     WAREHOUSE = "warehouse"
     TOKEN_PROVIDER = "token.provider"
     TOKEN = "token"
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/api/core_options.py
index 2745d1c281..18d2fba291 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/api/core_options.py
@@ -42,3 +42,5 @@ class CoreOptions(str, Enum):
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
     FILE_BLOCK_SIZE = "file.block-size"
+    # Scan options
+    SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
diff --git a/paimon-python/pypaimon/api/identifier.py b/paimon-python/pypaimon/api/identifier.py
index 0d280bbf21..3a27870516 100644
--- a/paimon-python/pypaimon/api/identifier.py
+++ b/paimon-python/pypaimon/api/identifier.py
@@ -18,7 +18,7 @@
 from dataclasses import dataclass
 from typing import Optional
 
-from pypaimon.api.rest_json import json_field
+from pypaimon.common.rest_json import json_field
 
 SYSTEM_TABLE_SPLITTER = '$'
 SYSTEM_BRANCH_PREFIX = 'branch-'
diff --git a/paimon-python/pypaimon/api/token_loader.py b/paimon-python/pypaimon/api/token_loader.py
index 223490646c..9214345fce 100644
--- a/paimon-python/pypaimon/api/token_loader.py
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -24,8 +24,8 @@ import requests
 from requests.adapters import HTTPAdapter
 from requests.exceptions import RequestException
 
-from .rest_json import json_field, JSON
-from .config import RESTCatalogOptions
+from pypaimon.common.rest_json import json_field, JSON
+from .config import CatalogOptions
 from .client import ExponentialRetry
 
 
@@ -59,15 +59,15 @@ class DLFToken:
 
     @classmethod
     def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
-        from .config import RESTCatalogOptions
-        if (options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID) is None
-                or options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
+        from .config import CatalogOptions
+        if (options.get(CatalogOptions.DLF_ACCESS_KEY_ID) is None
+                or options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
             return None
         else:
             return cls(
-                access_key_id=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
-                access_key_secret=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
-                security_token=options.get(RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
+                access_key_id=options.get(CatalogOptions.DLF_ACCESS_KEY_ID),
+                access_key_secret=options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET),
+                security_token=options.get(CatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
             )
 
 
@@ -204,12 +204,12 @@ class DLFTokenLoaderFactory:
     @staticmethod
     def create_token_loader(options: Dict[str, str]) -> Optional['DLFTokenLoader']:
         """Create ECS token loader"""
-        loader = options.get(RESTCatalogOptions.DLF_TOKEN_LOADER)
+        loader = options.get(CatalogOptions.DLF_TOKEN_LOADER)
         if loader == 'ecs':
             ecs_metadata_url = options.get(
-                RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL,
+                CatalogOptions.DLF_TOKEN_ECS_METADATA_URL,
                 'http://100.100.100.200/latest/meta-data/Ram/security-credentials/'
             )
-            role_name = options.get(RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME)
+            role_name = options.get(CatalogOptions.DLF_TOKEN_ECS_ROLE_NAME)
             return DLFECSTokenLoader(ecs_metadata_url, role_name)
         return None
diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py
index c133b362c5..f140f53852 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -17,7 +17,10 @@
 #################################################################################
 
 from abc import ABC, abstractmethod
-from typing import Optional
+from typing import Optional, Union
+
+from pypaimon import Schema
+from pypaimon.api import Identifier
 
 
 class Catalog(ABC):
@@ -25,13 +28,26 @@ class Catalog(ABC):
     This interface is responsible for reading and writing
     metadata such as database/table from a paimon catalog.
     """
+    DB_SUFFIX = ".db"
+    DEFAULT_DATABASE = "default"
     SYSTEM_DATABASE_NAME = "sys"
+
     DB_LOCATION_PROP = "location"
+    COMMENT_PROP = "comment"
+    OWNER_PROP = "owner"
 
     @abstractmethod
     def get_database(self, name: str) -> 'Database':
         """Get paimon database identified by the given name."""
 
     @abstractmethod
-    def create_database(self, name: str, properties: Optional[dict] = None):
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
         """Create a database with properties."""
+
+    @abstractmethod
+    def get_table(self, identifier: Union[str, Identifier]) -> 'Table':
+        """Get paimon table identified by the given Identifier."""
+
+    @abstractmethod
+    def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
+        """Create table with schema."""
diff --git a/paimon-python/pypaimon/catalog/catalog_exception.py b/paimon-python/pypaimon/catalog/catalog_exception.py
new file mode 100644
index 0000000000..c29243c504
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/catalog_exception.py
@@ -0,0 +1,156 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.api import Identifier
+
+
+# Exception classes
+class CatalogException(Exception):
+    """Base catalog exception"""
+
+
+class DatabaseNotExistException(CatalogException):
+    """Database not exist exception"""
+
+    def __init__(self, database: str):
+        self.database = database
+        super().__init__(f"Database {database} does not exist")
+
+
+class DatabaseAlreadyExistException(CatalogException):
+    """Database already exist exception"""
+
+    def __init__(self, database: str):
+        self.database = database
+        super().__init__(f"Database {database} already exists")
+
+
+class DatabaseNoPermissionException(CatalogException):
+    """Database no permission exception"""
+
+    def __init__(self, database: str):
+        self.database = database
+        super().__init__(f"No permission to access database {database}")
+
+
+class TableNotExistException(CatalogException):
+    """Table not exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Table {identifier.get_full_name()} does not exist")
+
+
+class TableAlreadyExistException(CatalogException):
+    """Table already exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Table {identifier.get_full_name()} already exists")
+
+
+class TableNoPermissionException(CatalogException):
+    """Table no permission exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"No permission to access table 
{identifier.get_full_name()}")
+
+
+class ViewNotExistException(CatalogException):
+    """View not exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"View {identifier.get_full_name()} does not exist")
+
+
+class ViewAlreadyExistException(CatalogException):
+    """View already exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"View {identifier.get_full_name()} already exists")
+
+
+class FunctionNotExistException(CatalogException):
+    """Function not exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Function {identifier.get_full_name()} does not 
exist")
+
+
+class FunctionAlreadyExistException(CatalogException):
+    """Function already exist exception"""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier = identifier
+        super().__init__(f"Function {identifier.get_full_name()} already 
exists")
+
+
+class ColumnNotExistException(CatalogException):
+    """Column not exist exception"""
+
+    def __init__(self, column: str):
+        self.column = column
+        super().__init__(f"Column {column} does not exist")
+
+
+class ColumnAlreadyExistException(CatalogException):
+    """Column already exist exception"""
+
+    def __init__(self, column: str):
+        self.column = column
+        super().__init__(f"Column {column} already exists")
+
+
+class DefinitionNotExistException(CatalogException):
+    """Definition not exist exception"""
+
+    def __init__(self, identifier: Identifier, name: str):
+        self.identifier = identifier
+        self.name = name
+        super().__init__(f"Definition {name} does not exist in 
{identifier.get_full_name()}")
+
+
+class DefinitionAlreadyExistException(CatalogException):
+    """Definition already exist exception"""
+
+    def __init__(self, identifier: Identifier, name: str):
+        self.identifier = identifier
+        self.name = name
+        super().__init__(f"Definition {name} already exists in 
{identifier.get_full_name()}")
+
+
+class DialectNotExistException(CatalogException):
+    """Dialect not exist exception"""
+
+    def __init__(self, identifier: Identifier, dialect: str):
+        self.identifier = identifier
+        self.dialect = dialect
+        super().__init__(f"Dialect {dialect} does not exist in 
{identifier.get_full_name()}")
+
+
+class DialectAlreadyExistException(CatalogException):
+    """Dialect already exist exception"""
+
+    def __init__(self, identifier: Identifier, dialect: str):
+        self.identifier = identifier
+        self.dialect = dialect
+        super().__init__(f"Dialect {dialect} already exists in 
{identifier.get_full_name()}")
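
All of these derive from CatalogException, so callers can catch either a specific error or the base class. A minimal handling sketch (the catalog instance and database name are hypothetical):

    from pypaimon.catalog.catalog_exception import DatabaseNotExistException

    try:
        db = catalog.get_database("analytics")
    except DatabaseNotExistException as e:
        print(e)  # Database analytics does not exist
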
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/catalog/catalog_factory.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/catalog/catalog_factory.py
index 9e7418966f..b5ec706ae5 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -15,23 +15,24 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+from pypaimon import Catalog
+from pypaimon.api import CatalogOptions
+from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+from pypaimon.rest.rest_catalog import RESTCatalog
 
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+class CatalogFactory:
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
+    CATALOG_REGISTRY = {
+        "filesystem": FileSystemCatalog,
+        "rest": RESTCatalog,
+    }
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
+    @staticmethod
+    def create(catalog_options: dict) -> Catalog:
+        identifier = catalog_options.get(CatalogOptions.METASTORE, "filesystem")
+        catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
+        if catalog_class is None:
+            raise ValueError(f"Unknown catalog identifier: {identifier}. "
+                             f"Available types: 
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
+        return catalog_class(catalog_options)
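
The factory keys off the "metastore" option and falls back to the filesystem implementation when it is absent. A minimal usage sketch (the warehouse path is hypothetical):

    from pypaimon.catalog.catalog_factory import CatalogFactory

    catalog = CatalogFactory.create({
        "metastore": "filesystem",      # or "rest"; defaults to "filesystem"
        "warehouse": "/tmp/warehouse",  # hypothetical local path
    })
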
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py b/paimon-python/pypaimon/catalog/catalog_utils.py
index d51de0754d..d9877ce53a 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -21,7 +21,6 @@ from typing import Callable, Any
 from pypaimon.api.core_options import CoreOptions
 from pypaimon.api.identifier import Identifier
 
-from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.table_metadata import TableMetadata
 from pypaimon.table.catalog_environment import CatalogEnvironment
 from pypaimon.table.file_store_table import FileStoreTable
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
new file mode 100644
index 0000000000..6651eec317
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from pathlib import Path
+from typing import Optional, Union
+from urllib.parse import urlparse
+
+from pypaimon import Catalog, Database, Table
+from pypaimon.api import CatalogOptions, Identifier
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.catalog.catalog_exception import TableNotExistException, DatabaseNotExistException, \
+    TableAlreadyExistException, DatabaseAlreadyExistException
+from pypaimon.schema.schema_manager import SchemaManager
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class FileSystemCatalog(Catalog):
+    def __init__(self, catalog_options: dict):
+        if CatalogOptions.WAREHOUSE not in catalog_options:
+            raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE}' path must 
be set")
+        self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
+        self.catalog_options = catalog_options
+        self.file_io = None  # FileIO(self.warehouse, self.catalog_options)
+
+    def get_database(self, name: str) -> Database:
+        if self.file_io.exists(self.get_database_path(name)):
+            return Database(name, {})
+        else:
+            raise DatabaseNotExistException(name)
+
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
+        try:
+            self.get_database(name)
+            if not ignore_if_exists:
+                raise DatabaseAlreadyExistException(name)
+        except DatabaseNotExistException:
+            if properties and Catalog.DB_LOCATION_PROP in properties:
+                raise ValueError("Cannot specify location for a database when 
using fileSystem catalog.")
+            path = self.get_database_path(name)
+            self.file_io.mkdirs(path)
+
+    def get_table(self, identifier: Union[str, Identifier]) -> Table:
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
+            raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
+        table_path = self.get_table_path(identifier)
+        table_schema = self.get_table_schema(identifier)
+        return FileStoreTable(self.file_io, identifier, table_path, table_schema)
+
+    def create_table(self, identifier: Union[str, Identifier], schema: 'Schema', ignore_if_exists: bool):
+        if schema.options and schema.options.get(CoreOptions.AUTO_CREATE):
+            raise ValueError(f"The value of {CoreOptions.AUTO_CREATE} property 
should be False.")
+
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        self.get_database(identifier.get_database_name())
+        try:
+            self.get_table(identifier)
+            if not ignore_if_exists:
+                raise TableAlreadyExistException(identifier)
+        except TableNotExistException:
+            if schema.options and CoreOptions.TYPE in schema.options and schema.options.get(
+                    CoreOptions.TYPE) != "table":
+                raise ValueError(f"Table Type 
{schema.options.get(CoreOptions.TYPE)}")
+            table_path = self.get_table_path(identifier)
+            schema_manager = SchemaManager(self.file_io, table_path)
+            schema_manager.create_table(schema)
+
+    def get_table_schema(self, identifier: Identifier):
+        table_path = self.get_table_path(identifier)
+        table_schema = SchemaManager(self.file_io, table_path).latest()
+        if table_schema is None:
+            raise TableNotExistException(identifier)
+        return table_schema
+
+    def get_database_path(self, name) -> Path:
+        return self._trim_schema(self.warehouse) / f"{name}{Catalog.DB_SUFFIX}"
+
+    def get_table_path(self, identifier: Identifier) -> Path:
+        return self.get_database_path(identifier.get_database_name()) / identifier.get_table_name()
+
+    @staticmethod
+    def _trim_schema(warehouse_url: str) -> Path:
+        parsed = urlparse(warehouse_url)
+        bucket = parsed.netloc
+        warehouse_dir = parsed.path.lstrip('/')
+        return Path(f"{bucket}/{warehouse_dir}" if warehouse_dir else bucket)
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 9e7418966f..38602a709b 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -35,3 +35,7 @@ class FileIO(ABC):
     @abstractmethod
     def list_status(self, path: Path):
         """"""
+
+    @abstractmethod
+    def mkdirs(self, path: Path) -> bool:
+        """"""
diff --git a/paimon-python/pypaimon/api/rest_json.py b/paimon-python/pypaimon/common/rest_json.py
similarity index 100%
rename from paimon-python/pypaimon/api/rest_json.py
rename to paimon-python/pypaimon/common/rest_json.py
diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py
index 0351efa0eb..2b425d0251 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -33,7 +33,7 @@ from fsspec.implementations.local import LocalFileSystem
 
 from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, Identifier, GetTableResponse
 from pypaimon.api.client import NoSuchResourceException, AlreadyExistsException
-from pypaimon.api.config import RESTCatalogOptions, OssOptions, PVFSOptions
+from pypaimon.api.config import CatalogOptions, OssOptions, PVFSOptions
 
 PROTOCOL_NAME = "pvfs"
 
@@ -117,9 +117,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
     _identifier_pattern = re.compile("^pvfs://([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$")
 
     def __init__(self, options: Dict = None, **kwargs):
-        options.update({RESTCatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
+        options.update({CatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
         self.options = options
-        self.warehouse = options.get(RESTCatalogOptions.WAREHOUSE)
+        self.warehouse = options.get(CatalogOptions.WAREHOUSE)
         cache_expired_time = (
             PVFSOptions.DEFAULT_TABLE_CACHE_TTL
             if options is None
@@ -791,7 +791,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
             rest_api = self._rest_api_cache.get(catalog)
             if rest_api is None:
                 options = self.options.copy()
-                options.update({RESTCatalogOptions.WAREHOUSE: catalog})
+                options.update({CatalogOptions.WAREHOUSE: catalog})
                 rest_api = RESTApi(options)
                 self._rest_api_cache[catalog] = rest_api
             return rest_api
diff --git a/paimon-python/pypaimon/rest/rest_catalog.py b/paimon-python/pypaimon/rest/rest_catalog.py
index c1ba37cd78..2b66ea9981 100644
--- a/paimon-python/pypaimon/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/rest/rest_catalog.py
@@ -16,9 +16,10 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 from pathlib import Path
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Union
 
-from pypaimon.api import RESTApi, RESTCatalogOptions
+from pypaimon import Database, Catalog, Schema
+from pypaimon.api import RESTApi, CatalogOptions
 from pypaimon.api.api_response import PagedList, GetTableResponse
 from pypaimon.api.core_options import CoreOptions
 from pypaimon.api.identifier import Identifier
@@ -26,10 +27,8 @@ from pypaimon.api.options import Options
 
 from pypaimon.schema.table_schema import TableSchema
 
-from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.catalog_utils import CatalogUtils
-from pypaimon.catalog.database import Database
 from pypaimon.catalog.property_change import PropertyChange
 from pypaimon.catalog.table_metadata import TableMetadata
 from pypaimon.rest.rest_token_file_io import RESTTokenFileIO
@@ -41,7 +40,7 @@ class RESTCatalog(Catalog):
         self.api = RESTApi(context.options.to_map(), config_required)
+        self.context = CatalogContext.create(Options(self.api.options), context.hadoop_conf, context.prefer_io_loader,
                                              context.fallback_io_loader)
-        self.data_token_enabled = self.api.options.get(RESTCatalogOptions.DATA_TOKEN_ENABLED)
+        self.data_token_enabled = self.api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
 
     def list_databases(self) -> List[str]:
         return self.api.list_databases()
@@ -50,7 +49,7 @@ class RESTCatalog(Catalog):
                              database_name_pattern: Optional[str] = None) -> 
PagedList[str]:
         return self.api.list_databases_paged(max_results, page_token, database_name_pattern)
 
-    def create_database(self, name: str, properties: Dict[str, str] = None):
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Dict[str, str] = None):
         self.api.create_database(name, properties)
 
     def get_database(self, name: str) -> Database:
@@ -85,7 +84,9 @@ class RESTCatalog(Catalog):
             table_name_pattern
         )
 
-    def get_table(self, identifier: Identifier) -> FileStoreTable:
+    def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable:
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
         return CatalogUtils.load_table(
             identifier,
             lambda path: self.file_io_for_data(path, identifier),
@@ -93,6 +94,9 @@ class RESTCatalog(Catalog):
             self.load_table_metadata,
         )
 
+    def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
+        raise ValueError("Not implemented")
+
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         response = self.api.get_table(identifier)
         return self.to_table_metadata(identifier.get_database_name(), response)
diff --git a/paimon-python/pypaimon/api/data_types.py b/paimon-python/pypaimon/schema/data_types.py
similarity index 100%
rename from paimon-python/pypaimon/api/data_types.py
rename to paimon-python/pypaimon/schema/data_types.py
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 0a826bdc8e..354a9f1d21 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -19,8 +19,8 @@ from dataclasses import dataclass
 from typing import Optional, List, Dict
 
 import pyarrow as pa
-from pypaimon.api.data_types import DataField
-from pypaimon.api.rest_json import json_field
+from pypaimon.schema.data_types import DataField
+from pypaimon.common.rest_json import json_field
 
 
 @dataclass
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 43e773c46e..bd755d1090 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -18,8 +18,8 @@
 from pathlib import Path
 from typing import Optional
 
+from pypaimon import Schema
 from pypaimon.common.file_io import FileIO
-from pypaimon.schema.schema import Schema
 from pypaimon.schema.table_schema import TableSchema
 
 
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 11ee24ae98..7d3f6aea64 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -22,11 +22,11 @@ from typing import List, Dict, Optional
 
 import pyarrow
 
-from pypaimon.api import data_types
+from pypaimon import Schema
+from pypaimon.schema import data_types
 from pypaimon.api.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
-from pypaimon.schema.schema import Schema
-from pypaimon.api.data_types import DataField
+from pypaimon.schema.data_types import DataField
 
 
 class TableSchema:
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/table/bucket_mode.py
similarity index 69%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/table/bucket_mode.py
index 9e7418966f..df6c8e949d 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/table/bucket_mode.py
@@ -15,23 +15,15 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
 
+from enum import Enum, auto
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
+class BucketMode(Enum):
+    def __str__(self):
+        return self.value
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
+    HASH_FIXED = auto()
+    HASH_DYNAMIC = auto()
+    CROSS_PARTITION = auto()
+    BUCKET_UNAWARE = auto()
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 3e8f5bb166..9f230d3463 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -18,11 +18,13 @@
 
 from pathlib import Path
 
+from pypaimon import Table
+from pypaimon.api.core_options import CoreOptions
 from pypaimon.api.identifier import Identifier
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.common.file_io import FileIO
 from pypaimon.schema.schema_manager import SchemaManager
-from pypaimon.table.table import Table
+from pypaimon.table.bucket_mode import BucketMode
 
 
 class FileStoreTable(Table):
@@ -40,3 +42,17 @@ class FileStoreTable(Table):
         self.table_schema = table_schema
         self.schema_manager = SchemaManager(file_io, table_path)
         self.is_primary_key_table = bool(self.primary_keys)
+
+    def bucket_mode(self) -> BucketMode:
+        if self.is_primary_key_table:
+            if self.primary_keys == self.partition_keys:
+                return BucketMode.CROSS_PARTITION
+            if self.options.get(CoreOptions.BUCKET, -1) == -1:
+                return BucketMode.HASH_DYNAMIC
+            else:
+                return BucketMode.HASH_FIXED
+        else:
+            if self.options.get(CoreOptions.BUCKET, -1) == -1:
+                return BucketMode.BUCKET_UNAWARE
+            else:
+                return BucketMode.HASH_FIXED
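
In short: a primary-key table whose primary keys equal its partition keys is CROSS_PARTITION; otherwise the 'bucket' option (-1 when unset) decides between HASH_DYNAMIC and HASH_FIXED, and an append-only table is BUCKET_UNAWARE unless a fixed bucket count is configured. A small sketch (the table handle is hypothetical):

    table = catalog.get_table("sales.orders")  # hypothetical table
    if table.bucket_mode() == BucketMode.HASH_FIXED:
        pass  # records are hashed into a fixed number of buckets
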
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/table/row/__init__.py
similarity index 69%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/table/row/__init__.py
index 9e7418966f..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/table/row/__init__.py
@@ -15,23 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py
new file mode 100644
index 0000000000..aff9ffac05
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -0,0 +1,469 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+from dataclasses import dataclass
+from datetime import datetime, timezone, timedelta
+from decimal import Decimal
+from typing import List, Any
+
+from pypaimon.schema.data_types import DataField, DataType, AtomicType
+from pypaimon.table.row.row_kind import RowKind
+
+
+@dataclass
+class BinaryRow:
+    values: List[Any]
+    fields: List[DataField]
+    row_kind: RowKind = RowKind.INSERT
+
+    def to_dict(self):
+        return {self.fields[i].name: self.values[i] for i in range(len(self.fields))}
+
+
+class BinaryRowDeserializer:
+    HEADER_SIZE_IN_BITS = 8
+    MAX_FIX_PART_DATA_SIZE = 7
+    HIGHEST_FIRST_BIT = 0x80 << 56
+    HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
+
+    @classmethod
+    def from_bytes(
+            cls,
+            bytes_data: bytes,
+            data_fields: List[DataField]
+    ) -> BinaryRow:
+        if not bytes_data:
+            return BinaryRow([], data_fields)
+
+        arity = len(data_fields)
+        actual_data = bytes_data
+        if len(bytes_data) >= 4:
+            arity_from_bytes = struct.unpack('>i', bytes_data[:4])[0]
+            if 0 < arity_from_bytes < 1000:
+                actual_data = bytes_data[4:]
+
+        fields = []
+        null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
+        for i, data_field in enumerate(data_fields):
+            value = None
+            if not cls._is_null_at(actual_data, 0, i):
+                value = cls._parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type)
+            fields.append(value)
+
+        return BinaryRow(fields, data_fields, RowKind(actual_data[0]))
+
+    @classmethod
+    def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
+        return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8
+
+    @classmethod
+    def _is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool:
+        index = pos + cls.HEADER_SIZE_IN_BITS
+        byte_index = offset + (index // 8)
+        bit_index = index % 8
+        return (bytes_data[byte_index] & (1 << bit_index)) != 0
+
+    @classmethod
+    def _parse_field_value(
+            cls,
+            bytes_data: bytes,
+            base_offset: int,
+            null_bits_size_in_bytes: int,
+            pos: int,
+            data_type: DataType
+    ) -> Any:
+        if not isinstance(data_type, AtomicType):
+            raise ValueError(f"BinaryRow only support AtomicType yet, meet 
{data_type.__class__}")
+        field_offset = base_offset + null_bits_size_in_bytes + pos * 8
+        if field_offset >= len(bytes_data):
+            raise ValueError(f"Field offset {field_offset} exceeds data length 
{len(bytes_data)}")
+        type_name = data_type.type.upper()
+
+        if type_name in ['BOOLEAN', 'BOOL']:
+            return cls._parse_boolean(bytes_data, field_offset)
+        elif type_name in ['TINYINT', 'BYTE']:
+            return cls._parse_byte(bytes_data, field_offset)
+        elif type_name in ['SMALLINT', 'SHORT']:
+            return cls._parse_short(bytes_data, field_offset)
+        elif type_name in ['INT', 'INTEGER']:
+            return cls._parse_int(bytes_data, field_offset)
+        elif type_name in ['BIGINT', 'LONG']:
+            return cls._parse_long(bytes_data, field_offset)
+        elif type_name in ['FLOAT', 'REAL']:
+            return cls._parse_float(bytes_data, field_offset)
+        elif type_name in ['DOUBLE']:
+            return cls._parse_double(bytes_data, field_offset)
+        elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
+            return cls._parse_string(bytes_data, base_offset, field_offset)
+        elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
+            return cls._parse_binary(bytes_data, base_offset, field_offset)
+        elif type_name in ['DECIMAL', 'NUMERIC']:
+            return cls._parse_decimal(bytes_data, base_offset, field_offset, data_type)
+        elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
+            return cls._parse_timestamp(bytes_data, base_offset, field_offset, data_type)
+        elif type_name in ['DATE']:
+            return cls._parse_date(bytes_data, field_offset)
+        elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
+            return cls._parse_time(bytes_data, field_offset)
+        else:
+            return cls._parse_string(bytes_data, base_offset, field_offset)
+
+    @classmethod
+    def _parse_boolean(cls, bytes_data: bytes, field_offset: int) -> bool:
+        return bytes_data[field_offset] != 0
+
+    @classmethod
+    def _parse_byte(cls, bytes_data: bytes, field_offset: int) -> int:
+        return struct.unpack('<b', bytes_data[field_offset:field_offset + 
1])[0]
+
+    @classmethod
+    def _parse_short(cls, bytes_data: bytes, field_offset: int) -> int:
+        return struct.unpack('<h', bytes_data[field_offset:field_offset + 
2])[0]
+
+    @classmethod
+    def _parse_int(cls, bytes_data: bytes, field_offset: int) -> int:
+        if field_offset + 4 > len(bytes_data):
+            raise ValueError(f"Not enough bytes for INT: need 4, have 
{len(bytes_data) - field_offset}")
+        return struct.unpack('<i', bytes_data[field_offset:field_offset + 
4])[0]
+
+    @classmethod
+    def _parse_long(cls, bytes_data: bytes, field_offset: int) -> int:
+        if field_offset + 8 > len(bytes_data):
+            raise ValueError(f"Not enough bytes for LONG: need 8, have 
{len(bytes_data) - field_offset}")
+        return struct.unpack('<q', bytes_data[field_offset:field_offset + 
8])[0]
+
+    @classmethod
+    def _parse_float(cls, bytes_data: bytes, field_offset: int) -> float:
+        return struct.unpack('<f', bytes_data[field_offset:field_offset + 
4])[0]
+
+    @classmethod
+    def _parse_double(cls, bytes_data: bytes, field_offset: int) -> float:
+        if field_offset + 8 > len(bytes_data):
+            raise ValueError(f"Not enough bytes for DOUBLE: need 8, have 
{len(bytes_data) - field_offset}")
+        return struct.unpack('<d', bytes_data[field_offset:field_offset + 
8])[0]
+
+    @classmethod
+    def _parse_string(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> str:
+        if field_offset + 8 > len(bytes_data):
+            raise ValueError(f"Not enough bytes for STRING offset: need 8, have {len(bytes_data) - field_offset}")
+
+        offset_and_len = struct.unpack('<q', 
bytes_data[field_offset:field_offset + 8])[0]
+        mark = offset_and_len & cls.HIGHEST_FIRST_BIT
+        if mark == 0:
+            sub_offset = (offset_and_len >> 32) & 0xFFFFFFFF
+            length = offset_and_len & 0xFFFFFFFF
+            actual_string_offset = base_offset + sub_offset
+            if actual_string_offset + length > len(bytes_data):
+                raise ValueError(
+                    f"String data out of bounds: 
actual_offset={actual_string_offset}, length={length}, "
+                    f"total_length={len(bytes_data)}")
+            string_data = bytes_data[actual_string_offset:actual_string_offset + length]
+            return string_data.decode('utf-8')
+        else:
+            length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
+            start_offset = field_offset
+            if start_offset + length > len(bytes_data):
+                raise ValueError(f"Compact string data out of bounds: 
length={length}")
+            string_data = bytes_data[start_offset:start_offset + length]
+            return string_data.decode('utf-8')
+
+    @classmethod
+    def _parse_binary(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> bytes:
+        offset_and_len = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
+        mark = offset_and_len & cls.HIGHEST_FIRST_BIT
+        if mark == 0:
+            sub_offset = (offset_and_len >> 32) & 0xFFFFFFFF
+            length = offset_and_len & 0xFFFFFFFF
+            return bytes_data[base_offset + sub_offset:base_offset + sub_offset + length]
+        else:
+            length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
+            return bytes_data[field_offset + 1:field_offset + 1 + length]
+
+    @classmethod
+    def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
+        unscaled_long = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
+        type_str = str(data_type)
+        if '(' in type_str and ')' in type_str:
+            try:
+                precision_scale = type_str.split('(')[1].split(')')[0]
+                if ',' in precision_scale:
+                    scale = int(precision_scale.split(',')[1])
+                else:
+                    scale = 0
+            except:
+                scale = 0
+        else:
+            scale = 0
+        return Decimal(unscaled_long) / (10 ** scale)
+
+    @classmethod
+    def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime:
+        millis = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
+        return datetime.fromtimestamp(millis / 1000.0, tz=timezone.utc)
+
+    @classmethod
+    def _parse_date(cls, bytes_data: bytes, field_offset: int) -> datetime:
+        days = struct.unpack('<i', bytes_data[field_offset:field_offset + 
4])[0]
+        return datetime(1970, 1, 1) + timedelta(days=days)
+
+    @classmethod
+    def _parse_time(cls, bytes_data: bytes, field_offset: int) -> datetime:
+        millis = struct.unpack('<i', bytes_data[field_offset:field_offset + 
4])[0]
+        seconds = millis // 1000
+        microseconds = (millis % 1000) * 1000
+        return datetime(1970, 1, 1).replace(
+            hour=seconds // 3600,
+            minute=(seconds % 3600) // 60,
+            second=seconds % 60,
+            microsecond=microseconds
+        )
+
+
+class BinaryRowSerializer:
+    HEADER_SIZE_IN_BITS = 8
+    MAX_FIX_PART_DATA_SIZE = 7
+    HIGHEST_FIRST_BIT = 0x80 << 56
+    HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
+
+    @classmethod
+    def to_bytes(cls, binary_row: BinaryRow) -> bytes:
+        if not binary_row.values:
+            return b''
+
+        arity = len(binary_row.fields)
+        null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
+        fixed_part_size = null_bits_size_in_bytes + arity * 8
+        fixed_part = bytearray(fixed_part_size)
+        fixed_part[0] = binary_row.row_kind.value
+
+        for i, value in enumerate(binary_row.values):
+            if value is None:
+                cls._set_null_bit(fixed_part, 0, i)
+
+        variable_data = []
+        variable_offsets = []
+        current_offset = 0
+
+        for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)):
+            if value is None:
+                struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i 
* 8, 0)
+                variable_data.append(b'')
+                variable_offsets.append(0)
+                continue
+
+            field_offset = null_bits_size_in_bytes + i * 8
+            if not isinstance(field.type, AtomicType):
+                raise ValueError(f"BinaryRow only support AtomicType yet, meet 
{field.type.__class__}")
+            if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR', 
'BINARY', 'VARBINARY', 'BYTES']:
+                if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR']:
+                    if isinstance(value, str):
+                        value_bytes = value.encode('utf-8')
+                    else:
+                        value_bytes = bytes(value)
+                else:
+                    if isinstance(value, bytes):
+                        value_bytes = value
+                    else:
+                        value_bytes = bytes(value)
+
+                length = len(value_bytes)
+                if length <= cls.MAX_FIX_PART_DATA_SIZE:
+                    fixed_part[field_offset:field_offset + length] = value_bytes
+                    for j in range(length, 8):
+                        fixed_part[field_offset + j] = 0
+                    packed_long = struct.unpack_from('<q', fixed_part, 
field_offset)[0]
+
+                    offset_and_len = packed_long | (length << 56) | 
cls.HIGHEST_FIRST_BIT
+                    if offset_and_len > 0x7FFFFFFFFFFFFFFF:
+                        offset_and_len = offset_and_len - 0x10000000000000000
+                    struct.pack_into('<q', fixed_part, field_offset, 
offset_and_len)
+                    variable_data.append(b'')
+                    variable_offsets.append(0)
+                else:
+                    variable_data.append(value_bytes)
+                    variable_offsets.append(current_offset)
+                    current_offset += len(value_bytes)
+                    offset_and_len = (variable_offsets[i] << 32) | 
len(variable_data[i])
+                    struct.pack_into('<q', fixed_part, null_bits_size_in_bytes 
+ i * 8, offset_and_len)
+            else:
+                if field.type.type.upper() in ['BOOLEAN', 'BOOL']:
+                    struct.pack_into('<b', fixed_part, field_offset, 1 if 
value else 0)
+                elif field.type.type.upper() in ['TINYINT', 'BYTE']:
+                    struct.pack_into('<b', fixed_part, field_offset, value)
+                elif field.type.type.upper() in ['SMALLINT', 'SHORT']:
+                    struct.pack_into('<h', fixed_part, field_offset, value)
+                elif field.type.type.upper() in ['INT', 'INTEGER']:
+                    struct.pack_into('<i', fixed_part, field_offset, value)
+                elif field.type.type.upper() in ['BIGINT', 'LONG']:
+                    struct.pack_into('<q', fixed_part, field_offset, value)
+                elif field.type.type.upper() in ['FLOAT', 'REAL']:
+                    struct.pack_into('<f', fixed_part, field_offset, value)
+                elif field.type.type.upper() in ['DOUBLE']:
+                    struct.pack_into('<d', fixed_part, field_offset, value)
+                else:
+                    field_bytes = cls._serialize_field_value(value, field.type)
+                    fixed_part[field_offset:field_offset + len(field_bytes)] = field_bytes
+
+                variable_data.append(b'')
+                variable_offsets.append(0)
+
+        result = bytes(fixed_part) + b''.join(variable_data)
+        return result
+
+    @classmethod
+    def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
+        return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8
+
+    @classmethod
+    def _set_null_bit(cls, bytes_data: bytearray, offset: int, pos: int) -> 
None:
+        index = pos + cls.HEADER_SIZE_IN_BITS
+        byte_index = offset + (index // 8)
+        bit_index = index % 8
+        bytes_data[byte_index] |= (1 << bit_index)
+
+    @classmethod
+    def _serialize_field_value(cls, value: Any, data_type: AtomicType) -> 
bytes:
+        type_name = data_type.type.upper()
+
+        if type_name in ['BOOLEAN', 'BOOL']:
+            return cls._serialize_boolean(value)
+        elif type_name in ['TINYINT', 'BYTE']:
+            return cls._serialize_byte(value)
+        elif type_name in ['SMALLINT', 'SHORT']:
+            return cls._serialize_short(value)
+        elif type_name in ['INT', 'INTEGER']:
+            return cls._serialize_int(value)
+        elif type_name in ['BIGINT', 'LONG']:
+            return cls._serialize_long(value)
+        elif type_name in ['FLOAT', 'REAL']:
+            return cls._serialize_float(value)
+        elif type_name in ['DOUBLE']:
+            return cls._serialize_double(value)
+        elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
+            return cls._serialize_string(value)
+        elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
+            return cls._serialize_binary(value)
+        elif type_name in ['DECIMAL', 'NUMERIC']:
+            return cls._serialize_decimal(value, data_type)
+        elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
+            return cls._serialize_timestamp(value)
+        elif type_name in ['DATE']:
+            return cls._serialize_date(value)
+        elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
+            return cls._serialize_time(value)
+        else:
+            return cls._serialize_string(str(value))
+
+    @classmethod
+    def _serialize_boolean(cls, value: bool) -> bytes:
+        return struct.pack('<b', 1 if value else 0)
+
+    @classmethod
+    def _serialize_byte(cls, value: int) -> bytes:
+        return struct.pack('<b', value)
+
+    @classmethod
+    def _serialize_short(cls, value: int) -> bytes:
+        return struct.pack('<h', value)
+
+    @classmethod
+    def _serialize_int(cls, value: int) -> bytes:
+        return struct.pack('<i', value)
+
+    @classmethod
+    def _serialize_long(cls, value: int) -> bytes:
+        return struct.pack('<q', value)
+
+    @classmethod
+    def _serialize_float(cls, value: float) -> bytes:
+        return struct.pack('<f', value)
+
+    @classmethod
+    def _serialize_double(cls, value: float) -> bytes:
+        return struct.pack('<d', value)
+
+    @classmethod
+    def _serialize_string(cls, value) -> bytes:
+        if isinstance(value, str):
+            value_bytes = value.encode('utf-8')
+        else:
+            value_bytes = bytes(value)
+
+        length = len(value_bytes)
+
+        # Compact header as in the Java BinaryRow format: highest bit set, length in the high byte
+        offset_and_len = (0x80 << 56) | (length << 56)
+        # Convert to a signed 64-bit value so struct '<q' accepts it
+        if offset_and_len > 0x7FFFFFFFFFFFFFFF:
+            offset_and_len = offset_and_len - 0x10000000000000000
+        return struct.pack('<q', offset_and_len)
+
+    @classmethod
+    def _serialize_binary(cls, value: bytes) -> bytes:
+        if isinstance(value, bytes):
+            data_bytes = value
+        else:
+            data_bytes = bytes(value)
+        length = len(data_bytes)
+        # Same compact header and signed conversion as in _serialize_string
+        offset_and_len = (0x80 << 56) | (length << 56)
+        if offset_and_len > 0x7FFFFFFFFFFFFFFF:
+            offset_and_len = offset_and_len - 0x10000000000000000
+        return struct.pack('<q', offset_and_len)
+
+    @classmethod
+    def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
+        type_str = str(data_type)
+        if '(' in type_str and ')' in type_str:
+            try:
+                precision_scale = type_str.split('(')[1].split(')')[0]
+                if ',' in precision_scale:
+                    scale = int(precision_scale.split(',')[1])
+                else:
+                    scale = 0
+            except (IndexError, ValueError):
+                scale = 0
+        else:
+            scale = 0
+
+        unscaled_value = int(value * (10 ** scale))
+        return struct.pack('<q', unscaled_value)
+
+    @classmethod
+    def _serialize_timestamp(cls, value: datetime) -> bytes:
+        if value.tzinfo is None:
+            value = value.replace(tzinfo=timezone.utc)
+        millis = int(value.timestamp() * 1000)
+        return struct.pack('<q', millis)
+
+    @classmethod
+    def _serialize_date(cls, value) -> bytes:
+        epoch = datetime(1970, 1, 1)
+        if isinstance(value, datetime):
+            days = (value - epoch).days
+        else:
+            # value is assumed to be a datetime.date
+            days = (value - epoch.date()).days
+        return struct.pack('<i', days)
+
+    @classmethod
+    def _serialize_time(cls, value) -> bytes:
+        if isinstance(value, datetime):
+            midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
+            millis = int((value - midnight).total_seconds() * 1000)
+        else:
+            # value is assumed to be a datetime.time
+            millis = value.hour * 3600000 + value.minute * 60000 + value.second * 1000 + value.microsecond // 1000
+        return struct.pack('<i', millis)
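The fixed-width branches above are plain little-endian struct packing, and the null bit set sized by _calculate_bit_set_width_in_bytes reserves whole 8-byte words. A standalone sketch of both, with illustrative values and assuming an 8-bit header (the real constant lives on the class):

    import struct

    # Little-endian round-trips, mirroring _serialize_int and _serialize_long
    assert struct.unpack('<i', struct.pack('<i', 42))[0] == 42
    assert struct.unpack('<q', struct.pack('<q', 1_234_567))[0] == 1_234_567

    # Null-bit-set width for arity 10, assuming HEADER_SIZE_IN_BITS == 8:
    HEADER_SIZE_IN_BITS = 8
    assert ((10 + 63 + HEADER_SIZE_IN_BITS) // 64) * 8 == 8  # bytes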
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/table/row/internal_row.py
similarity index 52%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/table/row/internal_row.py
index 9e7418966f..ca19ebcddf 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/table/row/internal_row.py
@@ -15,23 +15,46 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+
 from abc import ABC, abstractmethod
-from pathlib import Path
+from typing import Any
+
+from pypaimon.table.row.row_kind import RowKind
 
 
-class FileIO(ABC):
+class InternalRow(ABC):
+    """
+    Base interface for an internal data structure representing data of RowType.
+    """
+
     @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+    def get_field(self, pos: int) -> Any:
+        """
+        Returns the value at the given position.
+        """
 
     @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
+    def is_null_at(self, pos: int) -> bool:
+        """
+        Returns true if the element is null at the given position.
+        """
 
     @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+    def get_row_kind(self) -> RowKind:
+        """
+        Returns the kind of change that this row describes in a changelog.
+        """
 
     @abstractmethod
-    def list_status(self, path: Path):
-        """"""
+    def __len__(self) -> int:
+        """
+        Returns the number of fields in this row.
+        The number does not include RowKind. It is kept separately.
+        """
+
+    def __str__(self) -> str:
+        fields = []
+        for pos in range(self.__len__()):
+            value = self.get_field(pos)
+            fields.append(str(value))
+        return " ".join(fields)
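A concrete implementation only has to supply the four abstract members. A minimal tuple-backed sketch (the TupleRow class is hypothetical, shown only to illustrate the contract; the commit's real implementations are OffsetRow and BinaryRow below):

    # Hypothetical tuple-backed InternalRow, for illustration only
    class TupleRow(InternalRow):
        def __init__(self, values: tuple, kind: RowKind = RowKind.INSERT):
            self._values = values
            self._kind = kind

        def get_field(self, pos: int):
            return self._values[pos]

        def is_null_at(self, pos: int) -> bool:
            return self._values[pos] is None

        def get_row_kind(self) -> RowKind:
            return self._kind

        def __len__(self) -> int:
            return len(self._values)

    row = TupleRow((1, None, "a"))
    assert str(row) == "1 None a"  # __str__ joins the stringified fields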
diff --git a/paimon-python/pypaimon/table/row/key_value.py b/paimon-python/pypaimon/table/row/key_value.py
new file mode 100644
index 0000000000..22647c4b6d
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/key_value.py
@@ -0,0 +1,57 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.table.row.offset_row import OffsetRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class KeyValue:
+    """A key value, including user key, sequence number, value kind and value."""
+
+    def __init__(self, key_arity: int, value_arity: int):
+        self.key_arity = key_arity
+        self.value_arity = value_arity
+
+        self._row_tuple = None
+        self._reused_key = OffsetRow(None, 0, key_arity)
+        self._reused_value = OffsetRow(None, key_arity + 2, value_arity)
+
+    def replace(self, row_tuple: tuple):
+        self._row_tuple = row_tuple
+        self._reused_key.replace(row_tuple)
+        self._reused_value.replace(row_tuple)
+        return self
+
+    def is_add(self) -> bool:
+        return RowKind.is_add_byte(self.value_row_kind_byte)
+
+    @property
+    def key(self) -> OffsetRow:
+        return self._reused_key
+
+    @property
+    def value(self) -> OffsetRow:
+        return self._reused_value
+
+    @property
+    def sequence_number(self) -> int:
+        return self._row_tuple[self.key_arity]
+
+    @property
+    def value_row_kind_byte(self) -> int:
+        return self._row_tuple[self.key_arity + 1]
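The two reused OffsetRow views assume a flat tuple layout of key fields, then the sequence number, then the row-kind byte, then value fields. A short usage sketch with illustrative values:

    # key_arity=1, value_arity=2, so the tuple layout is (key, seq, kind_byte, v0, v1)
    kv = KeyValue(key_arity=1, value_arity=2).replace((10, 42, 0, "a", "b"))
    assert kv.key.get_field(0) == 10
    assert kv.sequence_number == 42
    assert kv.is_add()                   # kind byte 0 is INSERT
    assert kv.value.get_field(1) == "b"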
diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py
new file mode 100644
index 0000000000..dc3f37954a
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/offset_row.py
@@ -0,0 +1,58 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class OffsetRow(InternalRow):
+    """An InternalRow that wraps a row tuple at a given offset."""
+
+    def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int):
+        self.row_tuple = row_tuple
+        self.offset = offset
+        self.arity = arity
+        self.row_kind_byte: int = 1
+
+    def replace(self, row_tuple: tuple) -> 'OffsetRow':
+        self.row_tuple = row_tuple
+        if self.offset + self.arity > len(row_tuple):
+            raise ValueError(f"Offset {self.offset} plus arity {self.arity} exceeds row length {len(row_tuple)}")
+        return self
+
+    def get_field(self, pos: int):
+        if pos >= self.arity:
+            raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}")
+        return self.row_tuple[self.offset + pos]
+
+    def is_null_at(self, pos: int) -> bool:
+        return self.get_field(pos) is None
+
+    def get_row_kind(self) -> RowKind:
+        return RowKind(self.row_kind_byte)
+
+    def set_row_kind_byte(self, row_kind_byte: int) -> None:
+        """
+        Store RowKind as a byte and instantiate it lazily to avoid performance overhead.
+        """
+        self.row_kind_byte = row_kind_byte
+
+    def __len__(self) -> int:
+        return self.arity
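Since OffsetRow holds only the tuple reference, an offset and an arity, re-pointing it via replace() costs nothing. A short sketch with illustrative values:

    # View the last two fields of a five-field tuple without copying
    row = OffsetRow(None, offset=3, arity=2).replace((1, 2, 3, "x", "y"))
    assert row.get_field(0) == "x"
    assert not row.is_null_at(1)
    assert len(row) == 2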
diff --git a/paimon-python/pypaimon/table/row/row_kind.py b/paimon-python/pypaimon/table/row/row_kind.py
new file mode 100644
index 0000000000..06a2904fb3
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/row_kind.py
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from enum import Enum
+
+
+class RowKind(Enum):
+    INSERT = 0  # +I: Insertion operation
+    UPDATE_BEFORE = 1  # -U: Update operation with the previous content of the updated row
+    UPDATE_AFTER = 2  # +U: Update operation with the new content of the updated row
+    DELETE = 3  # -D: Deletion operation
+
+    def is_add(self) -> bool:
+        return self in (RowKind.INSERT, RowKind.UPDATE_AFTER)
+
+    def to_string(self) -> str:
+        if self == RowKind.INSERT:
+            return "+I"
+        elif self == RowKind.UPDATE_BEFORE:
+            return "-U"
+        elif self == RowKind.UPDATE_AFTER:
+            return "+U"
+        elif self == RowKind.DELETE:
+            return "-D"
+        else:
+            return "??"
+
+    @staticmethod
+    def from_string(kind_str: str) -> 'RowKind':
+        if kind_str == "+I":
+            return RowKind.INSERT
+        elif kind_str == "-U":
+            return RowKind.UPDATE_BEFORE
+        elif kind_str == "+U":
+            return RowKind.UPDATE_AFTER
+        elif kind_str == "-D":
+            return RowKind.DELETE
+        else:
+            raise ValueError(f"Unknown row kind string: {kind_str}")
+
+    @classmethod
+    def is_add_byte(cls, byte: int) -> bool:
+        """
+        Check the RowKind type directly from its byte value, avoiding the creation and destruction of RowKind objects and reducing GC pressure.
+        """
+        return byte == 0 or byte == 2
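A quick round-trip between the enum values, their changelog shorthand, and the raw bytes consumed by is_add_byte (illustrative):

    kind = RowKind.from_string("+U")
    assert kind is RowKind.UPDATE_AFTER
    assert kind.to_string() == "+U" and kind.is_add()
    assert RowKind.is_add_byte(RowKind.UPDATE_AFTER.value)   # 2 -> add
    assert not RowKind.is_add_byte(RowKind.DELETE.value)     # 3 -> not add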
diff --git a/paimon-python/pypaimon/tests/api_test.py b/paimon-python/pypaimon/tests/api_test.py
index 8012a5a364..9a604dad60 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -26,11 +26,11 @@ from ..api import RESTApi
 from ..api.auth import BearTokenAuthProvider
 from ..api.identifier import Identifier
 from ..api.options import Options
-from ..api.rest_json import JSON
+from pypaimon.common.rest_json import JSON
 from pypaimon.schema.table_schema import TableSchema
 from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
 
-from ..api.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType, DataField
+from pypaimon.schema.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType, DataField
 from ..catalog.catalog_context import CatalogContext
 from ..catalog.table_metadata import TableMetadata
 from ..rest.rest_catalog import RESTCatalog
@@ -261,8 +261,8 @@ class ApiTestCase(unittest.TestCase):
             server.start()
             ecs_metadata_url = f"http://localhost:{server.port}/ram/security-credential/"
             options = {
-                api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
-                api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url
+                api.CatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+                api.CatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url
             }
             loader = DLFTokenLoaderFactory.create_token_loader(options)
             load_token = loader.load_token()
@@ -271,9 +271,9 @@ class ApiTestCase(unittest.TestCase):
             self.assertEqual(load_token.security_token, token.security_token)
             self.assertEqual(load_token.expiration, token.expiration)
             options_with_role = {
-                api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
-                api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url,
-                api.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: role_name,
+                api.CatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+                api.CatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url,
+                api.CatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: role_name,
             }
             loader = DLFTokenLoaderFactory.create_token_loader(options_with_role)
             token = loader.load_token()
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py
index c20f337302..b81914a347 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -24,7 +24,7 @@ from pathlib import Path
 
 from pypaimon.api import ConfigResponse
 from pypaimon.api.auth import BearTokenAuthProvider
-from pypaimon.api.data_types import DataField, AtomicType
+from pypaimon.schema.data_types import DataField, AtomicType
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.catalog.table_metadata import TableMetadata
 from pypaimon.pvfs import PaimonVirtualFileSystem
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py
index 27ba27907c..44be0e9e42 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -32,8 +32,10 @@ from ..api import RenameTableRequest, CreateTableRequest, CreateDatabaseRequest,
 from ..api.api_response import (ConfigResponse, ListDatabasesResponse, GetDatabaseResponse,
                                 Schema, GetTableResponse, ListTablesResponse,
                                 RESTResponse, PagedList)
-from ..api.rest_json import JSON
+from pypaimon.common.rest_json import JSON
 from pypaimon.schema.table_schema import TableSchema
+from ..catalog.catalog_exception import DatabaseNoPermissionException, TableNotExistException, \
+    DatabaseNotExistException, TableNoPermissionException
 from ..catalog.table_metadata import TableMetadata
 
 
@@ -57,143 +59,6 @@ class ErrorResponse(RESTResponse):
     code: int
 
 
-# Exception classes
-class CatalogException(Exception):
-    """Base catalog exception"""
-
-
-class DatabaseNotExistException(CatalogException):
-    """Database not exist exception"""
-
-    def __init__(self, database: str):
-        self.database = database
-        super().__init__(f"Database {database} does not exist")
-
-
-class DatabaseAlreadyExistException(CatalogException):
-    """Database already exist exception"""
-
-    def __init__(self, database: str):
-        self.database = database
-        super().__init__(f"Database {database} already exists")
-
-
-class DatabaseNoPermissionException(CatalogException):
-    """Database no permission exception"""
-
-    def __init__(self, database: str):
-        self.database = database
-        super().__init__(f"No permission to access database {database}")
-
-
-class TableNotExistException(CatalogException):
-    """Table not exist exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"Table {identifier.get_full_name()} does not exist")
-
-
-class TableAlreadyExistException(CatalogException):
-    """Table already exist exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"Table {identifier.get_full_name()} already exists")
-
-
-class TableNoPermissionException(CatalogException):
-    """Table no permission exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"No permission to access table {identifier.get_full_name()}")
-
-
-class ViewNotExistException(CatalogException):
-    """View not exist exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"View {identifier.get_full_name()} does not exist")
-
-
-class ViewAlreadyExistException(CatalogException):
-    """View already exist exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"View {identifier.get_full_name()} already exists")
-
-
-class FunctionNotExistException(CatalogException):
-    """Function not exist exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"Function {identifier.get_full_name()} does not exist")
-
-
-class FunctionAlreadyExistException(CatalogException):
-    """Function already exist exception"""
-
-    def __init__(self, identifier: Identifier):
-        self.identifier = identifier
-        super().__init__(f"Function {identifier.get_full_name()} already exists")
-
-
-class ColumnNotExistException(CatalogException):
-    """Column not exist exception"""
-
-    def __init__(self, column: str):
-        self.column = column
-        super().__init__(f"Column {column} does not exist")
-
-
-class ColumnAlreadyExistException(CatalogException):
-    """Column already exist exception"""
-
-    def __init__(self, column: str):
-        self.column = column
-        super().__init__(f"Column {column} already exists")
-
-
-class DefinitionNotExistException(CatalogException):
-    """Definition not exist exception"""
-
-    def __init__(self, identifier: Identifier, name: str):
-        self.identifier = identifier
-        self.name = name
-        super().__init__(f"Definition {name} does not exist in {identifier.get_full_name()}")
-
-
-class DefinitionAlreadyExistException(CatalogException):
-    """Definition already exist exception"""
-
-    def __init__(self, identifier: Identifier, name: str):
-        self.identifier = identifier
-        self.name = name
-        super().__init__(f"Definition {name} already exists in {identifier.get_full_name()}")
-
-
-class DialectNotExistException(CatalogException):
-    """Dialect not exist exception"""
-
-    def __init__(self, identifier: Identifier, dialect: str):
-        self.identifier = identifier
-        self.dialect = dialect
-        super().__init__(f"Dialect {dialect} does not exist in {identifier.get_full_name()}")
-
-
-class DialectAlreadyExistException(CatalogException):
-    """Dialect already exist exception"""
-
-    def __init__(self, identifier: Identifier, dialect: str):
-        self.identifier = identifier
-        self.dialect = dialect
-        super().__init__(f"Dialect {dialect} already exists in {identifier.get_full_name()}")
-
-
 # Constants
 DEFAULT_MAX_RESULTS = 100
 AUTHORIZATION_HEADER_KEY = "Authorization"
