This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 56585c0bee [Python] Support snapshot and manifest for PyPaimon (#5987)
56585c0bee is described below

commit 56585c0beeb72f9f639af9f494a795f80e3a62c8
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jul 31 10:30:02 2025 +0800

    [Python] Support snapshot and manifest for PyPaimon (#5987)
---
 paimon-python/pypaimon/common/file_io.py           |  12 +++
 .../{common/file_io.py => manifest/__init__.py}    |  24 -----
 .../pypaimon/manifest/manifest_file_manager.py     | 115 +++++++++++++++++++++
 .../pypaimon/manifest/manifest_list_manager.py     |  87 ++++++++++++++++
 .../file_io.py => manifest/schema/__init__.py}     |  24 -----
 .../pypaimon/manifest/schema/data_file_meta.py     |  86 +++++++++++++++
 .../schema/manifest_entry.py}                      |  39 +++----
 .../schema/manifest_file_meta.py}                  |  44 ++++----
 .../file_io.py => manifest/schema/simple_stats.py} |  35 +++----
 .../{common/file_io.py => snapshot/__init__.py}    |  24 -----
 paimon-python/pypaimon/snapshot/snapshot.py        |  70 +++++++++++++
 .../pypaimon/snapshot/snapshot_manager.py          |  67 ++++++++++++
 paimon-python/setup.py                             |   1 +
 13 files changed, 502 insertions(+), 126 deletions(-)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 38602a709b..f2539ca2c0 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -39,3 +39,15 @@ class FileIO(ABC):
     @abstractmethod
     def mkdirs(self, path: Path) -> bool:
         """"""
+
+    @abstractmethod
+    def write_file(self, path: Path, content: str, overwrite: bool = False):
+        """Write the string ``content`` to ``path``.
+
+        ``overwrite`` selects whether an existing file may be replaced; what
+        happens when it is False and ``path`` exists is left to implementations.
+        """
+
+    @abstractmethod
+    def delete_quietly(self, path: Path):
+        """Delete ``path`` on a best-effort basis.
+
+        Callers invoke this inside ``except`` blocks before re-raising, so
+        implementations presumably should not raise themselves (TODO confirm).
+        """
+
+    @abstractmethod
+    def new_input_stream(self, path: Path):
+        """Open ``path`` for reading.
+
+        Callers use the result as a context manager whose ``read()`` returns
+        the whole file content as bytes.
+        """
+
+    @abstractmethod
+    def new_output_stream(self, path: Path):
+        """Open ``path`` for writing.
+
+        Callers use the result as a context manager and ``write()`` bytes to it.
+        Declared here because the manifest managers rely on it.
+        """
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/manifest/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/__init__.py
index 38602a709b..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/__init__.py
@@ -15,27 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
new file mode 100644
index 0000000000..a70d0e2d46
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -0,0 +1,115 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+import fastavro
+from typing import List
+from io import BytesIO
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry, 
MANIFEST_ENTRY_SCHEMA
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.row.binary_row import BinaryRowDeserializer, 
BinaryRowSerializer, BinaryRow
+
+
+class ManifestFileManager:
+    """Reads and writes manifest files (Avro ManifestEntry records) using the table's FileIO."""
+
+    def __init__(self, table):
+        self.table: FileStoreTable = table
+        # Manifest files are stored under "<table_path>/manifest".
+        self.manifest_path = table.table_path / "manifest"
+        self.file_io = table.file_io
+        # Field lists used to (de)serialize the BinaryRow-encoded partition and key bytes.
+        self.partition_key_fields = self.table.table_schema.get_partition_key_fields()
+        self.primary_key_fields = self.table.table_schema.get_primary_key_fields()
+        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
+
+    def read(self, manifest_file_name: str) -> List[ManifestEntry]:
+        """Read all entries of the manifest file ``manifest_file_name``.
+
+        Key stats, value stats and extra files are not decoded yet and are
+        returned as None.
+        """
+        manifest_file_path = self.manifest_path / manifest_file_name
+
+        entries = []
+        with self.file_io.new_input_stream(manifest_file_path) as input_stream:
+            avro_bytes = input_stream.read()
+        buffer = BytesIO(avro_bytes)
+        reader = fastavro.reader(buffer)
+
+        for record in reader:
+            file_info = dict(record['_FILE'])
+            file_meta = DataFileMeta(
+                file_name=file_info['_FILE_NAME'],
+                file_size=file_info['_FILE_SIZE'],
+                row_count=file_info['_ROW_COUNT'],
+                min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
+                max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
+                key_stats=None,  # TODO
+                value_stats=None,  # TODO
+                min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
+                max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
+                schema_id=file_info['_SCHEMA_ID'],
+                level=file_info['_LEVEL'],
+                extra_files=None,  # TODO
+            )
+            entry = ManifestEntry(
+                kind=record['_KIND'],
+                partition=BinaryRowDeserializer.from_bytes(record['_PARTITION'], self.partition_key_fields),
+                bucket=record['_BUCKET'],
+                total_buckets=record['_TOTAL_BUCKETS'],
+                file=file_meta
+            )
+            entries.append(entry)
+        return entries
+
+    def write(self, commit_messages: List['CommitMessage']) -> List[str]:
+        """Write the new files of ``commit_messages`` into one new manifest file.
+
+        Each file's own sequence-number range, schema id, level and extra files
+        are persisted, so ``read`` returns them instead of zeros. A manifest
+        file is written even when ``commit_messages`` is empty.
+
+        Returns:
+            A one-element list holding the new manifest file name.
+
+        Raises:
+            RuntimeError: if serialization or the write fails; ``delete_quietly``
+                is called on the target path before raising.
+        """
+        avro_records = []
+        for message in commit_messages:
+            partition_bytes = BinaryRowSerializer.to_bytes(
+                BinaryRow(list(message.partition()), self.partition_key_fields))
+            for file in message.new_files():
+                avro_record = {
+                    "_KIND": 0,  # presumably ADD — TODO confirm against the Java FileKind values
+                    "_PARTITION": partition_bytes,
+                    "_BUCKET": message.bucket(),
+                    "_TOTAL_BUCKETS": -1,  # TODO
+                    "_FILE": {
+                        "_FILE_NAME": file.file_name,
+                        "_FILE_SIZE": file.file_size,
+                        "_ROW_COUNT": file.row_count,
+                        "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
+                        "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
+                        "_KEY_STATS": 1,  # TODO: placeholder until stats are serialized
+                        "_VALUE_STATS": 1,  # TODO: placeholder until stats are serialized
+                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
+                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
+                        "_SCHEMA_ID": file.schema_id,
+                        "_LEVEL": file.level,
+                        # extra_files is Optional on DataFileMeta, but the Avro field is a non-null array.
+                        "_EXTRA_FILES": list(file.extra_files or []),
+                    }
+                }
+                avro_records.append(avro_record)
+
+        manifest_filename = f"manifest-{str(uuid.uuid4())}.avro"
+        manifest_path = self.manifest_path / manifest_filename
+        try:
+            buffer = BytesIO()
+            fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
+            avro_bytes = buffer.getvalue()
+            with self.file_io.new_output_stream(manifest_path) as output_stream:
+                output_stream.write(avro_bytes)
+            return [str(manifest_filename)]
+        except Exception as e:
+            self.file_io.delete_quietly(manifest_path)
+            raise RuntimeError(f"Failed to write manifest file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
new file mode 100644
index 0000000000..969c6a05ef
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -0,0 +1,87 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+import fastavro
+from typing import List, Optional
+from io import BytesIO
+
+
+from pypaimon.manifest.schema.manifest_file_meta import 
MANIFEST_FILE_META_SCHEMA
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class ManifestListManager:
+    """Reads and writes manifest list files (Avro records naming manifest files) using the table's FileIO."""
+
+    def __init__(self, table: FileStoreTable):
+        self.table = table
+        # Manifest lists live next to the manifest files, under "<table_path>/manifest".
+        self.manifest_path = self.table.table_path / "manifest"
+        self.file_io = self.table.file_io
+
+    def read_all_manifest_files(self, snapshot: Snapshot) -> List[str]:
+        """Return the distinct manifest file names referenced by ``snapshot``.
+
+        Base-list entries come first, then delta-list entries. Duplicates are
+        dropped while keeping first-seen order, so the result is deterministic
+        (a ``set`` would reorder it differently from run to run).
+        """
+        manifest_files = []
+        manifest_files.extend(self.read(snapshot.base_manifest_list))
+        manifest_files.extend(self.read(snapshot.delta_manifest_list))
+        return list(dict.fromkeys(manifest_files))
+
+    def read(self, manifest_list_name: str) -> List[str]:
+        """Return the manifest file names stored in the manifest list ``manifest_list_name``."""
+        manifest_list_path = self.manifest_path / manifest_list_name
+
+        with self.file_io.new_input_stream(manifest_list_path) as input_stream:
+            avro_bytes = input_stream.read()
+        reader = fastavro.reader(BytesIO(avro_bytes))
+        return [record['_FILE_NAME'] for record in reader]
+
+    def write(self, manifest_file_names: List[str]) -> Optional[str]:
+        """Write a new manifest list naming ``manifest_file_names``.
+
+        Returns the new list file name, or None (writing nothing) when
+        ``manifest_file_names`` is empty. Raises RuntimeError on failure,
+        after calling ``delete_quietly`` on the target path.
+        """
+        if not manifest_file_names:
+            return None
+
+        avro_records = []
+        for manifest_file_name in manifest_file_names:
+            # Only _FILE_NAME carries real data; the other fields are placeholders for now.
+            avro_record = {
+                "_FILE_NAME": manifest_file_name,
+                "_FILE_SIZE": 0,  # TODO
+                "_NUM_ADDED_FILES": 0,
+                "_NUM_DELETED_FILES": 0,
+                "_PARTITION_STATS": 0,
+                "_SCHEMA_ID": 0,
+            }
+            avro_records.append(avro_record)
+
+        list_filename = f"manifest-list-{str(uuid.uuid4())}.avro"
+        list_path = self.manifest_path / list_filename
+        try:
+            buffer = BytesIO()
+            fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
+            avro_bytes = buffer.getvalue()
+            with self.file_io.new_output_stream(list_path) as output_stream:
+                output_stream.write(avro_bytes)
+            return list_filename
+        except Exception as e:
+            self.file_io.delete_quietly(list_path)
+            raise RuntimeError(f"Failed to write manifest list file: {e}") from e
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/manifest/schema/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/__init__.py
index 38602a709b..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/__init__.py
@@ -15,27 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
new file mode 100644
index 0000000000..452a59c4e7
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -0,0 +1,86 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+from datetime import datetime
+
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+@dataclass
+class DataFileMeta:
+    """Metadata of one data file, as stored in the ``_FILE`` record of a manifest entry.
+
+    ``file_path`` has no field in DATA_FILE_META_SCHEMA; it is derived by
+    ``set_file_path``.
+    """
+    file_name: str
+    file_size: int
+    row_count: int
+    min_key: Optional[BinaryRow]
+    max_key: Optional[BinaryRow]
+    key_stats: Optional[Dict[str, Any]]
+    value_stats: Optional[Dict[str, Any]]
+    min_sequence_number: int
+    max_sequence_number: int
+    schema_id: int
+    level: int
+    extra_files: Optional[List[str]]
+
+    creation_time: Optional[datetime] = None
+    delete_row_count: Optional[int] = None
+    embedded_index: Optional[bytes] = None
+    # NOTE(review): DATA_FILE_META_SCHEMA encodes _FILE_SOURCE as an int — confirm the intended type.
+    file_source: Optional[str] = None
+    value_stats_cols: Optional[List[str]] = None
+    external_path: Optional[str] = None
+
+    # Full path of the file; None until set_file_path() has been called.
+    file_path: Optional[str] = None
+
+    def set_file_path(self, table_path: Path, partition: BinaryRow, bucket: int):
+        """Set ``file_path`` to ``table_path/<field>=<value>/.../bucket-<bucket>/<file_name>``.
+
+        One ``<field>=<value>`` directory is added per entry of
+        ``partition.to_dict()``, in that dict's iteration order.
+        """
+        path_builder = table_path
+
+        partition_dict = partition.to_dict()
+        for field_name, field_value in partition_dict.items():
+            path_builder = path_builder / (field_name + "=" + str(field_value))
+
+        path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
+
+        self.file_path = str(path_builder)
+
+
+# Avro record schema of DataFileMeta as written into manifest files, where it is
+# nested as the "_FILE" field of MANIFEST_ENTRY_SCHEMA. ManifestFileManager reads
+# and writes these fields by name, so keep names and types in sync with it.
+DATA_FILE_META_SCHEMA = {
+    "type": "record",
+    "name": "DataFileMeta",
+    "fields": [
+        {"name": "_FILE_NAME", "type": "string"},
+        {"name": "_FILE_SIZE", "type": "long"},
+        {"name": "_ROW_COUNT", "type": "long"},
+        # Min/max key, serialized as BinaryRow bytes.
+        {"name": "_MIN_KEY", "type": "bytes"},
+        {"name": "_MAX_KEY", "type": "bytes"},
+        # Stats are plain long placeholders for now.
+        {"name": "_KEY_STATS", "type": "long"},  # TODO
+        {"name": "_VALUE_STATS", "type": "long"},  # TODO
+        {"name": "_MIN_SEQUENCE_NUMBER", "type": "long"},
+        {"name": "_MAX_SEQUENCE_NUMBER", "type": "long"},
+        {"name": "_SCHEMA_ID", "type": "long"},
+        {"name": "_LEVEL", "type": "int"},
+        {"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}},
+        # The remaining fields are nullable unions that default to null.
+        {"name": "_CREATION_TIME", "type": ["null", "long"], "default": None},
+        {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None},
+        {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None},
+        {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
+        {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": None},
+        {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
+    ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
similarity index 59%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 38602a709b..6974b98994 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -15,27 +15,30 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
 
+from dataclasses import dataclass
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta, 
DATA_FILE_META_SCHEMA
+from pypaimon.table.row.binary_row import BinaryRow
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+@dataclass
+class ManifestEntry:
+    """One record of a manifest file: a data file plus the partition and bucket it belongs to."""
+    kind: int  # ManifestFileManager.write currently always writes 0
+    partition: BinaryRow
+    bucket: int
+    total_buckets: int  # ManifestFileManager.write currently writes -1 (TODO placeholder)
+    file: DataFileMeta
 
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
 
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
+# Avro record schema of one manifest file entry; "_FILE" nests DATA_FILE_META_SCHEMA.
+MANIFEST_ENTRY_SCHEMA = {
+    "type": "record",
+    "name": "ManifestEntry",
+    "fields": [
+        {"name": "_KIND", "type": "int"},
+        {"name": "_PARTITION", "type": "bytes"},  # partition values serialized as BinaryRow bytes
+        {"name": "_BUCKET", "type": "int"},
+        {"name": "_TOTAL_BUCKETS", "type": "int"},
+        {"name": "_FILE", "type": DATA_FILE_META_SCHEMA}
+    ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index 38602a709b..5796b033e3 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -15,27 +15,35 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
 
+from dataclasses import dataclass
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+@dataclass
+class ManifestFileMeta:
+    """Metadata of one manifest file, i.e. one entry of a manifest list."""
+    file_name: str
+    file_size: int
+    num_added_files: int
+    num_deleted_files: int
+    # NOTE(review): MANIFEST_FILE_META_SCHEMA stores _PARTITION_STATS as a long placeholder, not SimpleStats.
+    partition_stats: SimpleStats
+    schema_id: int
+    # NOTE(review): the bucket/level ranges below have no field in MANIFEST_FILE_META_SCHEMA yet.
+    min_bucket: int
+    max_bucket: int
+    min_level: int
+    max_level: int
 
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
 
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
+# Avro record schema of one manifest list entry. ManifestListManager.write fills
+# every field except _FILE_NAME with 0 placeholders for now.
+MANIFEST_FILE_META_SCHEMA = {
+    "type": "record",
+    "name": "ManifestFileMeta",
+    "fields": [
+        {"name": "_FILE_NAME", "type": "string"},
+        {"name": "_FILE_SIZE", "type": "long"},
+        {"name": "_NUM_ADDED_FILES", "type": "long"},
+        {"name": "_NUM_DELETED_FILES", "type": "long"},
+        {"name": "_PARTITION_STATS", "type": "long"},  # TODO
+        {"name": "_SCHEMA_ID", "type": "long"},
+    ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
similarity index 63%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/manifest/schema/simple_stats.py
index 38602a709b..59c5de710b 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -15,27 +15,26 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
 
+from dataclasses import dataclass
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.table.row.binary_row import BinaryRow
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+@dataclass
+class SimpleStats:
+    """Column statistics: minimum and maximum values (as BinaryRows) plus a null count."""
+    min_value: BinaryRow
+    max_value: BinaryRow
+    null_count: int  # SIMPLE_STATS_SCHEMA declares this field nullable
 
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
 
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
+# Avro schema for SimpleStats; every field is nullable and defaults to null.
+# NOTE(review): "com.example.paimon" looks like a placeholder namespace, and the
+# lower-case field names differ from the "_UPPER_CASE" style of the other manifest schemas — confirm.
+SIMPLE_STATS_SCHEMA = {
+    "type": "record",
+    "name": "SimpleStats",
+    "namespace": "com.example.paimon",
+    "fields": [
+        {"name": "null_count", "type": ["null", "long"], "default": None},
+        {"name": "min_value", "type": ["null", "bytes"], "default": None},
+        {"name": "max_value", "type": ["null", "bytes"], "default": None},
+    ]
+}
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/snapshot/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/snapshot/__init__.py
index 38602a709b..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/snapshot/__init__.py
@@ -15,27 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
-
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
-
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
-
-    @abstractmethod
-    def list_status(self, path: Path):
-        """"""
-
-    @abstractmethod
-    def mkdirs(self, path: Path) -> bool:
-        """"""
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
new file mode 100644
index 0000000000..e2bc62d551
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import re
+from dataclasses import dataclass, fields, asdict
+from typing import Optional, Dict, Any
+
+
+@dataclass
+class Snapshot:
+    """A table snapshot, (de)serialized as a JSON object with camelCase keys."""
+    version: int
+    id: int
+    schema_id: int
+    base_manifest_list: str
+    delta_manifest_list: str
+    commit_user: str
+    commit_identifier: int
+    commit_kind: str
+    time_millis: int
+    log_offsets: Dict[int, int]
+
+    changelog_manifest_list: Optional[str] = None
+    index_manifest: Optional[str] = None
+    total_record_count: Optional[int] = None
+    delta_record_count: Optional[int] = None
+    changelog_record_count: Optional[int] = None
+    watermark: Optional[int] = None
+    statistics: Optional[str] = None
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]):
+        """Build an instance from a parsed JSON object with camelCase keys.
+
+        Keys whose snake_case form is not a field of ``cls`` are ignored. Using
+        ``cls`` (not ``Snapshot``) lets subclasses deserialize into their own type.
+        Raises TypeError if a required field is missing from ``data``.
+        """
+        known_fields = {field.name for field in fields(cls)}
+        processed_data = {}
+        for key, value in data.items():
+            snake_key = camel_to_snake(key)
+            if snake_key in known_fields:
+                processed_data[snake_key] = value
+        return cls(**processed_data)
+
+    def to_json(self) -> Dict[str, Any]:
+        """Return every field (None-valued ones included) in a dict keyed by camelCase names."""
+        return {
+            snake_to_camel(key): value
+            for key, value in asdict(self).items()
+        }
+
+
+def camel_to_snake(name: str) -> str:
+    """Convert a camelCase or PascalCase name to snake_case, e.g. ``schemaId`` -> ``schema_id``."""
+    # Pass 1 splits before each capitalized word; pass 2 splits remaining lower/digit-to-upper boundaries.
+    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
+    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
+    return fully_split.lower()
+
+
+def snake_to_camel(name: str) -> str:
+    """Convert a snake_case name to camelCase, e.g. ``schema_id`` -> ``schemaId``."""
+    head, *tail = name.split('_')
+    return head + ''.join(segment.capitalize() for segment in tail)
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
new file mode 100644
index 0000000000..9e500c48d2
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -0,0 +1,67 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import json
+from typing import Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class SnapshotManager:
+    """Reads and commits snapshot files under "<table_path>/snapshot" using the table's FileIO."""
+
+    def __init__(self, table: FileStoreTable):
+        self.table = table
+        self.file_io: FileIO = self.table.file_io
+        self.snapshot_dir = self.table.table_path / "snapshot"
+
+    def get_latest_snapshot(self) -> Optional[Snapshot]:
+        """Return the snapshot whose id is stored in ``snapshot/LATEST``.
+
+        Returns None when LATEST is missing or names a snapshot file that does
+        not exist.
+        """
+        latest_file = self.snapshot_dir / "LATEST"
+        if not self.file_io.exists(latest_file):
+            return None
+
+        latest_content = self.file_io.read_file_utf8(latest_file)
+        latest_snapshot_id = int(latest_content.strip())
+
+        snapshot_file = self.snapshot_dir / f"snapshot-{latest_snapshot_id}"
+        if not self.file_io.exists(snapshot_file):
+            return None
+
+        snapshot_content = self.file_io.read_file_utf8(snapshot_file)
+        snapshot_data = json.loads(snapshot_content)
+        return Snapshot.from_json(snapshot_data)
+
+    def commit_snapshot(self, snapshot_id: int, snapshot_data: Snapshot):
+        """Write ``snapshot-<snapshot_id>`` and then point LATEST at it.
+
+        Raises RuntimeError if serialization or either write fails. The
+        snapshot file is deleted on failure only if this call had already
+        started writing it, so a serialization error never removes a file
+        that this call did not create.
+        """
+        snapshot_file = self.snapshot_dir / f"snapshot-{snapshot_id}"
+        latest_file = self.snapshot_dir / "LATEST"
+        snapshot_write_started = False
+
+        try:
+            snapshot_json = json.dumps(snapshot_data.to_json(), indent=2)
+
+            snapshot_write_started = True
+            # NOTE(review): if try_to_write_atomic returns False because the file already
+            # exists (e.g. a concurrent commit), this fallback overwrites it — confirm the
+            # FileIO semantics before relying on this path for conflict detection.
+            snapshot_success = self.file_io.try_to_write_atomic(snapshot_file, snapshot_json)
+            if not snapshot_success:
+                self.file_io.write_file(snapshot_file, snapshot_json, overwrite=True)
+
+            latest_success = self.file_io.try_to_write_atomic(latest_file, str(snapshot_id))
+            if not latest_success:
+                self.file_io.write_file(latest_file, str(snapshot_id), overwrite=True)
+
+        except Exception as e:
+            if snapshot_write_started:
+                self.file_io.delete_quietly(snapshot_file)
+            raise RuntimeError(f"Failed to commit snapshot {snapshot_id}: {e}") from e
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 0d82a25ffb..499c6ab8dc 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -26,6 +26,7 @@ install_requires = [
     'fsspec==2024.3.1',
     'cachetools==5.3.3',
-    'ossfs==2023.12.0'
+    'ossfs==2023.12.0',
+    'fastavro==1.11.1',
 ]
 
 long_description = "See Apache Paimon Python API \

Reply via email to