This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 56585c0bee [Python] Support snapshot and manifest for PyPaimon (#5987)
56585c0bee is described below
commit 56585c0beeb72f9f639af9f494a795f80e3a62c8
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jul 31 10:30:02 2025 +0800
[Python] Support snapshot and manifest for PyPaimon (#5987)
---
paimon-python/pypaimon/common/file_io.py | 12 +++
.../{common/file_io.py => manifest/__init__.py} | 24 -----
.../pypaimon/manifest/manifest_file_manager.py | 115 +++++++++++++++++++++
.../pypaimon/manifest/manifest_list_manager.py | 87 ++++++++++++++++
.../file_io.py => manifest/schema/__init__.py} | 24 -----
.../pypaimon/manifest/schema/data_file_meta.py | 86 +++++++++++++++
.../schema/manifest_entry.py} | 39 +++----
.../schema/manifest_file_meta.py} | 44 ++++----
.../file_io.py => manifest/schema/simple_stats.py} | 35 +++----
.../{common/file_io.py => snapshot/__init__.py} | 24 -----
paimon-python/pypaimon/snapshot/snapshot.py | 70 +++++++++++++
.../pypaimon/snapshot/snapshot_manager.py | 67 ++++++++++++
paimon-python/setup.py | 1 +
13 files changed, 502 insertions(+), 126 deletions(-)
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/common/file_io.py
index 38602a709b..f2539ca2c0 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -39,3 +39,15 @@ class FileIO(ABC):
@abstractmethod
def mkdirs(self, path: Path) -> bool:
    """Create the directory ``path``; return whether it succeeded."""

@abstractmethod
def write_file(self, path: Path, content: str, overwrite: bool = False):
    """Write the text ``content`` to ``path``.

    ``overwrite`` selects whether an already existing file may be replaced.
    """

@abstractmethod
def delete_quietly(self, path: Path):
    """Delete ``path`` as best-effort cleanup.

    Callers invoke this from error handlers right before re-raising, so
    implementations should presumably not raise themselves.
    """

@abstractmethod
def new_input_stream(self, path: Path):
    """Open ``path`` for reading.

    The result is used as a context manager whose ``read()`` returns the
    file's raw bytes.
    """

@abstractmethod
def new_output_stream(self, path: Path):
    """Open ``path`` for writing.

    The result is used as a context manager whose ``write(data)`` accepts
    bytes; the manifest and manifest-list writers emit their Avro output
    through it, so every concrete FileIO must provide it.
    """
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/manifest/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/__init__.py
index 38602a709b..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/__init__.py
@@ -15,27 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
-
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
-
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
new file mode 100644
index 0000000000..a70d0e2d46
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -0,0 +1,115 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+import fastavro
+from typing import List
+from io import BytesIO
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry,
MANIFEST_ENTRY_SCHEMA
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.row.binary_row import BinaryRowDeserializer,
BinaryRowSerializer, BinaryRow
+
+
class ManifestFileManager:
    """Read and write manifest files (Avro) through the table's FileIO.

    Each record of a manifest file is one ``ManifestEntry``: a data file's
    metadata together with the partition and bucket it belongs to.
    """

    def __init__(self, table):
        self.table: FileStoreTable = table
        self.manifest_path = table.table_path / "manifest"
        self.file_io = table.file_io
        self.partition_key_fields = self.table.table_schema.get_partition_key_fields()
        self.primary_key_fields = self.table.table_schema.get_primary_key_fields()
        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()

    def read(self, manifest_file_name: str) -> List[ManifestEntry]:
        """Return every entry stored in the manifest ``manifest_file_name``.

        Key/value stats are not decoded yet and are returned as ``None``.
        """
        manifest_file_path = self.manifest_path / manifest_file_name
        with self.file_io.new_input_stream(manifest_file_path) as input_stream:
            avro_bytes = input_stream.read()
        # The whole file is buffered in memory, so the stream can be closed
        # before decoding.
        return [self._to_entry(record) for record in fastavro.reader(BytesIO(avro_bytes))]

    def _to_entry(self, record) -> ManifestEntry:
        """Convert one decoded Avro record into a ManifestEntry."""
        file_info = dict(record['_FILE'])
        file_meta = DataFileMeta(
            file_name=file_info['_FILE_NAME'],
            file_size=file_info['_FILE_SIZE'],
            row_count=file_info['_ROW_COUNT'],
            min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
            max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
            key_stats=None,  # TODO: decode _KEY_STATS once real stats are written
            value_stats=None,  # TODO: decode _VALUE_STATS once real stats are written
            min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
            max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
            schema_id=file_info['_SCHEMA_ID'],
            level=file_info['_LEVEL'],
            # _EXTRA_FILES is a non-nullable array in DATA_FILE_META_SCHEMA.
            extra_files=list(file_info['_EXTRA_FILES']),
        )
        return ManifestEntry(
            kind=record['_KIND'],
            partition=BinaryRowDeserializer.from_bytes(record['_PARTITION'], self.partition_key_fields),
            bucket=record['_BUCKET'],
            total_buckets=record['_TOTAL_BUCKETS'],
            file=file_meta,
        )

    def write(self, commit_messages: List['CommitMessage']) -> List[str]:
        """Write the new files of ``commit_messages`` into one new manifest file.

        Returns a single-element list holding the new manifest's file name.
        Raises RuntimeError (chained to the cause) when serialization or the
        write fails; the target file is then removed best-effort.
        """
        avro_records = []
        for message in commit_messages:
            partition_bytes = BinaryRowSerializer.to_bytes(
                BinaryRow(list(message.partition()), self.partition_key_fields))
            for file in message.new_files():
                avro_records.append({
                    "_KIND": 0,  # NOTE(review): presumably FileKind.ADD — confirm
                    "_PARTITION": partition_bytes,
                    "_BUCKET": message.bucket(),
                    "_TOTAL_BUCKETS": -1,  # TODO
                    "_FILE": {
                        "_FILE_NAME": file.file_name,
                        "_FILE_SIZE": file.file_size,
                        "_ROW_COUNT": file.row_count,
                        "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
                        "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
                        "_KEY_STATS": 1,  # TODO: placeholder until stats are serialized
                        "_VALUE_STATS": 1,  # TODO: placeholder until stats are serialized
                        # Persist the file's own metadata instead of zeros so that
                        # readers see the real sequence range, schema and level.
                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
                        "_SCHEMA_ID": file.schema_id,
                        "_LEVEL": file.level,
                        "_EXTRA_FILES": file.extra_files or [],
                    },
                })

        manifest_filename = f"manifest-{uuid.uuid4()}.avro"
        manifest_path = self.manifest_path / manifest_filename
        try:
            buffer = BytesIO()
            fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
            with self.file_io.new_output_stream(manifest_path) as output_stream:
                output_stream.write(buffer.getvalue())
            return [manifest_filename]
        except Exception as e:
            self.file_io.delete_quietly(manifest_path)
            raise RuntimeError(f"Failed to write manifest file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
new file mode 100644
index 0000000000..969c6a05ef
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -0,0 +1,87 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+import fastavro
+from typing import List, Optional
+from io import BytesIO
+
+
+from pypaimon.manifest.schema.manifest_file_meta import
MANIFEST_FILE_META_SCHEMA
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.file_store_table import FileStoreTable
+
+
class ManifestListManager:
    """Read and write manifest list files (Avro) through the table's FileIO.

    A manifest list records the names of the manifest files that make up
    (part of) a snapshot.
    """

    def __init__(self, table: FileStoreTable):
        self.table = table
        self.manifest_path = self.table.table_path / "manifest"
        self.file_io = self.table.file_io

    def read_all_manifest_files(self, snapshot: Snapshot) -> List[str]:
        """Return the manifest file names referenced by ``snapshot``.

        Base manifests come first, then delta manifests. Duplicates are
        dropped while keeping first-seen order, so the result is
        deterministic across processes.
        """
        base_manifests = self.read(snapshot.base_manifest_list)
        delta_manifests = self.read(snapshot.delta_manifest_list)
        # dict.fromkeys de-duplicates like set() but preserves insertion order;
        # set() iteration order depends on per-process string hash seeds.
        return list(dict.fromkeys(base_manifests + delta_manifests))

    def read(self, manifest_list_name: str) -> List[str]:
        """Return the ``_FILE_NAME`` of every record in ``manifest_list_name``."""
        manifest_list_path = self.manifest_path / manifest_list_name
        with self.file_io.new_input_stream(manifest_list_path) as input_stream:
            avro_bytes = input_stream.read()
        return [record['_FILE_NAME'] for record in fastavro.reader(BytesIO(avro_bytes))]

    def write(self, manifest_file_names: List[str]) -> Optional[str]:
        """Write a new manifest list naming ``manifest_file_names``.

        Returns the new list's file name, or None (writing nothing) when
        ``manifest_file_names`` is empty. Raises RuntimeError (chained to the
        cause) on failure after removing the target file best-effort.
        """
        if not manifest_file_names:
            return None

        avro_records = [
            {
                "_FILE_NAME": manifest_file_name,
                "_FILE_SIZE": 0,  # TODO
                "_NUM_ADDED_FILES": 0,
                "_NUM_DELETED_FILES": 0,
                "_PARTITION_STATS": 0,
                "_SCHEMA_ID": 0,
            }
            for manifest_file_name in manifest_file_names
        ]

        list_filename = f"manifest-list-{uuid.uuid4()}.avro"
        list_path = self.manifest_path / list_filename
        try:
            buffer = BytesIO()
            fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
            with self.file_io.new_output_stream(list_path) as output_stream:
                output_stream.write(buffer.getvalue())
            return list_filename
        except Exception as e:
            self.file_io.delete_quietly(list_path)
            raise RuntimeError(f"Failed to write manifest list file: {e}") from e
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/manifest/schema/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/__init__.py
index 38602a709b..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/__init__.py
@@ -15,27 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
-
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
-
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
new file mode 100644
index 0000000000..452a59c4e7
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -0,0 +1,86 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+from datetime import datetime
+
+from pypaimon.table.row.binary_row import BinaryRow
+
+
@dataclass
class DataFileMeta:
    """Metadata of one data file as stored in a manifest entry.

    ``file_path`` has no counterpart in the manifest record; it stays None
    until ``set_file_path`` resolves it.
    """
    file_name: str
    file_size: int
    row_count: int
    min_key: Optional[BinaryRow]
    max_key: Optional[BinaryRow]
    key_stats: Optional[Dict[str, Any]]
    value_stats: Optional[Dict[str, Any]]
    min_sequence_number: int
    max_sequence_number: int
    schema_id: int
    level: int
    extra_files: Optional[List[str]]

    creation_time: Optional[datetime] = None
    delete_row_count: Optional[int] = None
    embedded_index: Optional[bytes] = None
    file_source: Optional[str] = None
    value_stats_cols: Optional[List[str]] = None
    external_path: Optional[str] = None

    # Defaults to None, so the annotation must be Optional.
    file_path: Optional[str] = None

    def set_file_path(self, table_path: Path, partition: BinaryRow, bucket: int):
        """Compute and store this file's location under ``table_path``.

        The result is ``table_path/<key>=<value>/.../bucket-<bucket>/<file_name>``,
        with one ``<key>=<value>`` directory per item of ``partition.to_dict()``
        in iteration order.
        """
        path_builder = table_path
        for field_name, field_value in partition.to_dict().items():
            path_builder = path_builder / f"{field_name}={field_value}"
        path_builder = path_builder / f"bucket-{bucket}" / self.file_name
        self.file_path = str(path_builder)
+
+
+# Avro record schema for DataFileMeta; MANIFEST_ENTRY_SCHEMA embeds it as the
+# "_FILE" field. Every field after _EXTRA_FILES is nullable with a None default.
+DATA_FILE_META_SCHEMA = {
+ "type": "record",
+ "name": "DataFileMeta",
+ "fields": [
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_ROW_COUNT", "type": "long"},
+ # Min/max keys are BinaryRow values serialized to bytes.
+ {"name": "_MIN_KEY", "type": "bytes"},
+ {"name": "_MAX_KEY", "type": "bytes"},
+ {"name": "_KEY_STATS", "type": "long"}, # TODO: placeholder type; ManifestFileManager.write stores 1
+ {"name": "_VALUE_STATS", "type": "long"}, # TODO: placeholder type; ManifestFileManager.write stores 1
+ {"name": "_MIN_SEQUENCE_NUMBER", "type": "long"},
+ {"name": "_MAX_SEQUENCE_NUMBER", "type": "long"},
+ {"name": "_SCHEMA_ID", "type": "long"},
+ {"name": "_LEVEL", "type": "int"},
+ {"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}},
+ {"name": "_CREATION_TIME", "type": ["null", "long"], "default": None},
+ {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default":
None},
+ {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default":
None},
+ # NOTE(review): _FILE_SOURCE is an int here, but DataFileMeta.file_source is
+ # annotated Optional[str] — confirm which one is intended.
+ {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
+ {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array",
"items": "string"}], "default": None},
+ {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default":
None},
+ ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
similarity index 59%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 38602a709b..6974b98994 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -15,27 +15,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+from dataclasses import dataclass
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta,
DATA_FILE_META_SCHEMA
+from pypaimon.table.row.binary_row import BinaryRow
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
+@dataclass
+class ManifestEntry:
+ """One entry of a manifest file: a data file plus its partition and bucket."""
+ kind: int  # NOTE(review): ManifestFileManager.write always stores 0; presumably ADD — confirm
+ partition: BinaryRow
+ bucket: int
+ total_buckets: int  # ManifestFileManager.write currently stores -1 (TODO)
+ file: DataFileMeta
- @abstractmethod
- def list_status(self, path: Path):
- """"""
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
+# Avro record schema of a manifest file; each record is one ManifestEntry.
+MANIFEST_ENTRY_SCHEMA = {
+ "type": "record",
+ "name": "ManifestEntry",
+ "fields": [
+ {"name": "_KIND", "type": "int"},
+ # The partition is a BinaryRow serialized with BinaryRowSerializer.
+ {"name": "_PARTITION", "type": "bytes"},
+ {"name": "_BUCKET", "type": "int"},
+ {"name": "_TOTAL_BUCKETS", "type": "int"},
+ {"name": "_FILE", "type": DATA_FILE_META_SCHEMA}
+ ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index 38602a709b..5796b033e3 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -15,27 +15,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+from dataclasses import dataclass
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+from pypaimon.manifest.schema.simple_stats import SimpleStats
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
+@dataclass
+class ManifestFileMeta:
+ """Metadata of one manifest file, i.e. one record of a manifest list."""
+ file_name: str
+ file_size: int
+ num_added_files: int
+ num_deleted_files: int
+ partition_stats: SimpleStats  # NOTE(review): MANIFEST_FILE_META_SCHEMA types this as long (TODO)
+ schema_id: int
+ # NOTE(review): the four fields below have no counterpart in
+ # MANIFEST_FILE_META_SCHEMA, so they are never persisted yet.
+ min_bucket: int
+ max_bucket: int
+ min_level: int
+ max_level: int
- @abstractmethod
- def list_status(self, path: Path):
- """"""
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
+# Avro record schema of a manifest list file; one record per manifest file.
+# ManifestListManager.write currently fills every field except _FILE_NAME with 0.
+MANIFEST_FILE_META_SCHEMA = {
+ "type": "record",
+ "name": "ManifestFileMeta",
+ "fields": [
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_NUM_ADDED_FILES", "type": "long"},
+ {"name": "_NUM_DELETED_FILES", "type": "long"},
+ {"name": "_PARTITION_STATS", "type": "long"}, # TODO: presumably should hold SimpleStats — confirm
+ {"name": "_SCHEMA_ID", "type": "long"},
+ ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/manifest/schema/simple_stats.py
similarity index 63%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/simple_stats.py
index 38602a709b..59c5de710b 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -15,27 +15,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+from dataclasses import dataclass
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+from pypaimon.table.row.binary_row import BinaryRow
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
+@dataclass
+class SimpleStats:
+ """Min/max value rows plus a null count; used by ManifestFileMeta.partition_stats."""
+ min_value: BinaryRow
+ max_value: BinaryRow
+ null_count: int  # NOTE(review): SIMPLE_STATS_SCHEMA makes this nullable; annotation is plain int
- @abstractmethod
- def list_status(self, path: Path):
- """"""
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
+# Avro record schema for SimpleStats. No other schema shown here references it yet.
+SIMPLE_STATS_SCHEMA = {
+ "type": "record",
+ "name": "SimpleStats",
+ "namespace": "com.example.paimon",  # NOTE(review): looks like a placeholder namespace — confirm
+ "fields": [
+ # NOTE(review): lowercase field names differ from the _UPPER_CASE names used
+ # by the other schemas — confirm before this is written to disk.
+ {"name": "null_count", "type": ["null", "long"], "default": None},
+ {"name": "min_value", "type": ["null", "bytes"], "default": None},
+ {"name": "max_value", "type": ["null", "bytes"], "default": None},
+ ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/snapshot/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/snapshot/__init__.py
index 38602a709b..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/snapshot/__init__.py
@@ -15,27 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
-
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
-
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py
b/paimon-python/pypaimon/snapshot/snapshot.py
new file mode 100644
index 0000000000..e2bc62d551
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -0,0 +1,70 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import re
+from dataclasses import dataclass, fields, asdict
+from typing import Optional, Dict, Any
+
+
@dataclass
class Snapshot:
    """A table snapshot as stored in a ``snapshot-<id>`` JSON file.

    Attributes are snake_case here and camelCase in the JSON document.
    """
    version: int
    id: int
    schema_id: int
    base_manifest_list: str
    delta_manifest_list: str
    commit_user: str
    commit_identifier: int
    commit_kind: str
    time_millis: int
    log_offsets: Dict[int, int]

    changelog_manifest_list: Optional[str] = None
    index_manifest: Optional[str] = None
    total_record_count: Optional[int] = None
    delta_record_count: Optional[int] = None
    changelog_record_count: Optional[int] = None
    watermark: Optional[int] = None
    statistics: Optional[str] = None

    @classmethod
    def from_json(cls, data: Dict[str, Any]):
        """Build an instance from a decoded JSON object with camelCase keys.

        Keys that do not map to a field of ``cls`` are ignored; a missing
        required field makes the dataclass constructor raise ``TypeError``.
        """
        # Use ``cls`` (not ``Snapshot``) so subclasses get their own fields/type.
        known_fields = {field.name for field in fields(cls)}
        processed_data = {}
        for key, value in data.items():
            snake_key = camel_to_snake(key)  # convert once per key
            if snake_key in known_fields:
                processed_data[snake_key] = value
        return cls(**processed_data)

    def to_json(self) -> Dict[str, Any]:
        """Return this snapshot as a dict with camelCase keys, ready for ``json.dumps``."""
        return {snake_to_camel(key): value for key, value in asdict(self).items()}
+
+
def camel_to_snake(name: str) -> str:
    """Convert a camelCase (or PascalCase) identifier to snake_case.

    >>> camel_to_snake("baseManifestList")
    'base_manifest_list'
    """
    # Pass 1 inserts "_" before each capitalized word; pass 2 inserts "_"
    # before any capital that directly follows a lowercase letter or digit.
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    boundaries_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return boundaries_split.lower()
+
+
def snake_to_camel(name: str) -> str:
    """Convert a snake_case identifier to lowerCamelCase.

    >>> snake_to_camel("base_manifest_list")
    'baseManifestList'
    """
    head, *rest = name.split('_')
    return head + ''.join(map(str.capitalize, rest))
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
new file mode 100644
index 0000000000..9e500c48d2
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -0,0 +1,67 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import json
+from typing import Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.file_store_table import FileStoreTable
+
+
class SnapshotManager:
    """Read and commit snapshot files under ``<table_path>/snapshot``."""

    def __init__(self, table: FileStoreTable):
        self.table = table
        self.file_io: FileIO = self.table.file_io
        self.snapshot_dir = self.table.table_path / "snapshot"

    def get_latest_snapshot(self) -> Optional[Snapshot]:
        """Return the snapshot whose id is stored in the ``LATEST`` file.

        Returns None when ``LATEST`` or the snapshot file it names does not
        exist. ``int()`` raises ValueError if ``LATEST`` holds no integer.
        """
        latest_file = self.snapshot_dir / "LATEST"
        if not self.file_io.exists(latest_file):
            return None

        latest_content = self.file_io.read_file_utf8(latest_file)
        latest_snapshot_id = int(latest_content.strip())

        snapshot_file = self.snapshot_dir / f"snapshot-{latest_snapshot_id}"
        if not self.file_io.exists(snapshot_file):
            return None

        snapshot_content = self.file_io.read_file_utf8(snapshot_file)
        snapshot_data = json.loads(snapshot_content)
        return Snapshot.from_json(snapshot_data)

    def commit_snapshot(self, snapshot_id: int, snapshot_data: Snapshot):
        """Write ``snapshot-<snapshot_id>`` and then point ``LATEST`` at it.

        Raises RuntimeError (chained to the cause) on failure. The snapshot
        file is deleted best-effort only when the failure happens during the
        writes; a serialization error deletes nothing, so it can never remove
        an existing snapshot file with the same id.
        """
        snapshot_file = self.snapshot_dir / f"snapshot-{snapshot_id}"
        latest_file = self.snapshot_dir / "LATEST"

        try:
            snapshot_json = json.dumps(snapshot_data.to_json(), indent=2)
        except Exception as e:
            # Nothing has been written yet, so there is nothing to clean up.
            raise RuntimeError(f"Failed to commit snapshot {snapshot_id}: {e}") from e

        try:
            if not self.file_io.try_to_write_atomic(snapshot_file, snapshot_json):
                # NOTE(review): if the atomic write fails because another committer
                # already created this snapshot id, overwriting here discards that
                # commit — confirm the intended semantics of this fallback.
                self.file_io.write_file(snapshot_file, snapshot_json, overwrite=True)

            if not self.file_io.try_to_write_atomic(latest_file, str(snapshot_id)):
                self.file_io.write_file(latest_file, str(snapshot_id), overwrite=True)

        except Exception as e:
            self.file_io.delete_quietly(snapshot_file)
            raise RuntimeError(f"Failed to commit snapshot {snapshot_id}: {e}") from e
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 0d82a25ffb..499c6ab8dc 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -26,6 +26,7 @@ install_requires = [
'fsspec==2024.3.1',
'cachetools==5.3.3',
'ossfs==2023.12.0'
+ 'fastavro==1.11.1'
]
long_description = "See Apache Paimon Python API \