This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85c28c7e25 [python] Extract _write_manifest_file in FileStoreCommit
85c28c7e25 is described below

commit 85c28c7e25b3408801e8447b68455e1285d5672c
Author: JingsongLi <[email protected]>
AuthorDate: Wed Feb 4 13:42:05 2026 +0800

    [python] Extract _write_manifest_file in FileStoreCommit
---
 paimon-python/pypaimon/write/file_store_commit.py | 127 ++++++++++++----------
 1 file changed, 68 insertions(+), 59 deletions(-)

diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 3adc28a0e2..0b5af2c9c3 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -238,9 +238,6 @@ class FileStoreCommit:
 
         # process new_manifest
         new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
-        added_file_count = 0
-        deleted_file_count = 0
-        delta_record_count = 0
         # process snapshot
         new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1
 
@@ -259,64 +256,9 @@ class FileStoreCommit:
             # Assign row IDs to new files and get the next row ID for the snapshot
             commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries)
 
-        for entry in commit_entries:
-            if entry.kind == 0:
-                added_file_count += 1
-                delta_record_count += entry.file.row_count
-            else:
-                deleted_file_count += 1
-                delta_record_count -= entry.file.row_count
         try:
-            self.manifest_file_manager.write(new_manifest_file, commit_entries)
             # TODO: implement noConflictsOrFail logic
-            partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
-            partition_min_stats = [min(col) for col in partition_columns]
-            partition_max_stats = [max(col) for col in partition_columns]
-            partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
-            if not all(count == 0 for count in partition_null_counts):
-                raise RuntimeError("Partition value should not be null")
-
-            # Calculate min_row_id and max_row_id from commit_entries
-            min_row_id = None
-            max_row_id = None
-            has_null_first_row_id = False
-            for entry in commit_entries:
-                if entry.file.first_row_id is None:
-                    has_null_first_row_id = True
-                    break
-                entry_min = entry.file.first_row_id
-                entry_max = entry.file.first_row_id + entry.file.row_count - 1
-                if min_row_id is None or entry_min < min_row_id:
-                    min_row_id = entry_min
-                if max_row_id is None or entry_max > max_row_id:
-                    max_row_id = entry_max
-
-            # If any file has first_row_id as None, set both min_row_id and max_row_id to None
-            if has_null_first_row_id:
-                min_row_id = None
-                max_row_id = None
-
-            manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
-            new_manifest_file_meta = ManifestFileMeta(
-                file_name=new_manifest_file,
-                file_size=self.table.file_io.get_file_size(manifest_file_path),
-                num_added_files=added_file_count,
-                num_deleted_files=deleted_file_count,
-                partition_stats=SimpleStats(
-                    min_values=GenericRow(
-                        values=partition_min_stats,
-                        fields=self.table.partition_keys_fields
-                    ),
-                    max_values=GenericRow(
-                        values=partition_max_stats,
-                        fields=self.table.partition_keys_fields
-                    ),
-                    null_counts=partition_null_counts,
-                ),
-                schema_id=self.table.table_schema.id,
-                min_row_id=min_row_id,
-                max_row_id=max_row_id,
-            )
+            new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file)
             self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])
 
             # process existing_manifest
@@ -330,6 +272,13 @@ class FileStoreCommit:
                 existing_manifest_files = []
             self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
 
+            delta_record_count = 0
+            for entry in commit_entries:
+                if entry.kind == 0:
+                    delta_record_count += entry.file.row_count
+                else:
+                    delta_record_count -= entry.file.row_count
+
             total_record_count += delta_record_count
             snapshot_data = Snapshot(
                 version=3,
@@ -372,6 +321,66 @@ class FileStoreCommit:
         )
         return SuccessResult()
 
+    def _write_manifest_file(self, commit_entries, new_manifest_file):
+        # Write new manifest file
+        self.manifest_file_manager.write(new_manifest_file, commit_entries)
+
+        # Calculate file count & record count statistics
+        added_file_count = 0
+        deleted_file_count = 0
+        for entry in commit_entries:
+            if entry.kind == 0:
+                added_file_count += 1
+            else:
+                deleted_file_count += 1
+
+        # Calculate partition statistics
+        partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
+        partition_min_stats = [min(col) for col in partition_columns]
+        partition_max_stats = [max(col) for col in partition_columns]
+        partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
+        if not all(count == 0 for count in partition_null_counts):
+            raise RuntimeError("Partition value should not be null")
+
+        # Calculate min_row_id and max_row_id from commit_entries
+        min_row_id = None
+        max_row_id = None
+        for entry in commit_entries:
+            if entry.file.first_row_id is None:
+                # If any file has first_row_id as None, set both min_row_id and max_row_id to None
+                min_row_id = None
+                max_row_id = None
+                break
+            entry_min = entry.file.first_row_id
+            entry_max = entry.file.first_row_id + entry.file.row_count - 1
+            if min_row_id is None or entry_min < min_row_id:
+                min_row_id = entry_min
+            if max_row_id is None or entry_max > max_row_id:
+                max_row_id = entry_max
+
+        # return new ManifestFileMeta
+        manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
+        return ManifestFileMeta(
+            file_name=new_manifest_file,
+            file_size=self.table.file_io.get_file_size(manifest_file_path),
+            num_added_files=added_file_count,
+            num_deleted_files=deleted_file_count,
+            partition_stats=SimpleStats(
+                min_values=GenericRow(
+                    values=partition_min_stats,
+                    fields=self.table.partition_keys_fields
+                ),
+                max_values=GenericRow(
+                    values=partition_max_stats,
+                    fields=self.table.partition_keys_fields
+                ),
+                null_counts=partition_null_counts,
+            ),
+            schema_id=self.table.table_schema.id,
+            min_row_id=min_row_id,
+            max_row_id=max_row_id,
+        )
+
     def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
         if retry_result is not None and latest_snapshot is not None:
             start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID

Reply via email to