This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b0d139ab91 [python] Make FileStoreWrite.max_seq_numbers lazied (#6418)
b0d139ab91 is described below

commit b0d139ab91548bd28697deb4356b54e780d234a2
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 17 22:14:53 2025 +0200

    [python] Make FileStoreWrite.max_seq_numbers lazied (#6418)
---
 paimon-python/pypaimon/write/file_store_write.py | 55 ++++++++++++------------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 35b4a7d980..9ebabf1103 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -24,6 +24,7 @@ from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
 from pypaimon.write.writer.data_blob_writer import DataBlobWriter
 from pypaimon.write.writer.data_writer import DataWriter
 from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+from pypaimon.table.bucket_mode import BucketMode
 
 
 class FileStoreWrite:
@@ -34,7 +35,7 @@ class FileStoreWrite:
 
         self.table: FileStoreTable = table
         self.data_writers: Dict[Tuple, DataWriter] = {}
-        self.max_seq_numbers = self._seq_number_stats()  # TODO: build this on-demand instead of on all
+        self.max_seq_numbers: dict = {}
         self.write_cols = None
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
@@ -45,27 +46,29 @@ class FileStoreWrite:
         writer.write(data)
 
     def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+        def max_seq_number():
+            return self._seq_number_stats(partition).get(bucket, 1)
         # Check if table has blob columns
         if self._has_blob_columns():
             return DataBlobWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+                max_seq_number=0,
             )
         elif self.table.is_primary_key_table:
             return KeyValueDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
-            )
+                max_seq_number=max_seq_number())
         else:
+            seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number()
             return AppendOnlyDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+                max_seq_number=seq_number,
                 write_cols=self.write_cols
             )
 
@@ -99,32 +102,28 @@ class FileStoreWrite:
             writer.close()
         self.data_writers.clear()
 
-    def _seq_number_stats(self) -> dict:
-        from pypaimon.manifest.manifest_file_manager import ManifestFileManager
-        from pypaimon.manifest.manifest_list_manager import ManifestListManager
-        from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-        snapshot_manager = SnapshotManager(self.table)
-        manifest_list_manager = ManifestListManager(self.table)
-        manifest_file_manager = ManifestFileManager(self.table)
+    def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
+        buckets = self.max_seq_numbers.get(partition)
+        if buckets is None:
+            buckets = self._load_seq_number_stats(partition)
+            self.max_seq_numbers[partition] = buckets
+        return buckets
 
-        latest_snapshot = snapshot_manager.get_latest_snapshot()
-        if not latest_snapshot:
-            return {}
-        manifest_files = manifest_list_manager.read_all(latest_snapshot)
+    def _load_seq_number_stats(self, partition: Tuple) -> dict:
+        read_builder = self.table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        sub_predicates = []
+        for key, value in zip(self.table.partition_keys, partition):
+            sub_predicates.append(predicate_builder.equal(key, value))
+        partition_filter = predicate_builder.and_predicates(sub_predicates)
 
-        file_entries = []
-        for manifest_file in manifest_files:
-            manifest_entries = manifest_file_manager.read(manifest_file.file_name)
-            for entry in manifest_entries:
-                if entry.kind == 0:
-                    file_entries.append(entry)
+        scan = read_builder.with_filter(partition_filter).new_scan()
+        splits = scan.plan().splits()
 
         max_seq_numbers = {}
-        for entry in file_entries:
-            partition_key = (tuple(entry.partition.values), entry.bucket)
-            current_seq_num = entry.file.max_sequence_number
-            existing_max = max_seq_numbers.get(partition_key, -1)
+        for split in splits:
+            current_seq_num = max([file.max_sequence_number for file in split.files])
+            existing_max = max_seq_numbers.get(split.bucket, -1)
             if current_seq_num > existing_max:
-                max_seq_numbers[partition_key] = current_seq_num
+                max_seq_numbers[split.bucket] = current_seq_num
         return max_seq_numbers

Reply via email to