This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d87a4ec34 [core] python: support rest commit (#6079)
2d87a4ec34 is described below
commit 2d87a4ec3401ca85f21f560b1d555a18a7d47ba8
Author: jerry <[email protected]>
AuthorDate: Mon Aug 18 15:14:09 2025 +0800
[core] python: support rest commit (#6079)
---
paimon-python/pypaimon/api/__init__.py | 51 ++-
.../api/{api_resquest.py => api_request.py} | 15 +-
paimon-python/pypaimon/api/api_response.py | 10 +
paimon-python/pypaimon/catalog/catalog.py | 35 +-
.../pypaimon/catalog/catalog_snapshot_commit.py | 77 ++++
.../pypaimon/catalog/filesystem_catalog.py | 26 +-
.../pypaimon/catalog/renaming_snapshot_commit.py | 93 +++++
.../pypaimon/catalog/rest/rest_catalog.py | 61 +++-
.../pypaimon/catalog/rest/rest_catalog_loader.py | 69 ++++
paimon-python/pypaimon/catalog/snapshot_commit.py | 99 +++++
paimon-python/pypaimon/common/predicate.py | 1 +
paimon-python/pypaimon/common/rest_json.py | 20 +-
paimon-python/pypaimon/schema/data_types.py | 20 +
paimon-python/pypaimon/snapshot/snapshot.py | 71 ++--
.../pypaimon/snapshot/snapshot_manager.py | 37 +-
.../pypaimon/table/catalog_environment.py | 56 +++
paimon-python/pypaimon/table/file_store_table.py | 18 +-
paimon-python/pypaimon/tests/rest_server.py | 141 +++++++-
.../pypaimon/tests/test_file_store_commit.py | 401 +++++++++++++++++++++
.../tests/test_rest_catalog_commit_snapshot.py | 221 ++++++++++++
paimon-python/pypaimon/write/batch_table_commit.py | 8 +-
paimon-python/pypaimon/write/file_store_commit.py | 126 ++++++-
22 files changed, 1557 insertions(+), 99 deletions(-)
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 6806733c7a..a384c3f123 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -19,20 +19,24 @@ import logging
from typing import Callable, Dict, List, Optional
from urllib.parse import unquote
-from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
- GetTableResponse, GetTableTokenResponse,
+from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
+ GetDatabaseResponse, GetTableResponse,
+ GetTableTokenResponse,
ListDatabasesResponse,
ListTablesResponse, PagedList,
PagedResponse)
-from pypaimon.api.api_resquest import (AlterDatabaseRequest,
- CreateDatabaseRequest,
- CreateTableRequest, RenameTableRequest)
+from pypaimon.api.api_request import (AlterDatabaseRequest,
+ CommitTableRequest,
+ CreateDatabaseRequest,
+ CreateTableRequest, RenameTableRequest)
from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction
from pypaimon.api.client import HttpClient
from pypaimon.api.typedef import T
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
from pypaimon.common.config import CatalogOptions
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.snapshot import Snapshot
class RESTException(Exception):
@@ -119,6 +123,10 @@ class ResourcePaths:
def rename_table(self) -> str:
return f"{self.base_path}/{self.TABLES}/rename"
+ def commit_table(self, database_name: str, table_name: str) -> str:
+ return (f"{self.base_path}/{self.DATABASES}/{RESTUtil.encode_string(database_name)}"
+ f"/{self.TABLES}/{RESTUtil.encode_string(table_name)}/commit")
+
class RESTApi:
HEADER_PREFIX = "header."
@@ -336,3 +344,36 @@ class RESTApi:
GetTableTokenResponse,
self.rest_auth_function,
)
+
+ def commit_snapshot(
+ self,
+ identifier: Identifier,
+ table_uuid: Optional[str],
+ snapshot: Snapshot,
+ statistics: List[PartitionStatistics]
+ ) -> bool:
+ """
+ Commit snapshot for table.
+
+ Args:
+ identifier: Database name and table name
+ table_uuid: UUID of the table to avoid wrong commit
+ snapshot: Snapshot for committing
+ statistics: Incremental statistics for this snapshot
+
+ Returns:
+ True if the commit succeeded
+
+ Raises:
+ NoSuchResourceException: Thrown on HTTP 404, meaning the table does not exist
+ ForbiddenException: Thrown on HTTP 403, meaning the caller has no permission for this table
+ """
+ request = CommitTableRequest(table_uuid, snapshot, statistics)
+ response = self.client.post_with_response_type(
+ self.resource_paths.commit_table(
+ identifier.database_name, identifier.object_name),
+ request,
+ CommitTableResponse,
+ self.rest_auth_function
+ )
+ return response.is_success()
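
A minimal usage sketch of the new endpoint (hedged: `api` is assumed to be an
already-constructed RESTApi and `snapshot` an existing Snapshot; only
commit_snapshot itself comes from this change):

    from pypaimon.common.identifier import Identifier

    # Hypothetical call against an existing RESTApi instance.
    success = api.commit_snapshot(
        identifier=Identifier.create("my_db", "my_table"),
        table_uuid=None,   # no UUID verification in this sketch
        snapshot=snapshot,
        statistics=[],     # no partition changes reported
    )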
diff --git a/paimon-python/pypaimon/api/api_resquest.py b/paimon-python/pypaimon/api/api_request.py
similarity index 79%
rename from paimon-python/pypaimon/api/api_resquest.py
rename to paimon-python/pypaimon/api/api_request.py
index 46789870c5..cf22d9853b 100644
--- a/paimon-python/pypaimon/api/api_resquest.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -18,11 +18,13 @@ limitations under the License.
from abc import ABC
from dataclasses import dataclass
-from typing import Dict, List
+from typing import Dict, List, Optional
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
from pypaimon.common.identifier import Identifier
from pypaimon.common.rest_json import json_field
from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.snapshot import Snapshot
class RESTRequest(ABC):
@@ -63,3 +65,14 @@ class CreateTableRequest(RESTRequest):
identifier: Identifier = json_field(FIELD_IDENTIFIER)
schema: Schema = json_field(FIELD_SCHEMA)
+
+
+@dataclass
+class CommitTableRequest(RESTRequest):
+ FIELD_TABLE_UUID = "tableUuid"
+ FIELD_SNAPSHOT = "snapshot"
+ FIELD_STATISTICS = "statistics"
+
+ table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
+ snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
+ statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)
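
Given the json_field names above, the serialized request body should take
roughly this camelCase shape (a sketch inferred from the field constants, not
captured from the wire; the "..." elisions are illustrative):

    {
      "tableUuid": null,
      "snapshot": {"id": 1, "schemaId": 0, "commitKind": "APPEND", ...},
      "statistics": [{"spec": {"dt": "2024-01-15"}, "recordCount": 10000, ...}]
    }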
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index 17496f047e..5658f5b351 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -258,3 +258,13 @@ class GetTableTokenResponse(RESTResponse):
token: Dict[str, str] = json_field(FIELD_TOKEN, default=None)
expires_at_millis: Optional[int] = json_field(FIELD_EXPIRES_AT_MILLIS, default=None)
+
+
+@dataclass
+class CommitTableResponse(RESTResponse):
+ FIELD_SUCCESS = "success"
+
+ success: bool = json_field(FIELD_SUCCESS, default=False)
+
+ def is_success(self) -> bool:
+ return self.success
diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py
index 8666f7caa6..c2b1f6915e 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -17,10 +17,12 @@
#################################################################################
from abc import ABC, abstractmethod
-from typing import Optional, Union
+from typing import List, Optional, Union
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.snapshot import Snapshot
class Catalog(ABC):
@@ -51,3 +53,34 @@ class Catalog(ABC):
@abstractmethod
def create_table(self, identifier: Union[str, Identifier], schema: Schema,
ignore_if_exists: bool):
"""Create table with schema."""
+
+ def supports_version_management(self) -> bool:
+ """
+ Whether this catalog supports version management for tables.
+
+ Returns:
+ True if the catalog supports version management, False otherwise
+ """
+ return False
+
+ @abstractmethod
+ def commit_snapshot(
+ self,
+ identifier: Identifier,
+ table_uuid: Optional[str],
+ snapshot: Snapshot,
+ statistics: List[PartitionStatistics]
+ ) -> bool:
+ """
+ Commit the Snapshot for the table identified by the given Identifier.
+
+ Args:
+ identifier: Path of the table
+ table_uuid: UUID of the table to avoid wrong commit
+ snapshot: Snapshot to be committed
+ statistics: Statistics information of this change
+
+ Returns:
+ True if commit was successful, False otherwise
+ """
diff --git a/paimon-python/pypaimon/catalog/catalog_snapshot_commit.py b/paimon-python/pypaimon/catalog/catalog_snapshot_commit.py
new file mode 100644
index 0000000000..cc1ed258f8
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/catalog_snapshot_commit.py
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List
+
+from pypaimon.catalog.catalog import Catalog
+from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
+from pypaimon.common.identifier import Identifier
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class CatalogSnapshotCommit(SnapshotCommit):
+ """A SnapshotCommit using Catalog to commit."""
+
+ def __init__(self, catalog: Catalog, identifier: Identifier, uuid: str):
+ """
+ Initialize CatalogSnapshotCommit.
+
+ Args:
+ catalog: The catalog instance to use for committing
+ identifier: The table identifier
+ uuid: Optional table UUID for verification
+ """
+ self.catalog = catalog
+ self.identifier = identifier
+ self.uuid = uuid
+
+ def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
+ """
+ Commit the snapshot using the catalog.
+
+ Args:
+ snapshot: The snapshot to commit
+ branch: The branch name to commit to
+ statistics: List of partition statistics
+
+ Returns:
+ True if commit was successful
+
+ Raises:
+ Exception: If commit fails
+ """
+ new_identifier = Identifier(
+ database_name=self.identifier.get_database_name(),
+ object_name=self.identifier.get_table_name(),
+ branch_name=branch
+ )
+
+ # Call catalog's commit_snapshot method
+ if hasattr(self.catalog, 'commit_snapshot'):
+ return self.catalog.commit_snapshot(new_identifier, self.uuid, snapshot, statistics)
+ else:
+ # Fallback for catalogs that don't support snapshot commits
+ raise NotImplementedError(
+ "The catalog does not support snapshot commits. "
+ "The commit_snapshot method needs to be implemented in the
catalog interface."
+ )
+
+ def close(self):
+ """Close the catalog and release resources."""
+ if hasattr(self.catalog, 'close'):
+ self.catalog.close()
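
A short wiring sketch for this class (hedged: `catalog` and `snapshot` are
assumed to already exist, e.g. a loaded RESTCatalog and a built Snapshot):

    from pypaimon.catalog.catalog_snapshot_commit import CatalogSnapshotCommit
    from pypaimon.common.identifier import Identifier

    commit = CatalogSnapshotCommit(catalog, Identifier.create("db", "tbl"), uuid=None)
    try:
        ok = commit.commit(snapshot, branch="main", statistics=[])
    finally:
        commit.close()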
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6c4dc74623..68d1e438c8 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -17,7 +17,7 @@
#################################################################################
from pathlib import Path
-from typing import Optional, Union
+from typing import Optional, Union, List
from urllib.parse import urlparse
from pypaimon.catalog.catalog import Catalog
@@ -26,11 +26,14 @@ from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
TableAlreadyExistException,
TableNotExistException)
from pypaimon.catalog.database import Database
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
from pypaimon.common.config import CatalogOptions
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema_manager import SchemaManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.catalog_environment import CatalogEnvironment
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.table.table import Table
@@ -67,7 +70,17 @@ class FileSystemCatalog(Catalog):
raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
table_path = self.get_table_path(identifier)
table_schema = self.get_table_schema(identifier)
- return FileStoreTable(self.file_io, identifier, table_path, table_schema)
+
+ # Create catalog environment for filesystem catalog
+ # Filesystem catalog doesn't support version management by default
+ catalog_environment = CatalogEnvironment(
+ identifier=identifier,
+ uuid=None, # Filesystem catalog doesn't track table UUIDs
+ catalog_loader=None, # No catalog loader for filesystem
+ supports_version_management=False
+ )
+
+ return FileStoreTable(self.file_io, identifier, table_path, table_schema, catalog_environment)
def create_table(self, identifier: Union[str, Identifier], schema: 'Schema', ignore_if_exists: bool):
if schema.options and schema.options.get(CoreOptions.AUTO_CREATE):
@@ -107,3 +120,12 @@ class FileSystemCatalog(Catalog):
bucket = parsed.netloc
warehouse_dir = parsed.path.lstrip('/')
return Path(f"{bucket}/{warehouse_dir}" if warehouse_dir else bucket)
+
+ def commit_snapshot(
+ self,
+ identifier: Identifier,
+ table_uuid: Optional[str],
+ snapshot: Snapshot,
+ statistics: List[PartitionStatistics]
+ ) -> bool:
+ raise NotImplementedError("This catalog does not support committing snapshots")
diff --git a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
new file mode 100644
index 0000000000..4a2e70e4e2
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
@@ -0,0 +1,93 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List
+
+from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+
+class RenamingSnapshotCommit(SnapshotCommit):
+ """
+ A SnapshotCommit using file renaming to commit.
+
+ Note that when the file system is local or HDFS, rename is atomic.
+ But if the file system is object storage, we need additional lock protection.
+ """
+
+ def __init__(self, snapshot_manager: SnapshotManager):
+ """
+ Initialize RenamingSnapshotCommit.
+
+ Args:
+ snapshot_manager: The snapshot manager to use
+ """
+ self.snapshot_manager = snapshot_manager
+ self.file_io: FileIO = snapshot_manager.file_io
+
+ def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
+ """
+ Commit the snapshot using file renaming.
+
+ Args:
+ snapshot: The snapshot to commit
+ branch: The branch name to commit to
+ statistics: List of partition statistics (currently unused but kept for interface compatibility)
+
+ Returns:
+ True if commit was successful, False otherwise
+
+ Raises:
+ Exception: If commit fails
+ """
+ new_snapshot_path = self.snapshot_manager.get_snapshot_path(snapshot.id)
+ if not self.file_io.exists(new_snapshot_path):
+ # Try to write atomically using the file IO
+ committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot))
+ if committed:
+ # Update the latest hint
+ self._commit_latest_hint(snapshot.id)
+ return committed
+ return False
+
+ def close(self):
+ """Close the lock and release resources."""
+
+ def _commit_latest_hint(self, snapshot_id: int):
+ """
+ Update the latest snapshot hint.
+
+ Args:
+ snapshot_id: The latest snapshot ID
+ """
+ latest_file = self.snapshot_manager.latest_file
+ try:
+ # Try atomic write first
+ success = self.file_io.try_to_write_atomic(latest_file, str(snapshot_id))
+ if not success:
+ # Fallback to regular write
+ self.file_io.write_file(latest_file, str(snapshot_id), overwrite=True)
+ except Exception as e:
+ # Log the error but don't fail the commit for this
+ # In a production system, you might want to use proper logging
+ print(f"Warning: Failed to update LATEST hint: {e}")
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index f597f059f8..f35ee04322 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -19,20 +19,24 @@ from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlparse
-from pypaimon.api import CatalogOptions, RESTApi
+from pypaimon.api import (CatalogOptions,
+ NoSuchResourceException, RESTApi)
from pypaimon.api.api_response import GetTableResponse, PagedList
from pypaimon.api.options import Options
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_exception import TableNotExistException
from pypaimon.catalog.database import Database
from pypaimon.catalog.property_change import PropertyChange
from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
from pypaimon.schema.table_schema import TableSchema
+from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.table.catalog_environment import CatalogEnvironment
from pypaimon.table.file_store_table import FileStoreTable
@@ -45,6 +49,55 @@ class RESTCatalog(Catalog):
context.fallback_io_loader)
self.data_token_enabled = self.api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
+ def catalog_loader(self):
+ """
+ Create and return a RESTCatalogLoader for this catalog.
+
+ Returns:
+ A RESTCatalogLoader instance configured with this catalog's context
+ """
+ from pypaimon.catalog.rest.rest_catalog_loader import RESTCatalogLoader
+ return RESTCatalogLoader(self.context)
+
+ def supports_version_management(self) -> bool:
+ """
+ Return whether this catalog supports version management for tables.
+
+ Returns:
+ True since REST catalogs support version management
+ """
+ return True
+
+ def commit_snapshot(
+ self,
+ identifier: Identifier,
+ table_uuid: Optional[str],
+ snapshot: Snapshot,
+ statistics: List[PartitionStatistics]
+ ) -> bool:
+ """
+ Commit the Snapshot for the table identified by the given Identifier.
+
+ Args:
+ identifier: Path of the table
+ table_uuid: UUID of the table to avoid wrong commit
+ snapshot: Snapshot to be committed
+ statistics: Statistics information of this change
+
+ Returns:
+ True if commit was successful, False otherwise
+
+ Raises:
+ TableNotExistException: If the target table does not exist
+ """
+ try:
+ return self.api.commit_snapshot(identifier, table_uuid, snapshot, statistics)
+ except NoSuchResourceException as e:
+ raise TableNotExistException(identifier) from e
+ except Exception as e:
+ # Handle other exceptions that might be thrown by the API
+ raise RuntimeError(f"Failed to commit snapshot for table
{identifier.get_full_name()}: {e}") from e
+
def list_databases(self) -> List[str]:
return self.api.list_databases()
@@ -146,8 +199,8 @@ class RESTCatalog(Catalog):
catalog_env = CatalogEnvironment(
identifier=identifier,
uuid=metadata.uuid,
- catalog_loader=None,
- supports_version_management=False
+ catalog_loader=self.catalog_loader(),
+ supports_version_management=True  # REST catalogs support version management
)
path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
@@ -164,4 +217,4 @@ class RESTCatalog(Catalog):
catalog_environment: CatalogEnvironment
) -> FileStoreTable:
"""Create FileStoreTable with dynamic options and catalog
environment"""
- return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema)
+ return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema, catalog_environment)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog_loader.py b/paimon-python/pypaimon/catalog/rest/rest_catalog_loader.py
new file mode 100644
index 0000000000..a8e9da1dff
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog_loader.py
@@ -0,0 +1,69 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+RESTCatalogLoader implementation for pypaimon.
+
+This module provides the RESTCatalogLoader class which implements the CatalogLoader
+interface to create and load RESTCatalog instances.
+"""
+
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+
+
+class RESTCatalogLoader(CatalogLoader):
+ """
+ Loader to create RESTCatalog instances.
+
+ This class implements the CatalogLoader interface and is responsible for
+ creating and configuring RESTCatalog instances based on the provided
+ CatalogContext.
+ """
+
+ def __init__(self, context: CatalogContext):
+ """
+ Initialize RESTCatalogLoader with a CatalogContext.
+
+ Args:
+ context: The CatalogContext containing configuration options
+ """
+ self._context = context
+
+ def context(self) -> CatalogContext:
+ """
+ Get the CatalogContext associated with this loader.
+
+ Returns:
+ The CatalogContext instance
+ """
+ return self._context
+
+ def load(self) -> RESTCatalog:
+ """
+ Load and return a new RESTCatalog instance.
+
+ This method creates a new RESTCatalog instance using the stored
+ CatalogContext, with config_required set to False to avoid
+ redundant configuration validation.
+
+ Returns:
+ A new RESTCatalog instance
+ """
+ return RESTCatalog(self._context, config_required=False)
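
Usage is a simple round trip (hedged: constructing the CatalogContext is out of
scope here, so `context` is assumed):

    loader = RESTCatalogLoader(context)
    catalog = loader.load()             # RESTCatalog(context, config_required=False)
    assert loader.context() is context  # the loader keeps its context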
diff --git a/paimon-python/pypaimon/catalog/snapshot_commit.py b/paimon-python/pypaimon/catalog/snapshot_commit.py
new file mode 100644
index 0000000000..f66b123343
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/snapshot_commit.py
@@ -0,0 +1,99 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Dict, List
+import time
+
+from pypaimon.common.rest_json import json_field
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+@dataclass
+class PartitionStatistics:
+ """
+ Represents partition statistics for snapshot commits.
+
+ This class matches the Java org.apache.paimon.partition.PartitionStatistics
+ structure for proper JSON serialization in REST API calls.
+ """
+
+ spec: Dict[str, str] = json_field("spec", default_factory=dict)
+ record_count: int = json_field("recordCount", default=0)
+ file_size_in_bytes: int = json_field("fileSizeInBytes", default=0)
+ file_count: int = json_field("fileCount", default=0)
+ last_file_creation_time: int = json_field("lastFileCreationTime", default_factory=lambda: int(time.time() * 1000))
+
+ @classmethod
+ def create(cls, partition_spec: Dict[str, str] = None, record_count: int = 0,
+ file_count: int = 0, file_size_in_bytes: int = 0,
+ last_file_creation_time: int = None) -> 'PartitionStatistics':
+ """
+ Factory method to create PartitionStatistics with backward compatibility.
+
+ Args:
+ partition_spec: Partition specification dictionary
+ record_count: Number of records
+ file_count: Number of files
+ file_size_in_bytes: Total file size in bytes
+ last_file_creation_time: Last file creation time in milliseconds
+
+ Returns:
+ PartitionStatistics instance
+ """
+ return cls(
+ spec=partition_spec or {},
+ record_count=record_count,
+ file_count=file_count,
+ file_size_in_bytes=file_size_in_bytes,
+ last_file_creation_time=last_file_creation_time or int(time.time() * 1000)
+ )
+
+
+class SnapshotCommit(ABC):
+ """Interface to commit snapshot atomically."""
+
+ @abstractmethod
+ def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
+ """
+ Commit the given snapshot.
+
+ Args:
+ snapshot: The snapshot to commit
+ branch: The branch name to commit to
+ statistics: List of partition statistics
+
+ Returns:
+ True if commit was successful, False otherwise
+
+ Raises:
+ Exception: If commit fails
+ """
+ pass
+
+ @abstractmethod
+ def close(self):
+ """Close the snapshot commit and release any resources."""
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
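
PartitionStatistics.create fills sensible defaults, and SnapshotCommit doubles
as a context manager, so callers can write (a sketch; `some_commit` stands for
any SnapshotCommit implementation and `snapshot` for an existing Snapshot):

    from pypaimon.catalog.snapshot_commit import PartitionStatistics

    stats = PartitionStatistics.create(
        partition_spec={"dt": "2024-01-15"},
        record_count=10_000,
        file_count=1,
        file_size_in_bytes=1024 * 1024,
    )  # last_file_creation_time defaults to "now" in epoch millis

    with some_commit as commit:          # __exit__ calls close() automatically
        commit.commit(snapshot, branch="main", statistics=[stats])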
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 5a46cb419d..a9fefcdef6 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from __future__ import annotations
from dataclasses import dataclass
from functools import reduce
diff --git a/paimon-python/pypaimon/common/rest_json.py b/paimon-python/pypaimon/common/rest_json.py
index aa959e1c53..2f007cd230 100644
--- a/paimon-python/pypaimon/common/rest_json.py
+++ b/paimon-python/pypaimon/common/rest_json.py
@@ -43,6 +43,11 @@ class JSON:
@staticmethod
def __to_dict(obj: Any) -> Dict[str, Any]:
"""Convert to dictionary with custom field names"""
+ # If object has custom to_dict method, use it
+ if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
+ return obj.to_dict()
+
+ # Otherwise, use dataclass field-by-field serialization
result = {}
for field_info in fields(obj):
field_value = getattr(obj, field_info.name)
@@ -51,13 +56,15 @@ class JSON:
json_name = field_info.metadata.get("json_name", field_info.name)
# Handle nested objects
- if is_dataclass(field_value):
- result[json_name] = JSON.__to_dict(field_value)
- elif hasattr(field_value, "to_dict"):
+ if hasattr(field_value, "to_dict"):
result[json_name] = field_value.to_dict()
+ elif is_dataclass(field_value):
+ result[json_name] = JSON.__to_dict(field_value)
elif isinstance(field_value, list):
result[json_name] = [
- item.to_dict() if hasattr(item, "to_dict") else item
+ item.to_dict() if hasattr(item, "to_dict")
+ else JSON.__to_dict(item) if is_dataclass(item)
+ else item
for item in field_value
]
else:
@@ -68,6 +75,11 @@ class JSON:
@staticmethod
def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
"""Create instance from dictionary"""
+ # If target class has custom from_dict method, use it
+ if hasattr(target_class, "from_dict") and callable(getattr(target_class, "from_dict")):
+ return target_class.from_dict(data)
+
+ # Otherwise, use dataclass field-by-field deserialization
# Create field name mapping (json_name -> field_name)
field_mapping = {}
type_mapping = {}
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 0fbdd33a84..8a22e7f012 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -77,6 +77,10 @@ class AtomicType(DataType):
return self.type + " NOT NULL"
return self.type
+ @classmethod
+ def from_dict(cls, data: str) -> "AtomicType":
+ return DataTypeParser.parse_data_type(data)
+
def __str__(self) -> str:
null_suffix = "" if self.nullable else " NOT NULL"
return f"{self.type}{null_suffix}"
@@ -97,6 +101,10 @@ class ArrayType(DataType):
"nullable": self.nullable
}
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "ArrayType":
+ return DataTypeParser.parse_data_type(data)
+
def __str__(self) -> str:
null_suffix = "" if self.nullable else " NOT NULL"
return f"ARRAY<{self.element}>{null_suffix}"
@@ -117,6 +125,10 @@ class MultisetType(DataType):
"nullable": self.nullable,
}
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "MultisetType":
+ return DataTypeParser.parse_data_type(data)
+
def __str__(self) -> str:
null_suffix = "" if self.nullable else " NOT NULL"
return f"MULTISET<{self.element}>{null_suffix}"
@@ -144,6 +156,10 @@ class MapType(DataType):
"nullable": self.nullable,
}
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "MapType":
+ return DataTypeParser.parse_data_type(data)
+
def __str__(self) -> str:
null_suffix = "" if self.nullable else " NOT NULL"
return f"MAP<{self.key}, {self.value}>{null_suffix}"
@@ -212,6 +228,10 @@ class RowType(DataType):
"nullable": self.nullable,
}
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "RowType":
+ return DataTypeParser.parse_data_type(data)
+
def __str__(self) -> str:
field_strs = [f"{field.name}: {field.type}" for field in self.fields]
null_suffix = "" if self.nullable else " NOT NULL"
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
index 500177cf31..cc27c5a530 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -16,55 +16,30 @@
# limitations under the License.
################################################################################
-import re
-from dataclasses import asdict, dataclass, fields
-from typing import Any, Dict, Optional
+from dataclasses import dataclass
+from typing import Dict, Optional
+
+from pypaimon.common.rest_json import json_field
@dataclass
class Snapshot:
- version: int
- id: int
- schema_id: int
- base_manifest_list: str
- delta_manifest_list: str
- commit_user: str
- commit_identifier: int
- commit_kind: str
- time_millis: int
- log_offsets: Dict[int, int]
-
- changelog_manifest_list: Optional[str] = None
- index_manifest: Optional[str] = None
- total_record_count: Optional[int] = None
- delta_record_count: Optional[int] = None
- changelog_record_count: Optional[int] = None
- watermark: Optional[int] = None
- statistics: Optional[str] = None
-
- @classmethod
- def from_json(cls, data: Dict[str, Any]):
- known_fields = {field.name for field in fields(Snapshot)}
- processed_data = {
- camel_to_snake(key): value
- for key, value in data.items()
- if camel_to_snake(key) in known_fields
- }
- return Snapshot(**processed_data)
-
- def to_json(self) -> Dict[str, Any]:
- snake_case_dict = asdict(self)
- return {
- snake_to_camel(key): value
- for key, value in snake_case_dict.items()
- }
-
-
-def camel_to_snake(name: str) -> str:
- s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
- return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
-
-
-def snake_to_camel(name: str) -> str:
- parts = name.split('_')
- return parts[0] + ''.join(word.capitalize() for word in parts[1:])
+ # Required fields
+ id: int = json_field("id")
+ schema_id: int = json_field("schemaId")
+ base_manifest_list: str = json_field("baseManifestList")
+ delta_manifest_list: str = json_field("deltaManifestList")
+ commit_user: str = json_field("commitUser")
+ commit_identifier: int = json_field("commitIdentifier")
+ commit_kind: str = json_field("commitKind")
+ time_millis: int = json_field("timeMillis")
+ # Optional fields with defaults
+ version: Optional[int] = json_field("version", default=None)
+ log_offsets: Optional[Dict[int, int]] = json_field("logOffsets", default_factory=dict)
+ changelog_manifest_list: Optional[str] = json_field("changelogManifestList", default=None)
+ index_manifest: Optional[str] = json_field("indexManifest", default=None)
+ total_record_count: Optional[int] = json_field("totalRecordCount", default=None)
+ delta_record_count: Optional[int] = json_field("deltaRecordCount", default=None)
+ changelog_record_count: Optional[int] = json_field("changelogRecordCount", default=None)
+ watermark: Optional[int] = json_field("watermark", default=None)
+ statistics: Optional[str] = json_field("statistics", default=None)
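
With json_field carrying the camelCase names, serialization now goes through
the shared JSON helper used elsewhere in this change (see
renaming_snapshot_commit.py and snapshot_manager.py). A round-trip sketch:

    from pypaimon.common.rest_json import JSON
    from pypaimon.snapshot.snapshot import Snapshot

    s = Snapshot(id=7, schema_id=0, base_manifest_list="base-7",
                 delta_manifest_list="delta-7", commit_user="u",
                 commit_identifier=7, commit_kind="APPEND",
                 time_millis=1703721600000)
    payload = JSON.to_json(s)                 # keys like "schemaId", "timeMillis"
    restored = JSON.from_json(payload, Snapshot)
    assert restored.schema_id == 0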
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 6a8a68e73a..f23ad03dae 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -15,11 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-import json
+from pathlib import Path
from typing import Optional
from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
from pypaimon.snapshot.snapshot import Snapshot
@@ -32,13 +32,13 @@ class SnapshotManager:
self.table: FileStoreTable = table
self.file_io: FileIO = self.table.file_io
self.snapshot_dir = self.table.table_path / "snapshot"
+ self.latest_file = self.snapshot_dir / "LATEST"
def get_latest_snapshot(self) -> Optional[Snapshot]:
- latest_file = self.snapshot_dir / "LATEST"
- if not self.file_io.exists(latest_file):
+ if not self.file_io.exists(self.latest_file):
return None
- latest_content = self.file_io.read_file_utf8(latest_file)
+ latest_content = self.file_io.read_file_utf8(self.latest_file)
latest_snapshot_id = int(latest_content.strip())
snapshot_file = self.snapshot_dir / f"snapshot-{latest_snapshot_id}"
@@ -46,23 +46,16 @@ class SnapshotManager:
return None
snapshot_content = self.file_io.read_file_utf8(snapshot_file)
- snapshot_data = json.loads(snapshot_content)
- return Snapshot.from_json(snapshot_data)
-
- def commit_snapshot(self, snapshot_id: int, snapshot_data: Snapshot):
- snapshot_file = self.snapshot_dir / f"snapshot-{snapshot_id}"
- latest_file = self.snapshot_dir / "LATEST"
+ return JSON.from_json(snapshot_content, Snapshot)
- try:
- snapshot_json = json.dumps(snapshot_data.to_json(), indent=2)
- snapshot_success = self.file_io.try_to_write_atomic(snapshot_file, snapshot_json)
- if not snapshot_success:
- self.file_io.write_file(snapshot_file, snapshot_json, overwrite=True)
+ def get_snapshot_path(self, snapshot_id: int) -> Path:
+ """
+ Get the path for a snapshot file.
- latest_success = self.file_io.try_to_write_atomic(latest_file, str(snapshot_id))
- if not latest_success:
- self.file_io.write_file(latest_file, str(snapshot_id), overwrite=True)
+ Args:
+ snapshot_id: The snapshot ID
- except Exception as e:
- self.file_io.delete_quietly(snapshot_file)
- raise RuntimeError(f"Failed to commit snapshot {snapshot_id}: {e}") from e
+ Returns:
+ Path to the snapshot file
+ """
+ return self.snapshot_dir / f"snapshot-{snapshot_id}"
diff --git a/paimon-python/pypaimon/table/catalog_environment.py b/paimon-python/pypaimon/table/catalog_environment.py
index d3a92ce927..f025b367b3 100644
--- a/paimon-python/pypaimon/table/catalog_environment.py
+++ b/paimon-python/pypaimon/table/catalog_environment.py
@@ -18,6 +18,9 @@ limitations under the License.
from typing import Optional
from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.catalog_snapshot_commit import CatalogSnapshotCommit
+from pypaimon.catalog.renaming_snapshot_commit import RenamingSnapshotCommit
+from pypaimon.catalog.snapshot_commit import SnapshotCommit
from pypaimon.common.identifier import Identifier
@@ -34,3 +37,56 @@ class CatalogEnvironment:
self.uuid = uuid
self.catalog_loader = catalog_loader
self.supports_version_management = supports_version_management
+
+ def snapshot_commit(self, snapshot_manager) -> Optional[SnapshotCommit]:
+ """
+ Create a SnapshotCommit instance based on the catalog environment configuration.
+
+ Args:
+ snapshot_manager: The SnapshotManager instance
+
+ Returns:
+ SnapshotCommit instance or None
+ """
+ if self.catalog_loader is not None and self.supports_version_management:
+ # Use catalog-based snapshot commit when catalog loader is available
+ # and version management is supported
+ catalog = self.catalog_loader.load()
+ return CatalogSnapshotCommit(catalog, self.identifier, self.uuid)
+ else:
+ # Use file renaming-based snapshot commit
+ # In a full implementation, this would use a proper lock factory
+ # to create locks based on the catalog lock context
+ return RenamingSnapshotCommit(snapshot_manager)
+
+ def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
+ """
+ Create a copy of this CatalogEnvironment with a different identifier.
+
+ Args:
+ identifier: The new identifier
+
+ Returns:
+ A new CatalogEnvironment instance
+ """
+ return CatalogEnvironment(
+ identifier=identifier,
+ uuid=self.uuid,
+ catalog_loader=self.catalog_loader,
+ supports_version_management=self.supports_version_management
+ )
+
+ @staticmethod
+ def empty() -> 'CatalogEnvironment':
+ """
+ Create an empty CatalogEnvironment with default values.
+
+ Returns:
+ An empty CatalogEnvironment instance
+ """
+ return CatalogEnvironment(
+ identifier=None,
+ uuid=None,
+ catalog_loader=None,
+ supports_version_management=False
+ )
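
The selection in snapshot_commit above is the crux of this change:
version-managed catalogs commit through the catalog, everything else falls back
to atomic file renaming. A sketch of the fallback branch (hedged:
`snapshot_manager` is assumed to be a SnapshotManager for some table):

    env = CatalogEnvironment.empty()
    commit = env.snapshot_commit(snapshot_manager)
    # -> RenamingSnapshotCommit, since catalog_loader is None here;
    # a REST-backed environment would yield a CatalogSnapshotCommit instead.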
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 4fcef0473d..1d131a492d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -17,6 +17,7 @@
################################################################################
from pathlib import Path
+from typing import Optional
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
@@ -25,6 +26,7 @@ from pypaimon.read.read_builder import ReadBuilder
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.table.catalog_environment import CatalogEnvironment
from pypaimon.table.table import Table
from pypaimon.write.batch_write_builder import BatchWriteBuilder
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
@@ -36,10 +38,11 @@ from pypaimon.write.row_key_extractor import
(DynamicBucketRowKeyExtractor,
class FileStoreTable(Table):
def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,
- table_schema: TableSchema):
+ table_schema: TableSchema, catalog_environment: Optional[CatalogEnvironment] = None):
self.file_io = file_io
self.identifier = identifier
self.table_path = table_path
+ self.catalog_environment = catalog_environment or CatalogEnvironment.empty()
self.table_schema = table_schema
self.fields = table_schema.fields
@@ -51,6 +54,19 @@ class FileStoreTable(Table):
self.is_primary_key_table = bool(self.primary_keys)
self.cross_partition_update = self.table_schema.cross_partition_update()
+ def current_branch(self) -> str:
+ """Get the current branch name from options."""
+ return self.options.get(CoreOptions.BRANCH, "main")
+
+ def snapshot_manager(self):
+ """Get the snapshot manager for this table."""
+ from pypaimon.snapshot.snapshot_manager import SnapshotManager
+ return SnapshotManager(self)
+
+ def new_snapshot_commit(self):
+ """Create a new SnapshotCommit instance using the catalog
environment."""
+ return
self.catalog_environment.snapshot_commit(self.snapshot_manager())
+
def bucket_mode(self) -> BucketMode:
if self.is_primary_key_table:
if self.options.get(CoreOptions.BUCKET, -1) == -2:
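
Putting the table-side pieces together, an end-to-end sketch (hedged: `table`
is assumed to be a FileStoreTable obtained from a catalog and `snapshot` an
existing Snapshot):

    commit = table.new_snapshot_commit()
    try:
        commit.commit(snapshot, branch=table.current_branch(), statistics=[])
    finally:
        commit.close()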
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py
index d5758e8969..d0486ec02c 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -105,6 +105,8 @@ class RESTCatalogServer:
# Initialize storage
self.database_store: Dict[str, GetDatabaseResponse] = {}
self.table_metadata_store: Dict[str, TableMetadata] = {}
+ self.table_latest_snapshot_store: Dict[str, str] = {}
+ self.table_partitions_store: Dict[str, List] = {}
self.no_permission_databases: List[str] = []
self.no_permission_tables: List[str] = []
@@ -326,13 +328,39 @@ class RESTCatalogServer:
identifier: Identifier, data: str,
parameters: Dict[str, str]) -> Tuple[str, int]:
"""Handle table-specific resource requests"""
- # Check table permissions
- if identifier.get_full_name() in self.no_permission_tables:
- raise TableNoPermissionException(identifier)
+ # Extract table name and check for branch information
+ raw_table_name = path_parts[2]
+
+ # Parse table name with potential branch (e.g., "table.main" -> "table", branch="main")
+ if '.' in raw_table_name and len(raw_table_name.split('.')) > 1:
+ # This might be a table with branch
+ table_parts = raw_table_name.split('.')
+ if len(table_parts) == 2:
+ table_name_part = table_parts[0]
+ branch_part = table_parts[1]
+ # Recreate identifier without branch for lookup
+ lookup_identifier = Identifier.create(identifier.database_name, table_name_part)
+ else:
+ lookup_identifier = identifier
+ branch_part = None
+ else:
+ lookup_identifier = identifier
+ branch_part = None
+
+ # Check table permissions using the base identifier
+ if lookup_identifier.get_full_name() in self.no_permission_tables:
+ raise TableNoPermissionException(lookup_identifier)
if len(path_parts) == 3:
- # Basic table operations
- return self._table_handle(method, data, identifier)
+ # Basic table operations (GET, DELETE, etc.)
+ return self._table_handle(method, data, lookup_identifier)
+ elif len(path_parts) == 4:
+ # Extended operations (e.g., commit)
+ operation = path_parts[3]
+ if operation == "commit":
+ return self._table_commit_handle(method, data, lookup_identifier, branch_part)
+ else:
+ return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404)
return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
def _databases_api_handler(self, method: str, data: str,
@@ -422,6 +450,109 @@ class RESTCatalogServer:
return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+ def _table_commit_handle(self, method: str, data: str, identifier: Identifier,
+ branch: str = None) -> Tuple[str, int]:
+ """Handle table commit operations"""
+ if method != "POST":
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ # Check if table exists
+ if identifier.get_full_name() not in self.table_metadata_store:
+ raise TableNotExistException(identifier)
+
+ try:
+ # Parse the commit request
+ from pypaimon.api.api_request import CommitTableRequest
+ from pypaimon.api.api_response import CommitTableResponse
+
+ commit_request = JSON.from_json(data, CommitTableRequest)
+
+ # Basic validation
+ if not commit_request.snapshot:
+ return self._mock_response(
+ ErrorResponse("SNAPSHOT", None, "Snapshot is required for
commit operation", 400), 400
+ )
+
+ # Write snapshot to file system
+ self._write_snapshot_files(identifier, commit_request.snapshot, commit_request.statistics)
+
+ self.logger.info(f"Successfully committed snapshot for table
{identifier.get_full_name()}, "
+ f"branch: {branch or 'main'}")
+ self.logger.info(f"Snapshot ID: {commit_request.snapshot.id}")
+ self.logger.info(f"Statistics count:
{len(commit_request.statistics) if commit_request.statistics else 0}")
+
+ # Create success response
+ response = CommitTableResponse(success=True)
+ return self._mock_response(response, 200)
+
+ except Exception as e:
+ self.logger.error(f"Error in commit operation: {e}")
+ import traceback
+ self.logger.error(f"Traceback: {traceback.format_exc()}")
+ return self._mock_response(
+ ErrorResponse(None, None, f"Commit failed: {str(e)}", 500), 500
+ )
+
+ def _write_snapshot_files(self, identifier: Identifier, snapshot, statistics):
+ """Write snapshot and related files to the file system"""
+ import os
+ import json
+ import uuid
+
+ # Construct table path: {warehouse}/{database}/{table}
+ table_path = os.path.join(self.data_path, self.warehouse, identifier.database_name, identifier.object_name)
+
+ # Create directory structure
+ snapshot_dir = os.path.join(table_path, "snapshot")
+
+ os.makedirs(snapshot_dir, exist_ok=True)
+
+ # Write snapshot file (snapshot-{id})
+ snapshot_file = os.path.join(snapshot_dir, f"snapshot-{snapshot.id}")
+ snapshot_data = {
+ "version": getattr(snapshot, 'version', 3),
+ "id": snapshot.id,
+ "schemaId": getattr(snapshot, 'schema_id', 0),
+ "baseManifestList": getattr(snapshot, 'base_manifest_list',
f"manifest-list-{uuid.uuid4()}"),
+ "deltaManifestList": getattr(snapshot, 'delta_manifest_list',
f"manifest-list-{uuid.uuid4()}"),
+ "commitUser": getattr(snapshot, 'commit_user', 'rest-server'),
+ "commitIdentifier": getattr(snapshot, 'commit_identifier', 1),
+ "commitKind": getattr(snapshot, 'commit_kind', 'APPEND'),
+ "timeMillis": getattr(snapshot, 'time_millis', 1703721600000),
+ "logOffsets": getattr(snapshot, 'log_offsets', {})
+ }
+
+ with open(snapshot_file, 'w') as f:
+ json.dump(snapshot_data, f, indent=2)
+
+ # Write LATEST file
+ latest_file = os.path.join(snapshot_dir, "LATEST")
+ with open(latest_file, 'w') as f:
+ f.write(str(snapshot.id))
+
+ # Create partition directories based on statistics
+ if statistics:
+ for stat in statistics:
+ if hasattr(stat, 'spec') and stat.spec:
+ # Extract partition information from spec
+ partition_parts = []
+ for key, value in stat.spec.items():
+ partition_parts.append(f"{key}={value}")
+
+ if partition_parts:
+ partition_dir = os.path.join(table_path, *partition_parts)
+ os.makedirs(partition_dir, exist_ok=True)
+
+ # If no statistics provided, create default partition directories for the test
+ if not statistics:
+ # Create default partitions that the test expects
+ default_partitions = ["dt=p1", "dt=p2"]
+ for partition in default_partitions:
+ partition_dir = os.path.join(table_path, partition)
+ os.makedirs(partition_dir, exist_ok=True)
+
+ self.logger.info(f"Created snapshot files at: {snapshot_dir}")
+
# Utility methods
def _mock_response(self, response: Union[RESTResponse, str], http_code: int) -> Tuple[str, int]:
"""Create mock response"""
diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/test_file_store_commit.py
new file mode 100644
index 0000000000..ced0afbf4a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_file_store_commit.py
@@ -0,0 +1,401 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+from datetime import datetime
+from pathlib import Path
+from unittest.mock import Mock, patch
+
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_commit import FileStoreCommit
+
+
+@patch('pypaimon.write.file_store_commit.SnapshotManager')
+@patch('pypaimon.write.file_store_commit.ManifestFileManager')
+@patch('pypaimon.write.file_store_commit.ManifestListManager')
+class TestFileStoreCommit(unittest.TestCase):
+ """Test cases for FileStoreCommit class."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ # Mock table with required attributes
+ self.mock_table = Mock()
+ self.mock_table.partition_keys = ['dt', 'region']
+ self.mock_table.current_branch.return_value = 'main'
+ self.mock_table.table_path = Path('/test/table/path')
+ self.mock_table.file_io = Mock()
+
+ # Mock snapshot commit
+ self.mock_snapshot_commit = Mock()
+
+ def _create_file_store_commit(self):
+ """Helper method to create FileStoreCommit instance."""
+ return FileStoreCommit(
+ snapshot_commit=self.mock_snapshot_commit,
+ table=self.mock_table,
+ commit_user='test_user'
+ )
+
+ def test_generate_partition_statistics_single_partition_single_file(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation with single partition and
single file."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ # Create test data
+ creation_time = datetime(2024, 1, 15, 10, 30, 0)
+ file_meta = DataFileMeta(
+ file_name="test_file_1.parquet",
+ file_size=1024 * 1024, # 1MB
+ row_count=10000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=1,
+ max_sequence_number=100,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=creation_time
+ )
+
+ commit_message = CommitMessage(
+ partition=('2024-01-15', 'us-east-1'),
+ bucket=0,
+ new_files=[file_meta]
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+ # Verify results
+ self.assertEqual(len(statistics), 1)
+
+ stat = statistics[0]
+ self.assertIsInstance(stat, PartitionStatistics)
+ self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+ self.assertEqual(stat.record_count, 10000)
+ self.assertEqual(stat.file_count, 1)
+ self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
+ self.assertEqual(stat.last_file_creation_time, int(creation_time.timestamp() * 1000))
+
+ def test_generate_partition_statistics_multiple_files_same_partition(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation with multiple files in same partition."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ creation_time_1 = datetime(2024, 1, 15, 10, 30, 0)
+ creation_time_2 = datetime(2024, 1, 15, 11, 30, 0) # Later time
+
+ file_meta_1 = DataFileMeta(
+ file_name="test_file_1.parquet",
+ file_size=1024 * 1024, # 1MB
+ row_count=10000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=1,
+ max_sequence_number=100,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=creation_time_1
+ )
+
+ file_meta_2 = DataFileMeta(
+ file_name="test_file_2.parquet",
+ file_size=2 * 1024 * 1024, # 2MB
+ row_count=15000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=101,
+ max_sequence_number=200,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=creation_time_2
+ )
+
+ commit_message = CommitMessage(
+ partition=('2024-01-15', 'us-east-1'),
+ bucket=0,
+ new_files=[file_meta_1, file_meta_2]
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+ # Verify results
+ self.assertEqual(len(statistics), 1)
+
+ stat = statistics[0]
+ self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+ self.assertEqual(stat.record_count, 25000) # 10000 + 15000
+ self.assertEqual(stat.file_count, 2)
+ self.assertEqual(stat.file_size_in_bytes, 3 * 1024 * 1024) # 1MB + 2MB
+ # Should have the latest creation time
+ self.assertEqual(stat.last_file_creation_time, int(creation_time_2.timestamp() * 1000))
+
+ def test_generate_partition_statistics_multiple_partitions(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation with multiple different partitions."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ creation_time = datetime(2024, 1, 15, 10, 30, 0)
+
+ # File for partition 1
+ file_meta_1 = DataFileMeta(
+ file_name="test_file_1.parquet",
+ file_size=1024 * 1024,
+ row_count=10000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=1,
+ max_sequence_number=100,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=creation_time
+ )
+
+ # File for partition 2
+ file_meta_2 = DataFileMeta(
+ file_name="test_file_2.parquet",
+ file_size=2 * 1024 * 1024,
+ row_count=20000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=101,
+ max_sequence_number=200,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=creation_time
+ )
+
+ commit_message_1 = CommitMessage(
+ partition=('2024-01-15', 'us-east-1'),
+ bucket=0,
+ new_files=[file_meta_1]
+ )
+
+ commit_message_2 = CommitMessage(
+ partition=('2024-01-15', 'us-west-2'),
+ bucket=0,
+ new_files=[file_meta_2]
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message_1, commit_message_2])
+
+ # Verify results
+ self.assertEqual(len(statistics), 2)
+
+ # Sort statistics by partition spec for consistent testing
+ statistics.sort(key=lambda s: s.spec['region'])
+
+ # Check first partition (us-east-1)
+ stat_1 = statistics[0]
+ self.assertEqual(stat_1.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+ self.assertEqual(stat_1.record_count, 10000)
+ self.assertEqual(stat_1.file_count, 1)
+ self.assertEqual(stat_1.file_size_in_bytes, 1024 * 1024)
+
+ # Check second partition (us-west-2)
+ stat_2 = statistics[1]
+ self.assertEqual(stat_2.spec, {'dt': '2024-01-15', 'region': 'us-west-2'})
+ self.assertEqual(stat_2.record_count, 20000)
+ self.assertEqual(stat_2.file_count, 1)
+ self.assertEqual(stat_2.file_size_in_bytes, 2 * 1024 * 1024)
+
+ def test_generate_partition_statistics_unpartitioned_table(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation for unpartitioned table."""
+ # Update mock table to have no partition keys
+ self.mock_table.partition_keys = []
+
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ creation_time = datetime(2024, 1, 15, 10, 30, 0)
+ file_meta = DataFileMeta(
+ file_name="test_file_1.parquet",
+ file_size=1024 * 1024,
+ row_count=10000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=1,
+ max_sequence_number=100,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=creation_time
+ )
+
+ commit_message = CommitMessage(
+ partition=(), # Empty partition for unpartitioned table
+ bucket=0,
+ new_files=[file_meta]
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+ # Verify results
+ self.assertEqual(len(statistics), 1)
+
+ stat = statistics[0]
+ self.assertEqual(stat.spec, {}) # Empty spec for unpartitioned table
+ self.assertEqual(stat.record_count, 10000)
+ self.assertEqual(stat.file_count, 1)
+ self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
+
+ def test_generate_partition_statistics_no_creation_time(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation when a file has no creation time."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ file_meta = DataFileMeta(
+ file_name="test_file_1.parquet",
+ file_size=1024 * 1024,
+ row_count=10000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=1,
+ max_sequence_number=100,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=None # No creation time
+ )
+
+ commit_message = CommitMessage(
+ partition=('2024-01-15', 'us-east-1'),
+ bucket=0,
+ new_files=[file_meta]
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+ # Verify results
+ self.assertEqual(len(statistics), 1)
+
+ stat = statistics[0]
+ # Should have a valid timestamp (current time)
+ self.assertGreater(stat.last_file_creation_time, 0)
+
+ def test_generate_partition_statistics_mismatched_partition_keys(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation when the partition tuple doesn't match the partition keys."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ # Table has 2 partition keys but partition tuple has 3 values
+ file_meta = DataFileMeta(
+ file_name="test_file_1.parquet",
+ file_size=1024 * 1024,
+ row_count=10000,
+ min_key=None,
+ max_key=None,
+ key_stats=None,
+ value_stats=None,
+ min_sequence_number=1,
+ max_sequence_number=100,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ creation_time=datetime(2024, 1, 15, 10, 30, 0)
+ )
+
+ commit_message = CommitMessage(
+ partition=('2024-01-15', 'us-east-1', 'extra-value'), # 3 values but table has 2 keys
+ bucket=0,
+ new_files=[file_meta]
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+ # Verify results - should fall back to index-based naming
+ self.assertEqual(len(statistics), 1)
+
+ stat = statistics[0]
+ expected_spec = {
+ 'partition_0': '2024-01-15',
+ 'partition_1': 'us-east-1',
+ 'partition_2': 'extra-value'
+ }
+ self.assertEqual(stat.spec, expected_spec)
+
+ def test_generate_partition_statistics_empty_commit_messages(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation with an empty commit message list."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([])
+
+ # Verify results
+ self.assertEqual(len(statistics), 0)
+
+ def test_generate_partition_statistics_commit_message_no_files(
+ self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
+ """Test partition statistics generation with a commit message containing no files."""
+ # Create FileStoreCommit instance
+ file_store_commit = self._create_file_store_commit()
+
+ commit_message = CommitMessage(
+ partition=('2024-01-15', 'us-east-1'),
+ bucket=0,
+ new_files=[] # No files
+ )
+
+ # Test method
+ statistics = file_store_commit._generate_partition_statistics([commit_message])
+
+ # Verify results - should still create a partition entry with zero counts
+ self.assertEqual(len(statistics), 1)
+
+ stat = statistics[0]
+ self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
+ self.assertEqual(stat.record_count, 0)
+ self.assertEqual(stat.file_count, 0)
+ self.assertEqual(stat.file_size_in_bytes, 0)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py b/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py
new file mode 100644
index 0000000000..25350ebea0
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+import unittest
+from unittest.mock import Mock, patch
+
+from pypaimon.api import NoSuchResourceException
+from pypaimon.api.api_response import CommitTableResponse
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_exception import TableNotExistException
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.catalog.snapshot_commit import PartitionStatistics
+from pypaimon.common.identifier import Identifier
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class TestRESTCatalogCommitSnapshot(unittest.TestCase):
+
+ def setUp(self):
+ """Set up test fixtures."""
+ # Create mock options for testing
+ self.test_options = {
+ "warehouse": "s3://test-bucket/warehouse",
+ "uri": "http://localhost:8080",
+ "authentication.type": "none"
+ }
+
+ # Create mock CatalogContext
+ self.catalog_context = CatalogContext.create_from_options(Options(self.test_options))
+
+ # Create test identifier
+ self.identifier = Identifier.create("test_db", "test_table")
+
+ # Create test snapshot
+ self.test_snapshot = Snapshot(
+ version=3,
+ id=1,
+ schema_id=0,
+ base_manifest_list="manifest-list-1",
+ delta_manifest_list="manifest-list-1",
+ commit_user="test_user",
+ commit_identifier=12345,
+ commit_kind="APPEND",
+ time_millis=int(time.time() * 1000),
+ log_offsets={}
+ )
+
+ # Create test statistics
+ self.test_statistics = [
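+ # args follow PartitionStatistics.create(partition_spec, record_count, file_count, ...)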
+ PartitionStatistics.create({"partition": "2024-01-01"}, 1000, 5)
+ ]
+
+ def test_rest_catalog_supports_version_management(self):
+ """Test that RESTCatalog supports version management."""
+ with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+ # Configure mock
+ mock_api_instance = Mock()
+ mock_api_instance.options = self.test_options
+ mock_rest_api.return_value = mock_api_instance
+
+ # Create RESTCatalog
+ catalog = RESTCatalog(self.catalog_context)
+
+ # Verify supports version management
+ self.assertTrue(catalog.supports_version_management())
+
+ def test_rest_catalog_commit_snapshot_success(self):
+ """Test successful snapshot commit."""
+ with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+ # Configure mock
+ mock_api_instance = Mock()
+ mock_api_instance.options = self.test_options
+ mock_api_instance.commit_snapshot.return_value = True
+ mock_rest_api.return_value = mock_api_instance
+
+ # Create RESTCatalog
+ catalog = RESTCatalog(self.catalog_context)
+
+ # Test commit snapshot
+ result = catalog.commit_snapshot(
+ self.identifier,
+ "test-uuid",
+ self.test_snapshot,
+ self.test_statistics
+ )
+
+ # Verify result
+ self.assertTrue(result)
+
+ # Verify API was called correctly
+ mock_api_instance.commit_snapshot.assert_called_once_with(
+ self.identifier,
+ "test-uuid",
+ self.test_snapshot,
+ self.test_statistics
+ )
+
+ def test_rest_catalog_commit_snapshot_table_not_exist(self):
+ """Test snapshot commit when table doesn't exist."""
+ with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+ # Configure mock to raise NoSuchResourceException
+ mock_api_instance = Mock()
+ mock_api_instance.options = self.test_options
+ mock_api_instance.commit_snapshot.side_effect = NoSuchResourceException("Table not found")
+ mock_rest_api.return_value = mock_api_instance
+
+ # Create RESTCatalog
+ catalog = RESTCatalog(self.catalog_context)
+
+ # Test commit snapshot with table not exist
+ with self.assertRaises(TableNotExistException):
+ catalog.commit_snapshot(
+ self.identifier,
+ "test-uuid",
+ self.test_snapshot,
+ self.test_statistics
+ )
+
+ def test_rest_catalog_commit_snapshot_api_error(self):
+ """Test snapshot commit with API error."""
+ with patch('pypaimon.catalog.rest.rest_catalog.RESTApi') as mock_rest_api:
+ # Configure mock to raise generic exception
+ mock_api_instance = Mock()
+ mock_api_instance.options = self.test_options
+ mock_api_instance.commit_snapshot.side_effect = RuntimeError("API
Error")
+ mock_rest_api.return_value = mock_api_instance
+
+ # Create RESTCatalog
+ catalog = RESTCatalog(self.catalog_context)
+
+ # Test commit snapshot with API error
+ with self.assertRaises(RuntimeError) as context:
+ catalog.commit_snapshot(
+ self.identifier,
+ "test-uuid",
+ self.test_snapshot,
+ self.test_statistics
+ )
+
+ # Verify error message contains table name
+ self.assertIn("test_db.test_table", str(context.exception))
+
+ def test_commit_table_request_creation(self):
+ """Test CommitTableRequest creation."""
+ from pypaimon.api.api_request import CommitTableRequest
+
+ request = CommitTableRequest(
+ table_uuid="test-uuid",
+ snapshot=self.test_snapshot,
+ statistics=self.test_statistics
+ )
+
+ # Verify request fields
+ self.assertEqual(request.table_uuid, "test-uuid")
+ self.assertEqual(request.snapshot, self.test_snapshot)
+ self.assertEqual(request.statistics, self.test_statistics)
+
+ def test_commit_table_response_creation(self):
+ """Test CommitTableResponse creation."""
+ from pypaimon.api.api_response import CommitTableResponse
+
+ # Test successful response
+ success_response = CommitTableResponse(success=True)
+ self.assertTrue(success_response.is_success())
+
+ # Test failed response
+ failed_response = CommitTableResponse(success=False)
+ self.assertFalse(failed_response.is_success())
+
+ def test_rest_api_commit_snapshot(self):
+ """Test RESTApi commit_snapshot method."""
+ from pypaimon.api import RESTApi
+
+ with patch('pypaimon.api.HttpClient') as mock_client_class:
+ # Configure mock
+ mock_client = Mock()
+ mock_response = CommitTableResponse(success=True)
+ mock_client.post_with_response_type.return_value = mock_response
+ mock_client_class.return_value = mock_client
+
+ # Create RESTApi with mocked client
+ with patch('pypaimon.api.auth.AuthProviderFactory.create_auth_provider'):
+ api = RESTApi(self.test_options, config_required=False)
+ api.client = mock_client
+
+ # Test commit snapshot
+ result = api.commit_snapshot(
+ self.identifier,
+ "test-uuid",
+ self.test_snapshot,
+ self.test_statistics
+ )
+
+ # Verify result
+ self.assertTrue(result)
+
+ # Verify client was called correctly
+ mock_client.post_with_response_type.assert_called_once()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
index 14849e6309..55d8de0905 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -32,7 +32,13 @@ class BatchTableCommit:
self.table: FileStoreTable = table
self.commit_user = commit_user
self.overwrite_partition = static_partition
- self.file_store_commit = FileStoreCommit(table, commit_user)
+
+ # Get SnapshotCommit from table's catalog environment
+ snapshot_commit = table.new_snapshot_commit()
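+ # Presumably a RenamingSnapshotCommit for filesystem catalogs or a
+ # catalog-backed commit for REST catalogs (see the new snapshot_commit.py
+ # and renaming_snapshot_commit.py modules in this change).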
+ if snapshot_commit is None:
+ raise RuntimeError("Table does not provide a SnapshotCommit
instance")
+
+ self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user)
self.batch_committed = False
def commit(self, commit_messages: List[CommitMessage]):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 4928167907..2ff7e01fd5 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -20,6 +20,7 @@ import time
from pathlib import Path
from typing import List
+from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.snapshot.snapshot import Snapshot
@@ -28,11 +29,17 @@ from pypaimon.write.commit_message import CommitMessage
class FileStoreCommit:
- """Core commit logic for file store operations."""
+ """
+ Core commit logic for file store operations.
- def __init__(self, table, commit_user: str):
+ This class provides atomic commit functionality similar to
+ org.apache.paimon.operation.FileStoreCommitImpl in Java.
+ """
+
+ def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: str):
from pypaimon.table.file_store_table import FileStoreTable
+ self.snapshot_commit = snapshot_commit
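+ # The SnapshotCommit abstracts how the new snapshot is atomically
+ # published (snapshot-file renaming vs. a REST catalog commit call).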
self.table: FileStoreTable = table
self.commit_user = commit_user
@@ -71,9 +78,18 @@ class FileStoreCommit:
time_millis=int(time.time() * 1000),
log_offsets={},
)
- self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+ # Generate partition statistics for the commit
+ statistics = self._generate_partition_statistics(commit_messages)
+
+ # Use SnapshotCommit for atomic commit
+ with self.snapshot_commit:
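+ # Entering the context manager lets SnapshotCommit set up and tear
+ # down any resources (e.g. a lock) around the atomic commit.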
+ success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
+ if not success:
+ raise RuntimeError(f"Failed to commit snapshot
{new_snapshot_id}")
def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
+ """Commit the given commit messages in overwrite mode."""
if not commit_messages:
return
@@ -97,7 +113,15 @@ class FileStoreCommit:
time_millis=int(time.time() * 1000),
log_offsets={},
)
- self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+ # Generate partition statistics for the commit
+ statistics = self._generate_partition_statistics(commit_messages)
+
+ # Use SnapshotCommit for atomic commit
+ with self.snapshot_commit:
+ success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
+ if not success:
+ raise RuntimeError(f"Failed to commit snapshot
{new_snapshot_id}")
def abort(self, commit_messages: List[CommitMessage]):
for message in commit_messages:
@@ -110,11 +134,103 @@ class FileStoreCommit:
print(f"Warning: Failed to clean up file {file.file_path}:
{e}")
def close(self):
- pass
+ """Close the FileStoreCommit and release resources."""
+ if hasattr(self.snapshot_commit, 'close'):
+ self.snapshot_commit.close()
def _generate_snapshot_id(self) -> int:
+ """Generate the next snapshot ID."""
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
if latest_snapshot:
return latest_snapshot.id + 1
else:
return 1
+
+ def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -> List[PartitionStatistics]:
+ """
+ Generate partition statistics from commit messages.
+
+ This method follows the Java implementation pattern from
+ org.apache.paimon.manifest.PartitionEntry.fromManifestEntry() and
+ PartitionEntry.merge() methods.
+
+ Args:
+ commit_messages: List of commit messages to analyze
+
+ Returns:
+ List of PartitionStatistics for each unique partition
+ """
+ partition_stats = {}
+
+ for message in commit_messages:
+ # Convert partition tuple to dictionary for PartitionStatistics
+ partition_value = message.partition() # Call the method to get the partition value
+ if partition_value:
+ # Assuming partition is a tuple and we need to convert it to a dict
+ # This may need adjustment based on actual partition format
+ if isinstance(partition_value, tuple):
+ # Create partition spec from partition tuple and table partition keys
+ partition_spec = {}
+ if len(partition_value) == len(self.table.partition_keys):
+ for i, key in enumerate(self.table.partition_keys):
+ partition_spec[key] = str(partition_value[i])
+ else:
+ # Fallback: use indices as keys
+ for i, value in enumerate(partition_value):
+ partition_spec[f"partition_{i}"] = str(value)
+ else:
+ # If partition is already a dict or other format
+ partition_spec = dict(partition_value) if partition_value else {}
+ else:
+ # Default partition for unpartitioned tables
+ partition_spec = {}
+
+ partition_key = tuple(sorted(partition_spec.items()))
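+ # Sorted items give a deterministic, hashable grouping key, so files
+ # from the same partition always merge into one statistics entry.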
+
+ if partition_key not in partition_stats:
+ partition_stats[partition_key] = {
+ 'partition_spec': partition_spec,
+ 'record_count': 0,
+ 'file_count': 0,
+ 'file_size_in_bytes': 0,
+ 'last_file_creation_time': 0
+ }
+
+ # Process each file in the commit message
+ # Following Java implementation: PartitionEntry.fromDataFile()
+ new_files = message.new_files()
+ for file_meta in new_files:
+ # Extract actual file metadata (following Java DataFileMeta pattern)
+ record_count = file_meta.row_count
+ file_size_in_bytes = file_meta.file_size
+ file_count = 1
+
+ # Convert creation_time to milliseconds (Java uses epoch millis)
+ if file_meta.creation_time:
+ file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
+ else:
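+ # No creation time recorded: fall back to "now" so the max() below
+ # still yields a sensible last_file_creation_time.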
+ file_creation_time = int(time.time() * 1000)
+
+ # Accumulate statistics (following Java PartitionEntry.merge() logic)
+ partition_stats[partition_key]['record_count'] += record_count
+ partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes
+ partition_stats[partition_key]['file_count'] += file_count
+
+ # Keep the latest creation time
+ partition_stats[partition_key]['last_file_creation_time'] = max(
+ partition_stats[partition_key]['last_file_creation_time'],
+ file_creation_time
+ )
+
+ # Convert to PartitionStatistics objects
+ # Following Java PartitionEntry.toPartitionStatistics() pattern
+ return [
+ PartitionStatistics.create(
+ partition_spec=stats['partition_spec'],
+ record_count=stats['record_count'],
+ file_count=stats['file_count'],
+ file_size_in_bytes=stats['file_size_in_bytes'],
+ last_file_creation_time=stats['last_file_creation_time']
+ )
+ for stats in partition_stats.values()
+ ]