This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 85c28c7e25 [python] Extract _write_manifest_file in FileStoreCommit
85c28c7e25 is described below
commit 85c28c7e25b3408801e8447b68455e1285d5672c
Author: JingsongLi <[email protected]>
AuthorDate: Wed Feb 4 13:42:05 2026 +0800
[python] Extract _write_manifest_file in FileStoreCommit
---
paimon-python/pypaimon/write/file_store_commit.py | 127 ++++++++++++----------
1 file changed, 68 insertions(+), 59 deletions(-)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 3adc28a0e2..0b5af2c9c3 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -238,9 +238,6 @@ class FileStoreCommit:
# process new_manifest
new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
- added_file_count = 0
- deleted_file_count = 0
- delta_record_count = 0
# process snapshot
new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1
@@ -259,64 +256,9 @@ class FileStoreCommit:
# Assign row IDs to new files and get the next row ID for the snapshot
commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries)
- for entry in commit_entries:
- if entry.kind == 0:
- added_file_count += 1
- delta_record_count += entry.file.row_count
- else:
- deleted_file_count += 1
- delta_record_count -= entry.file.row_count
try:
- self.manifest_file_manager.write(new_manifest_file, commit_entries)
# TODO: implement noConflictsOrFail logic
- partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
- partition_min_stats = [min(col) for col in partition_columns]
- partition_max_stats = [max(col) for col in partition_columns]
- partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
- if not all(count == 0 for count in partition_null_counts):
- raise RuntimeError("Partition value should not be null")
-
- # Calculate min_row_id and max_row_id from commit_entries
- min_row_id = None
- max_row_id = None
- has_null_first_row_id = False
- for entry in commit_entries:
- if entry.file.first_row_id is None:
- has_null_first_row_id = True
- break
- entry_min = entry.file.first_row_id
- entry_max = entry.file.first_row_id + entry.file.row_count - 1
- if min_row_id is None or entry_min < min_row_id:
- min_row_id = entry_min
- if max_row_id is None or entry_max > max_row_id:
- max_row_id = entry_max
-
- # If any file has first_row_id as None, set both min_row_id and max_row_id to None
- if has_null_first_row_id:
- min_row_id = None
- max_row_id = None
-
- manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
- new_manifest_file_meta = ManifestFileMeta(
- file_name=new_manifest_file,
- file_size=self.table.file_io.get_file_size(manifest_file_path),
- num_added_files=added_file_count,
- num_deleted_files=deleted_file_count,
- partition_stats=SimpleStats(
- min_values=GenericRow(
- values=partition_min_stats,
- fields=self.table.partition_keys_fields
- ),
- max_values=GenericRow(
- values=partition_max_stats,
- fields=self.table.partition_keys_fields
- ),
- null_counts=partition_null_counts,
- ),
- schema_id=self.table.table_schema.id,
- min_row_id=min_row_id,
- max_row_id=max_row_id,
- )
+ new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file)
self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])
# process existing_manifest
@@ -330,6 +272,13 @@ class FileStoreCommit:
existing_manifest_files = []
self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
+ delta_record_count = 0
+ for entry in commit_entries:
+ if entry.kind == 0:
+ delta_record_count += entry.file.row_count
+ else:
+ delta_record_count -= entry.file.row_count
+
total_record_count += delta_record_count
snapshot_data = Snapshot(
version=3,
@@ -372,6 +321,66 @@ class FileStoreCommit:
)
return SuccessResult()
+ def _write_manifest_file(self, commit_entries, new_manifest_file):
+ # Write new manifest file
+ self.manifest_file_manager.write(new_manifest_file, commit_entries)
+
+ # Calculate file count & record count statistics
+ added_file_count = 0
+ deleted_file_count = 0
+ for entry in commit_entries:
+ if entry.kind == 0:
+ added_file_count += 1
+ else:
+ deleted_file_count += 1
+
+ # Calculate partition statistics
+ partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
+ partition_min_stats = [min(col) for col in partition_columns]
+ partition_max_stats = [max(col) for col in partition_columns]
+ partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
+ if not all(count == 0 for count in partition_null_counts):
+ raise RuntimeError("Partition value should not be null")
+
+ # Calculate min_row_id and max_row_id from commit_entries
+ min_row_id = None
+ max_row_id = None
+ for entry in commit_entries:
+ if entry.file.first_row_id is None:
+ # If any file has first_row_id as None, set both min_row_id and max_row_id to None
+ min_row_id = None
+ max_row_id = None
+ break
+ entry_min = entry.file.first_row_id
+ entry_max = entry.file.first_row_id + entry.file.row_count - 1
+ if min_row_id is None or entry_min < min_row_id:
+ min_row_id = entry_min
+ if max_row_id is None or entry_max > max_row_id:
+ max_row_id = entry_max
+
+ # return new ManifestFileMeta
+ manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
+ return ManifestFileMeta(
+ file_name=new_manifest_file,
+ file_size=self.table.file_io.get_file_size(manifest_file_path),
+ num_added_files=added_file_count,
+ num_deleted_files=deleted_file_count,
+ partition_stats=SimpleStats(
+ min_values=GenericRow(
+ values=partition_min_stats,
+ fields=self.table.partition_keys_fields
+ ),
+ max_values=GenericRow(
+ values=partition_max_stats,
+ fields=self.table.partition_keys_fields
+ ),
+ null_counts=partition_null_counts,
+ ),
+ schema_id=self.table.table_schema.id,
+ min_row_id=min_row_id,
+ max_row_id=max_row_id,
+ )
+
def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
if retry_result is not None and latest_snapshot is not None:
start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID