This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d87a4ec34 [core] python: support rest commit (#6079)
2d87a4ec34 is described below

commit 2d87a4ec3401ca85f21f560b1d555a18a7d47ba8
Author: jerry <[email protected]>
AuthorDate: Mon Aug 18 15:14:09 2025 +0800

    [core] python: support rest commit (#6079)
---
 paimon-python/pypaimon/api/__init__.py             |  51 ++-
 .../api/{api_resquest.py => api_request.py}        |  15 +-
 paimon-python/pypaimon/api/api_response.py         |  10 +
 paimon-python/pypaimon/catalog/catalog.py          |  35 +-
 .../pypaimon/catalog/catalog_snapshot_commit.py    |  77 ++++
 .../pypaimon/catalog/filesystem_catalog.py         |  26 +-
 .../pypaimon/catalog/renaming_snapshot_commit.py   |  93 +++++
 .../pypaimon/catalog/rest/rest_catalog.py          |  61 +++-
 .../pypaimon/catalog/rest/rest_catalog_loader.py   |  69 ++++
 paimon-python/pypaimon/catalog/snapshot_commit.py  |  99 +++++
 paimon-python/pypaimon/common/predicate.py         |   1 +
 paimon-python/pypaimon/common/rest_json.py         |  20 +-
 paimon-python/pypaimon/schema/data_types.py        |  20 +
 paimon-python/pypaimon/snapshot/snapshot.py        |  71 ++--
 .../pypaimon/snapshot/snapshot_manager.py          |  37 +-
 .../pypaimon/table/catalog_environment.py          |  56 +++
 paimon-python/pypaimon/table/file_store_table.py   |  18 +-
 paimon-python/pypaimon/tests/rest_server.py        | 141 +++++++-
 .../pypaimon/tests/test_file_store_commit.py       | 401 +++++++++++++++++++++
 .../tests/test_rest_catalog_commit_snapshot.py     | 221 ++++++++++++
 paimon-python/pypaimon/write/batch_table_commit.py |   8 +-
 paimon-python/pypaimon/write/file_store_commit.py  | 126 ++++++-
 22 files changed, 1557 insertions(+), 99 deletions(-)

diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 6806733c7a..a384c3f123 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -19,20 +19,24 @@ import logging
 from typing import Callable, Dict, List, Optional
 from urllib.parse import unquote
 
-from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
-                                       GetTableResponse, GetTableTokenResponse,
+from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
+                                       GetDatabaseResponse, GetTableResponse,
+                                       GetTableTokenResponse,
                                        ListDatabasesResponse,
                                        ListTablesResponse, PagedList,
                                        PagedResponse)
-from pypaimon.api.api_resquest import (AlterDatabaseRequest,
-                                       CreateDatabaseRequest,
-                                       CreateTableRequest, RenameTableRequest)
+from pypaimon.api.api_request import (AlterDatabaseRequest,
+                                      CommitTableRequest,
+                                      CreateDatabaseRequest,
+                                      CreateTableRequest, RenameTableRequest)
 from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction
 from pypaimon.api.client import HttpClient
 from pypaimon.api.typedef import T
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
 from pypaimon.common.config import CatalogOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.snapshot import Snapshot
 
 
 class RESTException(Exception):
@@ -119,6 +123,10 @@ class ResourcePaths:
     def rename_table(self) -> str:
         return f"{self.base_path}/{self.TABLES}/rename"
 
+    def commit_table(self, database_name: str, table_name: str) -> str:
+        return (f"{self.base_path}/{self.DATABASES}/{RESTUtil.encode_string(database_name)}"
+                f"/{self.TABLES}/{RESTUtil.encode_string(table_name)}/commit")
+
 
 class RESTApi:
     HEADER_PREFIX = "header."
@@ -336,3 +344,36 @@ class RESTApi:
             GetTableTokenResponse,
             self.rest_auth_function,
         )
+
+    def commit_snapshot(
+            self,
+            identifier: Identifier,
+            table_uuid: Optional[str],
+            snapshot: Snapshot,
+            statistics: List[PartitionStatistics]
+    ) -> bool:
+        """
+        Commit snapshot for table.
+
+        Args:
+            identifier: Database name and table name
+            table_uuid: UUID of the table, to guard against committing to the wrong table
+            snapshot: The snapshot to commit
+            statistics: Incremental statistics for this snapshot
+
+        Returns:
+            True if the commit succeeds
+
+        Raises:
+            NoSuchResourceException: Raised on HTTP 404; the table does not exist
+            ForbiddenException: Raised on HTTP 403; the caller lacks permission for this table
+        """
+        request = CommitTableRequest(table_uuid, snapshot, statistics)
+        response = self.client.post_with_response_type(
+            self.resource_paths.commit_table(
+                identifier.database_name, identifier.object_name),
+            request,
+            CommitTableResponse,
+            self.rest_auth_function
+        )
+        return response.is_success()
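
For context, a minimal sketch of how a caller could drive this new endpoint. The helper name and all literal values below are illustrative only; Snapshot and PartitionStatistics are the classes added later in this patch.

    from pypaimon.api import RESTApi
    from pypaimon.catalog.snapshot_commit import PartitionStatistics
    from pypaimon.common.identifier import Identifier
    from pypaimon.snapshot.snapshot import Snapshot

    def commit_one_snapshot(api: RESTApi) -> bool:
        # Build the snapshot to commit (field values are placeholders)
        snapshot = Snapshot(
            id=1, schema_id=0,
            base_manifest_list="manifest-list-base",
            delta_manifest_list="manifest-list-delta",
            commit_user="demo-user", commit_identifier=1,
            commit_kind="APPEND", time_millis=1703721600000)
        # One statistics entry per changed partition
        stats = [PartitionStatistics.create(
            partition_spec={"dt": "2024-01-15"}, record_count=100,
            file_count=1, file_size_in_bytes=1024)]
        return api.commit_snapshot(
            Identifier.create("my_db", "my_table"), "table-uuid", snapshot, stats)
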
diff --git a/paimon-python/pypaimon/api/api_resquest.py b/paimon-python/pypaimon/api/api_request.py
similarity index 79%
rename from paimon-python/pypaimon/api/api_resquest.py
rename to paimon-python/pypaimon/api/api_request.py
index 46789870c5..cf22d9853b 100644
--- a/paimon-python/pypaimon/api/api_resquest.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -18,11 +18,13 @@ limitations under the License.
 
 from abc import ABC
 from dataclasses import dataclass
-from typing import Dict, List
+from typing import Dict, List, Optional
 
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.rest_json import json_field
 from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.snapshot import Snapshot
 
 
 class RESTRequest(ABC):
@@ -63,3 +65,14 @@ class CreateTableRequest(RESTRequest):
 
     identifier: Identifier = json_field(FIELD_IDENTIFIER)
     schema: Schema = json_field(FIELD_SCHEMA)
+
+
+@dataclass
+class CommitTableRequest(RESTRequest):
+    FIELD_TABLE_UUID = "tableUuid"
+    FIELD_SNAPSHOT = "snapshot"
+    FIELD_STATISTICS = "statistics"
+
+    table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
+    snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
+    statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)
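
A sketch of how this request serializes, assuming JSON.to_json honors the json_field names declared above; all values are placeholders.

    from pypaimon.api.api_request import CommitTableRequest
    from pypaimon.catalog.snapshot_commit import PartitionStatistics
    from pypaimon.common.rest_json import JSON
    from pypaimon.snapshot.snapshot import Snapshot

    request = CommitTableRequest(
        table_uuid="uuid-123",
        snapshot=Snapshot(id=1, schema_id=0,
                          base_manifest_list="base", delta_manifest_list="delta",
                          commit_user="demo", commit_identifier=1,
                          commit_kind="APPEND", time_millis=1703721600000),
        statistics=[PartitionStatistics.create(partition_spec={"dt": "p1"})])
    # Expected shape (abridged):
    # {"tableUuid": "uuid-123", "snapshot": {"id": 1, "schemaId": 0, ...},
    #  "statistics": [{"spec": {"dt": "p1"}, "recordCount": 0, ...}]}
    body = JSON.to_json(request)
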
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index 17496f047e..5658f5b351 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -258,3 +258,13 @@ class GetTableTokenResponse(RESTResponse):
 
     token: Dict[str, str] = json_field(FIELD_TOKEN, default=None)
     expires_at_millis: Optional[int] = json_field(FIELD_EXPIRES_AT_MILLIS, default=None)
+
+
+@dataclass
+class CommitTableResponse(RESTResponse):
+    FIELD_SUCCESS = "success"
+
+    success: bool = json_field(FIELD_SUCCESS, default=False)
+
+    def is_success(self) -> bool:
+        return self.success
diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py
index 8666f7caa6..c2b1f6915e 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -17,10 +17,12 @@
 ################################################################################
 
 from abc import ABC, abstractmethod
-from typing import Optional, Union
+from typing import List, Optional, Union
 
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.snapshot import Snapshot
 
 
 class Catalog(ABC):
@@ -51,3 +53,34 @@ class Catalog(ABC):
     @abstractmethod
     def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
         """Create table with schema."""
+
+    def supports_version_management(self) -> bool:
+        """
+        Whether this catalog supports version management for tables.
+
+        Returns:
+            True if the catalog supports version management, False otherwise
+        """
+        return False
+
+    @abstractmethod
+    def commit_snapshot(
+            self,
+            identifier: Identifier,
+            table_uuid: Optional[str],
+            snapshot: Snapshot,
+            statistics: List[PartitionStatistics]
+    ) -> bool:
+        """
+        Commit the Snapshot for the table identified by the given Identifier.
+
+        Args:
+            identifier: Path of the table
+            table_uuid: UUID of the table, to guard against committing to the wrong table
+            snapshot: Snapshot to be committed
+            statistics: Statistics information of this change
+
+        Returns:
+            True if commit was successful, False otherwise
+        """
diff --git a/paimon-python/pypaimon/catalog/catalog_snapshot_commit.py b/paimon-python/pypaimon/catalog/catalog_snapshot_commit.py
new file mode 100644
index 0000000000..cc1ed258f8
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/catalog_snapshot_commit.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List
+
+from pypaimon.catalog.catalog import Catalog
+from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
+from pypaimon.common.identifier import Identifier
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class CatalogSnapshotCommit(SnapshotCommit):
+    """A SnapshotCommit using Catalog to commit."""
+
+    def __init__(self, catalog: Catalog, identifier: Identifier, uuid: str):
+        """
+        Initialize CatalogSnapshotCommit.
+
+        Args:
+            catalog: The catalog instance to use for committing
+            identifier: The table identifier
+            uuid: Optional table UUID for verification
+        """
+        self.catalog = catalog
+        self.identifier = identifier
+        self.uuid = uuid
+
+    def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
+        """
+        Commit the snapshot using the catalog.
+
+        Args:
+            snapshot: The snapshot to commit
+            branch: The branch name to commit to
+            statistics: List of partition statistics
+
+        Returns:
+            True if commit was successful
+
+        Raises:
+            Exception: If commit fails
+        """
+        new_identifier = Identifier(
+            database_name=self.identifier.get_database_name(),
+            object_name=self.identifier.get_table_name(),
+            branch_name=branch
+        )
+
+        # Call catalog's commit_snapshot method
+        if hasattr(self.catalog, 'commit_snapshot'):
+            return self.catalog.commit_snapshot(new_identifier, self.uuid, snapshot, statistics)
+        else:
+            # Fallback for catalogs that don't support snapshot commits
+            raise NotImplementedError(
+                "The catalog does not support snapshot commits. "
+                "The commit_snapshot method needs to be implemented in the 
catalog interface."
+            )
+
+    def close(self):
+        """Close the catalog and release resources."""
+        if hasattr(self.catalog, 'close'):
+            self.catalog.close()
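
Because the SnapshotCommit base class (added later in this patch) implements the context-manager protocol, this class can be used in a with-statement. A sketch, where all arguments are assumed to come from the caller:

    from typing import List
    from pypaimon.catalog.catalog import Catalog
    from pypaimon.catalog.catalog_snapshot_commit import CatalogSnapshotCommit
    from pypaimon.catalog.snapshot_commit import PartitionStatistics
    from pypaimon.common.identifier import Identifier
    from pypaimon.snapshot.snapshot import Snapshot

    def commit_via_catalog(catalog: Catalog, identifier: Identifier, snapshot: Snapshot,
                           stats: List[PartitionStatistics]) -> bool:
        # close() runs automatically on exit and releases the catalog
        with CatalogSnapshotCommit(catalog, identifier, uuid="table-uuid") as commit:
            return commit.commit(snapshot, branch="main", statistics=stats)
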
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6c4dc74623..68d1e438c8 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -17,7 +17,7 @@
 ################################################################################
 
 from pathlib import Path
-from typing import Optional, Union
+from typing import Optional, Union, List
 from urllib.parse import urlparse
 
 from pypaimon.catalog.catalog import Catalog
@@ -26,11 +26,14 @@ from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
                                                 TableAlreadyExistException,
                                                 TableNotExistException)
 from pypaimon.catalog.database import Database
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
 from pypaimon.common.config import CatalogOptions
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema_manager import SchemaManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.catalog_environment import CatalogEnvironment
 from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.table import Table
 
@@ -67,7 +70,17 @@ class FileSystemCatalog(Catalog):
             raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
         table_path = self.get_table_path(identifier)
         table_schema = self.get_table_schema(identifier)
-        return FileStoreTable(self.file_io, identifier, table_path, table_schema)
+
+        # Create catalog environment for filesystem catalog
+        # Filesystem catalog doesn't support version management by default
+        catalog_environment = CatalogEnvironment(
+            identifier=identifier,
+            uuid=None,  # Filesystem catalog doesn't track table UUIDs
+            catalog_loader=None,  # No catalog loader for filesystem
+            supports_version_management=False
+        )
+
+        return FileStoreTable(self.file_io, identifier, table_path, table_schema, catalog_environment)
 
     def create_table(self, identifier: Union[str, Identifier], schema: 'Schema', ignore_if_exists: bool):
         if schema.options and schema.options.get(CoreOptions.AUTO_CREATE):
@@ -107,3 +120,12 @@ class FileSystemCatalog(Catalog):
         bucket = parsed.netloc
         warehouse_dir = parsed.path.lstrip('/')
         return Path(f"{bucket}/{warehouse_dir}" if warehouse_dir else bucket)
+
+    def commit_snapshot(
+            self,
+            identifier: Identifier,
+            table_uuid: Optional[str],
+            snapshot: Snapshot,
+            statistics: List[PartitionStatistics]
+    ) -> bool:
+        raise NotImplementedError("This catalog does not support committing snapshots")
diff --git a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
new file mode 100644
index 0000000000..4a2e70e4e2
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List
+
+from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+
+class RenamingSnapshotCommit(SnapshotCommit):
+    """
+    A SnapshotCommit using file renaming to commit.
+
+    Note that when the file system is local or HDFS, rename is atomic.
+    But if the file system is object storage, we need additional lock protection.
+    """
+
+    def __init__(self, snapshot_manager: SnapshotManager):
+        """
+        Initialize RenamingSnapshotCommit.
+
+        Args:
+            snapshot_manager: The snapshot manager to use
+        """
+        self.snapshot_manager = snapshot_manager
+        self.file_io: FileIO = snapshot_manager.file_io
+
+    def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
+        """
+        Commit the snapshot using file renaming.
+
+        Args:
+            snapshot: The snapshot to commit
+            branch: The branch name to commit to
+            statistics: List of partition statistics (currently unused but kept for interface compatibility)
+
+        Returns:
+            True if commit was successful, False otherwise
+
+        Raises:
+            Exception: If commit fails
+        """
+        new_snapshot_path = self.snapshot_manager.get_snapshot_path(snapshot.id)
+        if not self.file_io.exists(new_snapshot_path):
+            # Try to write the snapshot file atomically using the file IO
+            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot))
+            if committed:
+                # Update the latest hint
+                self._commit_latest_hint(snapshot.id)
+            return committed
+        return False
+
+    def close(self):
+        """Close the lock and release resources."""
+
+    def _commit_latest_hint(self, snapshot_id: int):
+        """
+        Update the latest snapshot hint.
+
+        Args:
+            snapshot_id: The latest snapshot ID
+        """
+        latest_file = self.snapshot_manager.latest_file
+        try:
+            # Try atomic write first
+            success = self.file_io.try_to_write_atomic(latest_file, str(snapshot_id))
+            if not success:
+                # Fallback to regular write
+                self.file_io.write_file(latest_file, str(snapshot_id), overwrite=True)
+        except Exception as e:
+            # Log the error but don't fail the commit for this
+            # In a production system, you might want to use proper logging
+            print(f"Warning: Failed to update LATEST hint: {e}")
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index f597f059f8..f35ee04322 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -19,20 +19,24 @@ from pathlib import Path
 from typing import Any, Callable, Dict, List, Optional, Union
 from urllib.parse import urlparse
 
-from pypaimon.api import CatalogOptions, RESTApi
+from pypaimon.api import (CatalogOptions,
+                          NoSuchResourceException, RESTApi)
 from pypaimon.api.api_response import GetTableResponse, PagedList
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_exception import TableNotExistException
 from pypaimon.catalog.database import Database
 from pypaimon.catalog.property_change import PropertyChange
 from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
 from pypaimon.catalog.table_metadata import TableMetadata
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.table_schema import TableSchema
+from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.table.catalog_environment import CatalogEnvironment
 from pypaimon.table.file_store_table import FileStoreTable
 
@@ -45,6 +49,55 @@ class RESTCatalog(Catalog):
                                              context.fallback_io_loader)
         self.data_token_enabled = self.api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
 
+    def catalog_loader(self):
+        """
+        Create and return a RESTCatalogLoader for this catalog.
+
+        Returns:
+            A RESTCatalogLoader instance configured with this catalog's context
+        """
+        from pypaimon.catalog.rest.rest_catalog_loader import RESTCatalogLoader
+        return RESTCatalogLoader(self.context)
+
+    def supports_version_management(self) -> bool:
+        """
+        Return whether this catalog supports version management for tables.
+
+        Returns:
+            True since REST catalogs support version management
+        """
+        return True
+
+    def commit_snapshot(
+            self,
+            identifier: Identifier,
+            table_uuid: Optional[str],
+            snapshot: Snapshot,
+            statistics: List[PartitionStatistics]
+    ) -> bool:
+        """
+        Commit the Snapshot for the table identified by the given Identifier.
+
+        Args:
+            identifier: Path of the table
+            table_uuid: UUID of the table, to guard against committing to the wrong table
+            snapshot: Snapshot to be committed
+            statistics: Statistics information of this change
+
+        Returns:
+            True if commit was successful, False otherwise
+
+        Raises:
+            TableNotExistException: If the target table does not exist
+        """
+        try:
+            return self.api.commit_snapshot(identifier, table_uuid, snapshot, statistics)
+        except NoSuchResourceException as e:
+            raise TableNotExistException(identifier) from e
+        except Exception as e:
+            # Handle other exceptions that might be thrown by the API
+            raise RuntimeError(f"Failed to commit snapshot for table 
{identifier.get_full_name()}: {e}") from e
+
     def list_databases(self) -> List[str]:
         return self.api.list_databases()
 
@@ -146,8 +199,8 @@ class RESTCatalog(Catalog):
         catalog_env = CatalogEnvironment(
             identifier=identifier,
             uuid=metadata.uuid,
-            catalog_loader=None,
-            supports_version_management=False
+            catalog_loader=self.catalog_loader(),
+            supports_version_management=True  # REST catalogs support version management
         )
         path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
         path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
@@ -164,4 +217,4 @@ class RESTCatalog(Catalog):
                catalog_environment: CatalogEnvironment
                ) -> FileStoreTable:
         """Create FileStoreTable with dynamic options and catalog 
environment"""
-        return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema)
+        return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema, catalog_environment)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog_loader.py b/paimon-python/pypaimon/catalog/rest/rest_catalog_loader.py
new file mode 100644
index 0000000000..a8e9da1dff
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog_loader.py
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+RESTCatalogLoader implementation for pypaimon.
+
+This module provides the RESTCatalogLoader class which implements the CatalogLoader
+interface to create and load RESTCatalog instances.
+"""
+
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+
+
+class RESTCatalogLoader(CatalogLoader):
+    """
+    Loader to create RESTCatalog instances.
+
+    This class implements the CatalogLoader interface and is responsible for
+    creating and configuring RESTCatalog instances based on the provided
+    CatalogContext.
+    """
+
+    def __init__(self, context: CatalogContext):
+        """
+        Initialize RESTCatalogLoader with a CatalogContext.
+
+        Args:
+            context: The CatalogContext containing configuration options
+        """
+        self._context = context
+
+    def context(self) -> CatalogContext:
+        """
+        Get the CatalogContext associated with this loader.
+
+        Returns:
+            The CatalogContext instance
+        """
+        return self._context
+
+    def load(self) -> RESTCatalog:
+        """
+        Load and return a new RESTCatalog instance.
+
+        This method creates a new RESTCatalog instance using the stored
+        CatalogContext, with config_required set to False to avoid
+        redundant configuration validation.
+
+        Returns:
+            A new RESTCatalog instance
+        """
+        return RESTCatalog(self._context, config_required=False)
diff --git a/paimon-python/pypaimon/catalog/snapshot_commit.py b/paimon-python/pypaimon/catalog/snapshot_commit.py
new file mode 100644
index 0000000000..f66b123343
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/snapshot_commit.py
@@ -0,0 +1,99 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Dict, List
+import time
+
+from pypaimon.common.rest_json import json_field
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+@dataclass
+class PartitionStatistics:
+    """
+    Represents partition statistics for snapshot commits.
+
+    This class matches the Java org.apache.paimon.partition.PartitionStatistics
+    structure for proper JSON serialization in REST API calls.
+    """
+
+    spec: Dict[str, str] = json_field("spec", default_factory=dict)
+    record_count: int = json_field("recordCount", default=0)
+    file_size_in_bytes: int = json_field("fileSizeInBytes", default=0)
+    file_count: int = json_field("fileCount", default=0)
+    last_file_creation_time: int = json_field("lastFileCreationTime", 
default_factory=lambda: int(time.time() * 1000))
+
+    @classmethod
+    def create(cls, partition_spec: Dict[str, str] = None, record_count: int = 0,
+               file_count: int = 0, file_size_in_bytes: int = 0,
+               last_file_creation_time: int = None) -> 'PartitionStatistics':
+        """
+        Factory method to create PartitionStatistics with backward compatibility.
+
+        Args:
+            partition_spec: Partition specification dictionary
+            record_count: Number of records
+            file_count: Number of files
+            file_size_in_bytes: Total file size in bytes
+            last_file_creation_time: Last file creation time in milliseconds
+
+        Returns:
+            PartitionStatistics instance
+        """
+        return cls(
+            spec=partition_spec or {},
+            record_count=record_count,
+            file_count=file_count,
+            file_size_in_bytes=file_size_in_bytes,
+            last_file_creation_time=last_file_creation_time or int(time.time() * 1000)
+        )
+
+
+class SnapshotCommit(ABC):
+    """Interface to commit snapshot atomically."""
+
+    @abstractmethod
+    def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
+        """
+        Commit the given snapshot.
+
+        Args:
+            snapshot: The snapshot to commit
+            branch: The branch name to commit to
+            statistics: List of partition statistics
+
+        Returns:
+            True if commit was successful, False otherwise
+
+        Raises:
+            Exception: If commit fails
+        """
+        pass
+
+    @abstractmethod
+    def close(self):
+        """Close the snapshot commit and release any resources."""
+        pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
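
A small self-contained sketch of the factory: lastFileCreationTime falls back to the current epoch millis when not supplied.

    import time
    from pypaimon.catalog.snapshot_commit import PartitionStatistics

    stats = PartitionStatistics.create(
        partition_spec={"dt": "2024-01-15", "region": "us-east-1"},
        record_count=10000, file_count=1, file_size_in_bytes=1 << 20)
    assert stats.spec == {"dt": "2024-01-15", "region": "us-east-1"}
    # Defaulted to "now" in epoch milliseconds
    assert stats.last_file_creation_time <= int(time.time() * 1000) + 1
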
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 5a46cb419d..a9fefcdef6 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from __future__ import annotations
 
 from dataclasses import dataclass
 from functools import reduce
diff --git a/paimon-python/pypaimon/common/rest_json.py b/paimon-python/pypaimon/common/rest_json.py
index aa959e1c53..2f007cd230 100644
--- a/paimon-python/pypaimon/common/rest_json.py
+++ b/paimon-python/pypaimon/common/rest_json.py
@@ -43,6 +43,11 @@ class JSON:
     @staticmethod
     def __to_dict(obj: Any) -> Dict[str, Any]:
         """Convert to dictionary with custom field names"""
+        # If object has custom to_dict method, use it
+        if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
+            return obj.to_dict()
+
+        # Otherwise, use dataclass field-by-field serialization
         result = {}
         for field_info in fields(obj):
             field_value = getattr(obj, field_info.name)
@@ -51,13 +56,15 @@ class JSON:
             json_name = field_info.metadata.get("json_name", field_info.name)
 
             # Handle nested objects
-            if is_dataclass(field_value):
-                result[json_name] = JSON.__to_dict(field_value)
-            elif hasattr(field_value, "to_dict"):
+            if hasattr(field_value, "to_dict"):
                 result[json_name] = field_value.to_dict()
+            elif is_dataclass(field_value):
+                result[json_name] = JSON.__to_dict(field_value)
             elif isinstance(field_value, list):
                 result[json_name] = [
-                    item.to_dict() if hasattr(item, "to_dict") else item
+                    item.to_dict() if hasattr(item, "to_dict")
+                    else JSON.__to_dict(item) if is_dataclass(item)
+                    else item
                     for item in field_value
                 ]
             else:
@@ -68,6 +75,11 @@ class JSON:
     @staticmethod
     def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
         """Create instance from dictionary"""
+        # If target class has custom from_dict method, use it
+        if hasattr(target_class, "from_dict") and 
callable(getattr(target_class, "from_dict")):
+            return target_class.from_dict(data)
+
+        # Otherwise, use dataclass field-by-field deserialization
         # Create field name mapping (json_name -> field_name)
         field_mapping = {}
         type_mapping = {}
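
The practical effect of the new dispatch branches: types that define their own to_dict/from_dict hooks (as the data_types classes below now do) are handled by those hooks, while plain dataclasses still take the generic field walk. A minimal sketch of the generic path, assuming json_field/JSON behave as used elsewhere in this patch:

    from dataclasses import dataclass
    from pypaimon.common.rest_json import JSON, json_field

    @dataclass
    class Point:
        x: int = json_field("x")
        y: int = json_field("y")

    text = JSON.to_json(Point(x=1, y=2))          # generic dataclass serialization
    point = JSON.from_json('{"x": 1, "y": 2}', Point)
    assert point == Point(x=1, y=2)
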
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 0fbdd33a84..8a22e7f012 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -77,6 +77,10 @@ class AtomicType(DataType):
             return self.type + " NOT NULL"
         return self.type
 
+    @classmethod
+    def from_dict(cls, data: str) -> "AtomicType":
+        return DataTypeParser.parse_data_type(data)
+
     def __str__(self) -> str:
         null_suffix = "" if self.nullable else " NOT NULL"
         return f"{self.type}{null_suffix}"
@@ -97,6 +101,10 @@ class ArrayType(DataType):
             "nullable": self.nullable
         }
 
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "ArrayType":
+        return DataTypeParser.parse_data_type(data)
+
     def __str__(self) -> str:
         null_suffix = "" if self.nullable else " NOT NULL"
         return f"ARRAY<{self.element}>{null_suffix}"
@@ -117,6 +125,10 @@ class MultisetType(DataType):
             "nullable": self.nullable,
         }
 
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "MultisetType":
+        return DataTypeParser.parse_data_type(data)
+
     def __str__(self) -> str:
         null_suffix = "" if self.nullable else " NOT NULL"
         return f"MULTISET<{self.element}>{null_suffix}"
@@ -144,6 +156,10 @@ class MapType(DataType):
             "nullable": self.nullable,
         }
 
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "MapType":
+        return DataTypeParser.parse_data_type(data)
+
     def __str__(self) -> str:
         null_suffix = "" if self.nullable else " NOT NULL"
         return f"MAP<{self.key}, {self.value}>{null_suffix}"
@@ -212,6 +228,10 @@ class RowType(DataType):
             "nullable": self.nullable,
         }
 
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "RowType":
+        return DataTypeParser.parse_data_type(data)
+
     def __str__(self) -> str:
         field_strs = [f"{field.name}: {field.type}" for field in self.fields]
         null_suffix = "" if self.nullable else " NOT NULL"
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
index 500177cf31..cc27c5a530 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -16,55 +16,30 @@
 # limitations under the License.
 ################################################################################
 
-import re
-from dataclasses import asdict, dataclass, fields
-from typing import Any, Dict, Optional
+from dataclasses import dataclass
+from typing import Dict, Optional
+
+from pypaimon.common.rest_json import json_field
 
 
 @dataclass
 class Snapshot:
-    version: int
-    id: int
-    schema_id: int
-    base_manifest_list: str
-    delta_manifest_list: str
-    commit_user: str
-    commit_identifier: int
-    commit_kind: str
-    time_millis: int
-    log_offsets: Dict[int, int]
-
-    changelog_manifest_list: Optional[str] = None
-    index_manifest: Optional[str] = None
-    total_record_count: Optional[int] = None
-    delta_record_count: Optional[int] = None
-    changelog_record_count: Optional[int] = None
-    watermark: Optional[int] = None
-    statistics: Optional[str] = None
-
-    @classmethod
-    def from_json(cls, data: Dict[str, Any]):
-        known_fields = {field.name for field in fields(Snapshot)}
-        processed_data = {
-            camel_to_snake(key): value
-            for key, value in data.items()
-            if camel_to_snake(key) in known_fields
-        }
-        return Snapshot(**processed_data)
-
-    def to_json(self) -> Dict[str, Any]:
-        snake_case_dict = asdict(self)
-        return {
-            snake_to_camel(key): value
-            for key, value in snake_case_dict.items()
-        }
-
-
-def camel_to_snake(name: str) -> str:
-    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
-    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
-
-
-def snake_to_camel(name: str) -> str:
-    parts = name.split('_')
-    return parts[0] + ''.join(word.capitalize() for word in parts[1:])
+    # Required fields
+    id: int = json_field("id")
+    schema_id: int = json_field("schemaId")
+    base_manifest_list: str = json_field("baseManifestList")
+    delta_manifest_list: str = json_field("deltaManifestList")
+    commit_user: str = json_field("commitUser")
+    commit_identifier: int = json_field("commitIdentifier")
+    commit_kind: str = json_field("commitKind")
+    time_millis: int = json_field("timeMillis")
+    # Optional fields with defaults
+    version: Optional[int] = json_field("version", default=None)
+    log_offsets: Optional[Dict[int, int]] = json_field("logOffsets", default_factory=dict)
+    changelog_manifest_list: Optional[str] = json_field("changelogManifestList", default=None)
+    index_manifest: Optional[str] = json_field("indexManifest", default=None)
+    total_record_count: Optional[int] = json_field("totalRecordCount", default=None)
+    delta_record_count: Optional[int] = json_field("deltaRecordCount", default=None)
+    changelog_record_count: Optional[int] = json_field("changelogRecordCount", default=None)
+    watermark: Optional[int] = json_field("watermark", default=None)
+    statistics: Optional[str] = json_field("statistics", default=None)
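
With json_field carrying the camelCase names, the hand-written camel/snake converters removed above are no longer needed. A round-trip sketch (values are placeholders):

    from pypaimon.common.rest_json import JSON
    from pypaimon.snapshot.snapshot import Snapshot

    snapshot = Snapshot(id=7, schema_id=0,
                        base_manifest_list="base", delta_manifest_list="delta",
                        commit_user="demo", commit_identifier=1,
                        commit_kind="APPEND", time_millis=1703721600000)
    text = JSON.to_json(snapshot)            # keys serialize camelCase, e.g. "schemaId"
    restored = JSON.from_json(text, Snapshot)
    assert restored.schema_id == 0
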
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 6a8a68e73a..f23ad03dae 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -15,11 +15,11 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
-import json
+from pathlib import Path
 from typing import Optional
 
 from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
 from pypaimon.snapshot.snapshot import Snapshot
 
 
@@ -32,13 +32,13 @@ class SnapshotManager:
         self.table: FileStoreTable = table
         self.file_io: FileIO = self.table.file_io
         self.snapshot_dir = self.table.table_path / "snapshot"
+        self.latest_file = self.snapshot_dir / "LATEST"
 
     def get_latest_snapshot(self) -> Optional[Snapshot]:
-        latest_file = self.snapshot_dir / "LATEST"
-        if not self.file_io.exists(latest_file):
+        if not self.file_io.exists(self.latest_file):
             return None
 
-        latest_content = self.file_io.read_file_utf8(latest_file)
+        latest_content = self.file_io.read_file_utf8(self.latest_file)
         latest_snapshot_id = int(latest_content.strip())
 
         snapshot_file = self.snapshot_dir / f"snapshot-{latest_snapshot_id}"
@@ -46,23 +46,16 @@ class SnapshotManager:
             return None
 
         snapshot_content = self.file_io.read_file_utf8(snapshot_file)
-        snapshot_data = json.loads(snapshot_content)
-        return Snapshot.from_json(snapshot_data)
-
-    def commit_snapshot(self, snapshot_id: int, snapshot_data: Snapshot):
-        snapshot_file = self.snapshot_dir / f"snapshot-{snapshot_id}"
-        latest_file = self.snapshot_dir / "LATEST"
+        return JSON.from_json(snapshot_content, Snapshot)
 
-        try:
-            snapshot_json = json.dumps(snapshot_data.to_json(), indent=2)
-            snapshot_success = self.file_io.try_to_write_atomic(snapshot_file, snapshot_json)
-            if not snapshot_success:
-                self.file_io.write_file(snapshot_file, snapshot_json, overwrite=True)
+    def get_snapshot_path(self, snapshot_id: int) -> Path:
+        """
+        Get the path for a snapshot file.
 
-            latest_success = self.file_io.try_to_write_atomic(latest_file, str(snapshot_id))
-            if not latest_success:
-                self.file_io.write_file(latest_file, str(snapshot_id), overwrite=True)
+        Args:
+            snapshot_id: The snapshot ID
 
-        except Exception as e:
-            self.file_io.delete_quietly(snapshot_file)
-            raise RuntimeError(f"Failed to commit snapshot {snapshot_id}: 
{e}") from e
+        Returns:
+            Path to the snapshot file
+        """
+        return self.snapshot_dir / f"snapshot-{snapshot_id}"
diff --git a/paimon-python/pypaimon/table/catalog_environment.py b/paimon-python/pypaimon/table/catalog_environment.py
index d3a92ce927..f025b367b3 100644
--- a/paimon-python/pypaimon/table/catalog_environment.py
+++ b/paimon-python/pypaimon/table/catalog_environment.py
@@ -18,6 +18,9 @@ limitations under the License.
 from typing import Optional
 
 from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.catalog_snapshot_commit import CatalogSnapshotCommit
+from pypaimon.catalog.renaming_snapshot_commit import RenamingSnapshotCommit
+from pypaimon.catalog.snapshot_commit import SnapshotCommit
 from pypaimon.common.identifier import Identifier
 
 
@@ -34,3 +37,56 @@ class CatalogEnvironment:
         self.uuid = uuid
         self.catalog_loader = catalog_loader
         self.supports_version_management = supports_version_management
+
+    def snapshot_commit(self, snapshot_manager) -> Optional[SnapshotCommit]:
+        """
+        Create a SnapshotCommit instance based on the catalog environment configuration.
+
+        Args:
+            snapshot_manager: The SnapshotManager instance
+
+        Returns:
+            SnapshotCommit instance or None
+        """
+        if self.catalog_loader is not None and self.supports_version_management:
+            # Use catalog-based snapshot commit when catalog loader is available
+            # and version management is supported
+            catalog = self.catalog_loader.load()
+            return CatalogSnapshotCommit(catalog, self.identifier, self.uuid)
+        else:
+            # Use file renaming-based snapshot commit
+            # In a full implementation, this would use a proper lock factory
+            # to create locks based on the catalog lock context
+            return RenamingSnapshotCommit(snapshot_manager)
+
+    def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
+        """
+        Create a copy of this CatalogEnvironment with a different identifier.
+
+        Args:
+            identifier: The new identifier
+
+        Returns:
+            A new CatalogEnvironment instance
+        """
+        return CatalogEnvironment(
+            identifier=identifier,
+            uuid=self.uuid,
+            catalog_loader=self.catalog_loader,
+            supports_version_management=self.supports_version_management
+        )
+
+    @staticmethod
+    def empty() -> 'CatalogEnvironment':
+        """
+        Create an empty CatalogEnvironment with default values.
+
+        Returns:
+            An empty CatalogEnvironment instance
+        """
+        return CatalogEnvironment(
+            identifier=None,
+            uuid=None,
+            catalog_loader=None,
+            supports_version_management=False
+        )
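
How the backend selection plays out, sketched as a function; the snapshot_manager argument is assumed to be supplied by the table.

    from pypaimon.catalog.renaming_snapshot_commit import RenamingSnapshotCommit
    from pypaimon.catalog.snapshot_commit import SnapshotCommit
    from pypaimon.snapshot.snapshot_manager import SnapshotManager
    from pypaimon.table.catalog_environment import CatalogEnvironment

    def commit_backend(snapshot_manager: SnapshotManager) -> SnapshotCommit:
        env = CatalogEnvironment.empty()     # no loader, no version management
        commit = env.snapshot_commit(snapshot_manager)
        assert isinstance(commit, RenamingSnapshotCommit)
        # A REST catalog sets catalog_loader and supports_version_management=True,
        # in which case snapshot_commit() returns a CatalogSnapshotCommit instead.
        return commit
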
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 4fcef0473d..1d131a492d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -17,6 +17,7 @@
 ################################################################################
 
 from pathlib import Path
+from typing import Optional
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
@@ -25,6 +26,7 @@ from pypaimon.read.read_builder import ReadBuilder
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.table.catalog_environment import CatalogEnvironment
 from pypaimon.table.table import Table
 from pypaimon.write.batch_write_builder import BatchWriteBuilder
 from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
@@ -36,10 +38,11 @@ from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
 
 class FileStoreTable(Table):
     def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,
-                 table_schema: TableSchema):
+                 table_schema: TableSchema, catalog_environment: Optional[CatalogEnvironment] = None):
         self.file_io = file_io
         self.identifier = identifier
         self.table_path = table_path
+        self.catalog_environment = catalog_environment or CatalogEnvironment.empty()
 
         self.table_schema = table_schema
         self.fields = table_schema.fields
@@ -51,6 +54,19 @@ class FileStoreTable(Table):
         self.is_primary_key_table = bool(self.primary_keys)
         self.cross_partition_update = self.table_schema.cross_partition_update()
 
+    def current_branch(self) -> str:
+        """Get the current branch name from options."""
+        return self.options.get(CoreOptions.BRANCH, "main")
+
+    def snapshot_manager(self):
+        """Get the snapshot manager for this table."""
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+        return SnapshotManager(self)
+
+    def new_snapshot_commit(self):
+        """Create a new SnapshotCommit instance using the catalog 
environment."""
+        return self.catalog_environment.snapshot_commit(self.snapshot_manager())
+
     def bucket_mode(self) -> BucketMode:
         if self.is_primary_key_table:
             if self.options.get(CoreOptions.BUCKET, -1) == -2:
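
At the table level, the pieces combine as in this sketch; the helper name is illustrative and table is any FileStoreTable instance.

    from pypaimon.table.file_store_table import FileStoreTable

    def describe_commit_path(table: FileStoreTable) -> str:
        # RenamingSnapshotCommit for filesystem catalogs,
        # CatalogSnapshotCommit for tables loaded through a REST catalog
        commit = table.new_snapshot_commit()
        branch = table.current_branch()      # "main" unless overridden in options
        return f"{type(commit).__name__} on branch {branch}"
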
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py
index d5758e8969..d0486ec02c 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -105,6 +105,8 @@ class RESTCatalogServer:
         # Initialize storage
         self.database_store: Dict[str, GetDatabaseResponse] = {}
         self.table_metadata_store: Dict[str, TableMetadata] = {}
+        self.table_latest_snapshot_store: Dict[str, str] = {}
+        self.table_partitions_store: Dict[str, List] = {}
         self.no_permission_databases: List[str] = []
         self.no_permission_tables: List[str] = []
 
@@ -326,13 +328,39 @@ class RESTCatalogServer:
                                identifier: Identifier, data: str,
                                parameters: Dict[str, str]) -> Tuple[str, int]:
         """Handle table-specific resource requests"""
-        # Check table permissions
-        if identifier.get_full_name() in self.no_permission_tables:
-            raise TableNoPermissionException(identifier)
+        # Extract table name and check for branch information
+        raw_table_name = path_parts[2]
+
+        # Parse table name with potential branch (e.g., "table.main" -> "table", branch="main")
+        if '.' in raw_table_name and len(raw_table_name.split('.')) > 1:
+            # This might be a table with branch
+            table_parts = raw_table_name.split('.')
+            if len(table_parts) == 2:
+                table_name_part = table_parts[0]
+                branch_part = table_parts[1]
+                # Recreate identifier without branch for lookup
+                lookup_identifier = Identifier.create(identifier.database_name, table_name_part)
+            else:
+                lookup_identifier = identifier
+                branch_part = None
+        else:
+            lookup_identifier = identifier
+            branch_part = None
+
+        # Check table permissions using the base identifier
+        if lookup_identifier.get_full_name() in self.no_permission_tables:
+            raise TableNoPermissionException(lookup_identifier)
 
         if len(path_parts) == 3:
-            # Basic table operations
-            return self._table_handle(method, data, identifier)
+            # Basic table operations (GET, DELETE, etc.)
+            return self._table_handle(method, data, lookup_identifier)
+        elif len(path_parts) == 4:
+            # Extended operations (e.g., commit)
+            operation = path_parts[3]
+            if operation == "commit":
+                return self._table_commit_handle(method, data, lookup_identifier, branch_part)
+            else:
+                return self._mock_response(ErrorResponse(None, None, "Not 
Found", 404), 404)
         return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
 
     def _databases_api_handler(self, method: str, data: str,
@@ -422,6 +450,109 @@ class RESTCatalogServer:
 
         return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
 
+    def _table_commit_handle(self, method: str, data: str, identifier: Identifier,
+                             branch: str = None) -> Tuple[str, int]:
+        """Handle table commit operations"""
+        if method != "POST":
+            return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+        # Check if table exists
+        if identifier.get_full_name() not in self.table_metadata_store:
+            raise TableNotExistException(identifier)
+
+        try:
+            # Parse the commit request
+            from pypaimon.api.api_request import CommitTableRequest
+            from pypaimon.api.api_response import CommitTableResponse
+
+            commit_request = JSON.from_json(data, CommitTableRequest)
+
+            # Basic validation
+            if not commit_request.snapshot:
+                return self._mock_response(
+                    ErrorResponse("SNAPSHOT", None, "Snapshot is required for 
commit operation", 400), 400
+                )
+
+            # Write snapshot to file system
+            self._write_snapshot_files(identifier, commit_request.snapshot, commit_request.statistics)
+
+            self.logger.info(f"Successfully committed snapshot for table 
{identifier.get_full_name()}, "
+                             f"branch: {branch or 'main'}")
+            self.logger.info(f"Snapshot ID: {commit_request.snapshot.id}")
+            self.logger.info(f"Statistics count: 
{len(commit_request.statistics) if commit_request.statistics else 0}")
+
+            # Create success response
+            response = CommitTableResponse(success=True)
+            return self._mock_response(response, 200)
+
+        except Exception as e:
+            self.logger.error(f"Error in commit operation: {e}")
+            import traceback
+            self.logger.error(f"Traceback: {traceback.format_exc()}")
+            return self._mock_response(
+                ErrorResponse(None, None, f"Commit failed: {str(e)}", 500), 500
+            )
+
+    def _write_snapshot_files(self, identifier: Identifier, snapshot, statistics):
+        """Write snapshot and related files to the file system"""
+        import os
+        import json
+        import uuid
+
+        # Construct table path: {warehouse}/{database}/{table}
+        table_path = os.path.join(self.data_path, self.warehouse, identifier.database_name, identifier.object_name)
+
+        # Create directory structure
+        snapshot_dir = os.path.join(table_path, "snapshot")
+
+        os.makedirs(snapshot_dir, exist_ok=True)
+
+        # Write snapshot file (snapshot-{id})
+        snapshot_file = os.path.join(snapshot_dir, f"snapshot-{snapshot.id}")
+        snapshot_data = {
+            "version": getattr(snapshot, 'version', 3),
+            "id": snapshot.id,
+            "schemaId": getattr(snapshot, 'schema_id', 0),
+            "baseManifestList": getattr(snapshot, 'base_manifest_list', 
f"manifest-list-{uuid.uuid4()}"),
+            "deltaManifestList": getattr(snapshot, 'delta_manifest_list', 
f"manifest-list-{uuid.uuid4()}"),
+            "commitUser": getattr(snapshot, 'commit_user', 'rest-server'),
+            "commitIdentifier": getattr(snapshot, 'commit_identifier', 1),
+            "commitKind": getattr(snapshot, 'commit_kind', 'APPEND'),
+            "timeMillis": getattr(snapshot, 'time_millis', 1703721600000),
+            "logOffsets": getattr(snapshot, 'log_offsets', {})
+        }
+
+        with open(snapshot_file, 'w') as f:
+            json.dump(snapshot_data, f, indent=2)
+
+        # Write LATEST file
+        latest_file = os.path.join(snapshot_dir, "LATEST")
+        with open(latest_file, 'w') as f:
+            f.write(str(snapshot.id))
+
+        # Create partition directories based on statistics
+        if statistics:
+            for stat in statistics:
+                if hasattr(stat, 'spec') and stat.spec:
+                    # Extract partition information from spec
+                    partition_parts = []
+                    for key, value in stat.spec.items():
+                        partition_parts.append(f"{key}={value}")
+
+                    if partition_parts:
+                        partition_dir = os.path.join(table_path, *partition_parts)
+                        os.makedirs(partition_dir, exist_ok=True)
+
+        # If no statistics provided, create default partition directories for test
+        if not statistics:
+            # Create default partitions that the test expects
+            default_partitions = ["dt=p1", "dt=p2"]
+            for partition in default_partitions:
+                partition_dir = os.path.join(table_path, partition)
+                os.makedirs(partition_dir, exist_ok=True)
+
+        self.logger.info(f"Created snapshot files at: {snapshot_dir}")
+
     # Utility methods
     def _mock_response(self, response: Union[RESTResponse, str], http_code: int) -> Tuple[str, int]:
         """Create mock response"""
diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/test_file_store_commit.py
new file mode 100644
index 0000000000..ced0afbf4a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_file_store_commit.py
@@ -0,0 +1,401 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+from datetime import datetime
+from pathlib import Path
+from unittest.mock import Mock, patch
+
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_commit import FileStoreCommit
+
+
+@patch('pypaimon.write.file_store_commit.SnapshotManager')
+@patch('pypaimon.write.file_store_commit.ManifestFileManager')
+@patch('pypaimon.write.file_store_commit.ManifestListManager')
+class TestFileStoreCommit(unittest.TestCase):
+    """Test cases for FileStoreCommit class."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        # Mock table with required attributes
+        self.mock_table = Mock()
+        self.mock_table.partition_keys = ['dt', 'region']
+        self.mock_table.current_branch.return_value = 'main'
+        self.mock_table.table_path = Path('/test/table/path')
+        self.mock_table.file_io = Mock()
+
+        # Mock snapshot commit
+        self.mock_snapshot_commit = Mock()
+
+    def _create_file_store_commit(self):
+        """Helper method to create FileStoreCommit instance."""
+        return FileStoreCommit(
+            snapshot_commit=self.mock_snapshot_commit,
+            table=self.mock_table,
+            commit_user='test_user'
+        )
+
+    def test_generate_partition_statistics_single_partition_single_file(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation with single partition and 
single file."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        # Create test data
+        creation_time = datetime(2024, 1, 15, 10, 30, 0)
+        file_meta = DataFileMeta(
+            file_name="test_file_1.parquet",
+            file_size=1024 * 1024,  # 1MB
+            row_count=10000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=1,
+            max_sequence_number=100,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=creation_time
+        )
+
+        commit_message = CommitMessage(
+            partition=('2024-01-15', 'us-east-1'),
+            bucket=0,
+            new_files=[file_meta]
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+        # Verify results
+        self.assertEqual(len(statistics), 1)
+
+        stat = statistics[0]
+        self.assertIsInstance(stat, PartitionStatistics)
+        self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+        self.assertEqual(stat.record_count, 10000)
+        self.assertEqual(stat.file_count, 1)
+        self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
+        self.assertEqual(stat.last_file_creation_time, int(creation_time.timestamp() * 1000))
+
+    def test_generate_partition_statistics_multiple_files_same_partition(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation with multiple files in same 
partition."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        creation_time_1 = datetime(2024, 1, 15, 10, 30, 0)
+        creation_time_2 = datetime(2024, 1, 15, 11, 30, 0)  # Later time
+
+        file_meta_1 = DataFileMeta(
+            file_name="test_file_1.parquet",
+            file_size=1024 * 1024,  # 1MB
+            row_count=10000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=1,
+            max_sequence_number=100,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=creation_time_1
+        )
+
+        file_meta_2 = DataFileMeta(
+            file_name="test_file_2.parquet",
+            file_size=2 * 1024 * 1024,  # 2MB
+            row_count=15000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=101,
+            max_sequence_number=200,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=creation_time_2
+        )
+
+        commit_message = CommitMessage(
+            partition=('2024-01-15', 'us-east-1'),
+            bucket=0,
+            new_files=[file_meta_1, file_meta_2]
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+        # Verify results
+        self.assertEqual(len(statistics), 1)
+
+        stat = statistics[0]
+        self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+        self.assertEqual(stat.record_count, 25000)  # 10000 + 15000
+        self.assertEqual(stat.file_count, 2)
+        self.assertEqual(stat.file_size_in_bytes, 3 * 1024 * 1024)  # 1MB + 2MB
+        # Should have the latest creation time
+        self.assertEqual(stat.last_file_creation_time, int(creation_time_2.timestamp() * 1000))
+
+    def test_generate_partition_statistics_multiple_partitions(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation with multiple different 
partitions."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        creation_time = datetime(2024, 1, 15, 10, 30, 0)
+
+        # File for partition 1
+        file_meta_1 = DataFileMeta(
+            file_name="test_file_1.parquet",
+            file_size=1024 * 1024,
+            row_count=10000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=1,
+            max_sequence_number=100,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=creation_time
+        )
+
+        # File for partition 2
+        file_meta_2 = DataFileMeta(
+            file_name="test_file_2.parquet",
+            file_size=2 * 1024 * 1024,
+            row_count=20000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=101,
+            max_sequence_number=200,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=creation_time
+        )
+
+        commit_message_1 = CommitMessage(
+            partition=('2024-01-15', 'us-east-1'),
+            bucket=0,
+            new_files=[file_meta_1]
+        )
+
+        commit_message_2 = CommitMessage(
+            partition=('2024-01-15', 'us-west-2'),
+            bucket=0,
+            new_files=[file_meta_2]
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message_1, commit_message_2])
+
+        # Verify results
+        self.assertEqual(len(statistics), 2)
+
+        # Sort statistics by partition spec for consistent testing
+        statistics.sort(key=lambda s: s.spec['region'])
+
+        # Check first partition (us-east-1)
+        stat_1 = statistics[0]
+        self.assertEqual(stat_1.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+        self.assertEqual(stat_1.record_count, 10000)
+        self.assertEqual(stat_1.file_count, 1)
+        self.assertEqual(stat_1.file_size_in_bytes, 1024 * 1024)
+
+        # Check second partition (us-west-2)
+        stat_2 = statistics[1]
+        self.assertEqual(stat_2.spec, {'dt': '2024-01-15', 'region': 'us-west-2'})
+        self.assertEqual(stat_2.record_count, 20000)
+        self.assertEqual(stat_2.file_count, 1)
+        self.assertEqual(stat_2.file_size_in_bytes, 2 * 1024 * 1024)
+
+    def test_generate_partition_statistics_unpartitioned_table(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation for unpartitioned table."""
+        # Update mock table to have no partition keys
+        self.mock_table.partition_keys = []
+
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        creation_time = datetime(2024, 1, 15, 10, 30, 0)
+        file_meta = DataFileMeta(
+            file_name="test_file_1.parquet",
+            file_size=1024 * 1024,
+            row_count=10000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=1,
+            max_sequence_number=100,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=creation_time
+        )
+
+        commit_message = CommitMessage(
+            partition=(),  # Empty partition for unpartitioned table
+            bucket=0,
+            new_files=[file_meta]
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+        # Verify results
+        self.assertEqual(len(statistics), 1)
+
+        stat = statistics[0]
+        self.assertEqual(stat.spec, {})  # Empty spec for unpartitioned table
+        self.assertEqual(stat.record_count, 10000)
+        self.assertEqual(stat.file_count, 1)
+        self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
+
+    def test_generate_partition_statistics_no_creation_time(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation when file has no creation 
time."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        file_meta = DataFileMeta(
+            file_name="test_file_1.parquet",
+            file_size=1024 * 1024,
+            row_count=10000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=1,
+            max_sequence_number=100,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=None  # No creation time
+        )
+
+        commit_message = CommitMessage(
+            partition=('2024-01-15', 'us-east-1'),
+            bucket=0,
+            new_files=[file_meta]
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+        # Verify results
+        self.assertEqual(len(statistics), 1)
+
+        stat = statistics[0]
+        # Should have a valid timestamp (current time)
+        self.assertGreater(stat.last_file_creation_time, 0)
+
+    def test_generate_partition_statistics_mismatched_partition_keys(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation when partition tuple doesn't 
match partition keys."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        # Table has 2 partition keys but partition tuple has 3 values
+        file_meta = DataFileMeta(
+            file_name="test_file_1.parquet",
+            file_size=1024 * 1024,
+            row_count=10000,
+            min_key=None,
+            max_key=None,
+            key_stats=None,
+            value_stats=None,
+            min_sequence_number=1,
+            max_sequence_number=100,
+            schema_id=0,
+            level=0,
+            extra_files=None,
+            creation_time=datetime(2024, 1, 15, 10, 30, 0)
+        )
+
+        commit_message = CommitMessage(
+            partition=('2024-01-15', 'us-east-1', 'extra-value'),  # 3 values but table has 2 keys
+            bucket=0,
+            new_files=[file_meta]
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+        # Verify results - should fall back to index-based naming
+        self.assertEqual(len(statistics), 1)
+
+        stat = statistics[0]
+        expected_spec = {
+            'partition_0': '2024-01-15',
+            'partition_1': 'us-east-1',
+            'partition_2': 'extra-value'
+        }
+        self.assertEqual(stat.spec, expected_spec)
+
+    def test_generate_partition_statistics_empty_commit_messages(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation with empty commit messages 
list."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([])
+
+        # Verify results
+        self.assertEqual(len(statistics), 0)
+
+    def test_generate_partition_statistics_commit_message_no_files(
+            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+        """Test partition statistics generation with commit message containing 
no files."""
+        # Create FileStoreCommit instance
+        file_store_commit = self._create_file_store_commit()
+
+        commit_message = CommitMessage(
+            partition=('2024-01-15', 'us-east-1'),
+            bucket=0,
+            new_files=[]  # No files
+        )
+
+        # Test method
+        statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+        # Verify results - should still create a partition entry with zero counts
+        self.assertEqual(len(statistics), 1)
+
+        stat = statistics[0]
+        self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+        self.assertEqual(stat.record_count, 0)
+        self.assertEqual(stat.file_count, 0)
+        self.assertEqual(stat.file_size_in_bytes, 0)
+
+
+if __name__ == '__main__':
+    unittest.main()
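
The tests above pin down the tuple-to-spec mapping used by _generate_partition_statistics: partition values are zipped with the table's partition keys, with an index-based fallback when the lengths disagree. A standalone sketch of that rule (illustrative, not part of the patch; key names follow the tests):

    def to_partition_spec(partition_keys, partition_value):
        # Matching lengths: zip keys with values, stringifying each value.
        if len(partition_value) == len(partition_keys):
            return {k: str(v) for k, v in zip(partition_keys, partition_value)}
        # Length mismatch: fall back to index-based names.
        return {f"partition_{i}": str(v) for i, v in enumerate(partition_value)}

    assert to_partition_spec(['dt', 'region'], ('2024-01-15', 'us-east-1')) == \
        {'dt': '2024-01-15', 'region': 'us-east-1'}
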
diff --git a/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py b/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py
new file mode 100644
index 0000000000..25350ebea0
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+import unittest
+from unittest.mock import Mock, patch
+
+from pypaimon.api import NoSuchResourceException
+from pypaimon.api.api_response import CommitTableResponse
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_exception import TableNotExistException
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
+from pypaimon.common.identifier import Identifier
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class TestRESTCatalogCommitSnapshot(unittest.TestCase):
+
+    def setUp(self):
+        """Set up test fixtures."""
+        # Create mock options for testing
+        self.test_options = {
+            "warehouse": "s3://test-bucket/warehouse",
+            "uri": "http://localhost:8080";,
+            "authentication.type": "none"
+        }
+
+        # Create mock CatalogContext
+        self.catalog_context = CatalogContext.create_from_options(Options(self.test_options))
+
+        # Create test identifier
+        self.identifier = Identifier.create("test_db", "test_table")
+
+        # Create test snapshot
+        self.test_snapshot = Snapshot(
+            version=3,
+            id=1,
+            schema_id=0,
+            base_manifest_list="manifest-list-1",
+            delta_manifest_list="manifest-list-1",
+            commit_user="test_user",
+            commit_identifier=12345,
+            commit_kind="APPEND",
+            time_millis=int(time.time() * 1000),
+            log_offsets={}
+        )
+
+        # Create test statistics
+        self.test_statistics = [
+            PartitionStatistics.create({"partition": "2024-01-01"}, 1000, 5)
+        ]
+
+    def test_rest_catalog_supports_version_management(self):
+        """Test that RESTCatalog supports version management."""
+        with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+            # Configure mock
+            mock_api_instance = Mock()
+            mock_api_instance.options = self.test_options
+            mock_rest_api.return_value = mock_api_instance
+
+            # Create RESTCatalog
+            catalog = RESTCatalog(self.catalog_context)
+
+            # Verify supports version management
+            self.assertTrue(catalog.supports_version_management())
+
+    def test_rest_catalog_commit_snapshot_success(self):
+        """Test successful snapshot commit."""
+        with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+            # Configure mock
+            mock_api_instance = Mock()
+            mock_api_instance.options = self.test_options
+            mock_api_instance.commit_snapshot.return_value = True
+            mock_rest_api.return_value = mock_api_instance
+
+            # Create RESTCatalog
+            catalog = RESTCatalog(self.catalog_context)
+
+            # Test commit snapshot
+            result = catalog.commit_snapshot(
+                self.identifier,
+                "test-uuid",
+                self.test_snapshot,
+                self.test_statistics
+            )
+
+            # Verify result
+            self.assertTrue(result)
+
+            # Verify API was called correctly
+            mock_api_instance.commit_snapshot.assert_called_once_with(
+                self.identifier,
+                "test-uuid",
+                self.test_snapshot,
+                self.test_statistics
+            )
+
+    def test_rest_catalog_commit_snapshot_table_not_exist(self):
+        """Test snapshot commit when table doesn't exist."""
+        with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+            # Configure mock to raise NoSuchResourceException
+            mock_api_instance = Mock()
+            mock_api_instance.options = self.test_options
+            mock_api_instance.commit_snapshot.side_effect = NoSuchResourceException("Table not found")
+            mock_rest_api.return_value = mock_api_instance
+
+            # Create RESTCatalog
+            catalog = RESTCatalog(self.catalog_context)
+
+            # Test commit snapshot with table not exist
+            with self.assertRaises(TableNotExistException):
+                catalog.commit_snapshot(
+                    self.identifier,
+                    "test-uuid",
+                    self.test_snapshot,
+                    self.test_statistics
+                )
+
+    def test_rest_catalog_commit_snapshot_api_error(self):
+        """Test snapshot commit with API error."""
+        with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+            # Configure mock to raise generic exception
+            mock_api_instance = Mock()
+            mock_api_instance.options = self.test_options
+            mock_api_instance.commit_snapshot.side_effect = RuntimeError("API 
Error")
+            mock_rest_api.return_value = mock_api_instance
+
+            # Create RESTCatalog
+            catalog = RESTCatalog(self.catalog_context)
+
+            # Test commit snapshot with API error
+            with self.assertRaises(RuntimeError) as context:
+                catalog.commit_snapshot(
+                    self.identifier,
+                    "test-uuid",
+                    self.test_snapshot,
+                    self.test_statistics
+                )
+
+            # Verify error message contains table name
+            self.assertIn("test_db.test_table", str(context.exception))
+
+    def test_commit_table_request_creation(self):
+        """Test CommitTableRequest creation."""
+        from pypaimon.api.api_request import CommitTableRequest
+
+        request = CommitTableRequest(
+            table_uuid="test-uuid",
+            snapshot=self.test_snapshot,
+            statistics=self.test_statistics
+        )
+
+        # Verify request fields
+        self.assertEqual(request.table_uuid, "test-uuid")
+        self.assertEqual(request.snapshot, self.test_snapshot)
+        self.assertEqual(request.statistics, self.test_statistics)
+
+    def test_commit_table_response_creation(self):
+        """Test CommitTableResponse creation."""
+        from pypaimon.api.api_response import CommitTableResponse
+
+        # Test successful response
+        success_response = CommitTableResponse(success=True)
+        self.assertTrue(success_response.is_success())
+
+        # Test failed response
+        failed_response = CommitTableResponse(success=False)
+        self.assertFalse(failed_response.is_success())
+
+    def test_rest_api_commit_snapshot(self):
+        """Test RESTApi commit_snapshot method."""
+        from pypaimon.api import RESTApi
+
+        with patch('pypaimon.api.HttpClient') as mock_client_class:
+            # Configure mock
+            mock_client = Mock()
+            mock_response = CommitTableResponse(success=True)
+            mock_client.post_with_response_type.return_value = mock_response
+            mock_client_class.return_value = mock_client
+
+            # Create RESTApi with mocked client
+            with patch('pypaimon.api.auth.AuthProviderFactory.create_auth_provider'):
+                api = RESTApi(self.test_options, config_required=False)
+                api.client = mock_client
+
+                # Test commit snapshot
+                result = api.commit_snapshot(
+                    self.identifier,
+                    "test-uuid",
+                    self.test_snapshot,
+                    self.test_statistics
+                )
+
+                # Verify result
+                self.assertTrue(result)
+
+                # Verify client was called correctly
+                mock_client.post_with_response_type.assert_called_once()
+
+
+if __name__ == '__main__':
+    unittest.main()
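
For orientation, these tests cover the same path a batch write takes end to end: prepared commit messages flow through FileStoreCommit and ultimately reach RESTCatalog.commit_snapshot. A hypothetical caller-side sketch, assuming the batch write builder API and a catalog configured as in setUp (names are illustrative and not asserted by this patch):

    # table = catalog.get_table(Identifier.create("test_db", "test_table"))
    # write_builder = table.new_batch_write_builder()
    # table_write = write_builder.new_write()
    # table_commit = write_builder.new_commit()
    # ... write some rows via table_write ...
    # table_commit.commit(table_write.prepare_commit())  # ends in commit_snapshot
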
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
index 14849e6309..55d8de0905 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -32,7 +32,13 @@ class BatchTableCommit:
         self.table: FileStoreTable = table
         self.commit_user = commit_user
         self.overwrite_partition = static_partition
-        self.file_store_commit = FileStoreCommit(table, commit_user)
+
+        # Get SnapshotCommit from table's catalog environment
+        snapshot_commit = table.new_snapshot_commit()
+        if snapshot_commit is None:
+            raise RuntimeError("Table does not provide a SnapshotCommit 
instance")
+
+        self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user)
         self.batch_committed = False
 
     def commit(self, commit_messages: List[CommitMessage]):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 4928167907..2ff7e01fd5 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -20,6 +20,7 @@ import time
 from pathlib import Path
 from typing import List
 
+from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.snapshot.snapshot import Snapshot
@@ -28,11 +29,17 @@ from pypaimon.write.commit_message import CommitMessage
 
 
 class FileStoreCommit:
-    """Core commit logic for file store operations."""
+    """
+    Core commit logic for file store operations.
 
-    def __init__(self, table, commit_user: str):
+    This class provides atomic commit functionality similar to
+    org.apache.paimon.operation.FileStoreCommitImpl in Java.
+    """
+
+    def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: str):
         from pypaimon.table.file_store_table import FileStoreTable
 
+        self.snapshot_commit = snapshot_commit
         self.table: FileStoreTable = table
         self.commit_user = commit_user
 
@@ -71,9 +78,18 @@ class FileStoreCommit:
             time_millis=int(time.time() * 1000),
             log_offsets={},
         )
-        self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+        # Generate partition statistics for the commit
+        statistics = self._generate_partition_statistics(commit_messages)
+
+        # Use SnapshotCommit for atomic commit
+        with self.snapshot_commit:
+            success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
+            if not success:
+                raise RuntimeError(f"Failed to commit snapshot 
{new_snapshot_id}")
 
     def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
+        """Commit the given commit messages in overwrite mode."""
         if not commit_messages:
             return
 
@@ -97,7 +113,15 @@ class FileStoreCommit:
             time_millis=int(time.time() * 1000),
             log_offsets={},
         )
-        self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+        # Generate partition statistics for the commit
+        statistics = self._generate_partition_statistics(commit_messages)
+
+        # Use SnapshotCommit for atomic commit
+        with self.snapshot_commit:
+            success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
+            if not success:
+                raise RuntimeError(f"Failed to commit snapshot 
{new_snapshot_id}")
 
     def abort(self, commit_messages: List[CommitMessage]):
         for message in commit_messages:
@@ -110,11 +134,103 @@ class FileStoreCommit:
                     print(f"Warning: Failed to clean up file {file.file_path}: 
{e}")
 
     def close(self):
-        pass
+        """Close the FileStoreCommit and release resources."""
+        if hasattr(self.snapshot_commit, 'close'):
+            self.snapshot_commit.close()
 
     def _generate_snapshot_id(self) -> int:
+        """Generate the next snapshot ID."""
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if latest_snapshot:
             return latest_snapshot.id + 1
         else:
             return 1
+
+    def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -> List[PartitionStatistics]:
+        """
+        Generate partition statistics from commit messages.
+
+        This method follows the Java implementation pattern from
+        org.apache.paimon.manifest.PartitionEntry.fromManifestEntry() and
+        PartitionEntry.merge() methods.
+
+        Args:
+            commit_messages: List of commit messages to analyze
+
+        Returns:
+            List of PartitionStatistics for each unique partition
+        """
+        partition_stats = {}
+
+        for message in commit_messages:
+            # Convert partition tuple to dictionary for PartitionStatistics
+            partition_value = message.partition()  # Call the method to get partition value
+            if partition_value:
+                # Assuming partition is a tuple and we need to convert it to a dict
+                # This may need adjustment based on actual partition format
+                if isinstance(partition_value, tuple):
+                    # Create partition spec from partition tuple and table partition keys
+                    partition_spec = {}
+                    if len(partition_value) == len(self.table.partition_keys):
+                        for i, key in enumerate(self.table.partition_keys):
+                            partition_spec[key] = str(partition_value[i])
+                    else:
+                        # Fallback: use indices as keys
+                        for i, value in enumerate(partition_value):
+                            partition_spec[f"partition_{i}"] = str(value)
+                else:
+                    # If partition is already a dict or other format
+                    partition_spec = dict(partition_value) if partition_value else {}
+            else:
+                # Default partition for unpartitioned tables
+                partition_spec = {}
+
+            partition_key = tuple(sorted(partition_spec.items()))
+
+            if partition_key not in partition_stats:
+                partition_stats[partition_key] = {
+                    'partition_spec': partition_spec,
+                    'record_count': 0,
+                    'file_count': 0,
+                    'file_size_in_bytes': 0,
+                    'last_file_creation_time': 0
+                }
+
+            # Process each file in the commit message
+            # Following Java implementation: PartitionEntry.fromDataFile()
+            new_files = message.new_files()
+            for file_meta in new_files:
+                # Extract actual file metadata (following Java DataFileMeta pattern)
+                record_count = file_meta.row_count
+                file_size_in_bytes = file_meta.file_size
+                file_count = 1
+
+                # Convert creation_time to milliseconds (Java uses epoch millis)
+                if file_meta.creation_time:
+                    file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
+                else:
+                    file_creation_time = int(time.time() * 1000)
+
+                # Accumulate statistics (following Java PartitionEntry.merge() logic)
+                partition_stats[partition_key]['record_count'] += record_count
+                partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes
+                partition_stats[partition_key]['file_count'] += file_count
+
+                # Keep the latest creation time
+                partition_stats[partition_key]['last_file_creation_time'] = max(
+                    partition_stats[partition_key]['last_file_creation_time'],
+                    file_creation_time
+                )
+
+        # Convert to PartitionStatistics objects
+        # Following Java PartitionEntry.toPartitionStatistics() pattern
+        return [
+            PartitionStatistics.create(
+                partition_spec=stats['partition_spec'],
+                record_count=stats['record_count'],
+                file_count=stats['file_count'],
+                file_size_in_bytes=stats['file_size_in_bytes'],
+                last_file_creation_time=stats['last_file_creation_time']
+            )
+            for stats in partition_stats.values()
+        ]
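
The accumulation loop above follows the semantics of Java's PartitionEntry.merge(): record counts, file counts, and byte sizes are additive, while last_file_creation_time keeps the maximum. As an illustrative helper (not in the patch; field names follow the tests), merging two PartitionStatistics for the same partition would look like:

    def merge_stats(a, b):
        # Counts and sizes add up; creation time keeps the latest.
        assert a.spec == b.spec
        return PartitionStatistics.create(
            partition_spec=a.spec,
            record_count=a.record_count + b.record_count,
            file_count=a.file_count + b.file_count,
            file_size_in_bytes=a.file_size_in_bytes + b.file_size_in_bytes,
            last_file_creation_time=max(a.last_file_creation_time,
                                        b.last_file_creation_time),
        )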

