This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0d139ab91 [python] Make FileStoreWrite.max_seq_numbers lazied (#6418)
b0d139ab91 is described below
commit b0d139ab91548bd28697deb4356b54e780d234a2
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 17 22:14:53 2025 +0200
[python] Make FileStoreWrite.max_seq_numbers lazied (#6418)
---
paimon-python/pypaimon/write/file_store_write.py | 55 ++++++++++++------------
1 file changed, 27 insertions(+), 28 deletions(-)
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 35b4a7d980..9ebabf1103 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -24,6 +24,7 @@ from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
from pypaimon.write.writer.data_blob_writer import DataBlobWriter
from pypaimon.write.writer.data_writer import DataWriter
from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+from pypaimon.table.bucket_mode import BucketMode
class FileStoreWrite:
@@ -34,7 +35,7 @@ class FileStoreWrite:
self.table: FileStoreTable = table
self.data_writers: Dict[Tuple, DataWriter] = {}
- self.max_seq_numbers = self._seq_number_stats() # TODO: build this on-demand instead of on all
+ self.max_seq_numbers: dict = {}
self.write_cols = None
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
@@ -45,27 +46,29 @@ class FileStoreWrite:
writer.write(data)
def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+ def max_seq_number():
+ return self._seq_number_stats(partition).get(bucket, 1)
# Check if table has blob columns
if self._has_blob_columns():
return DataBlobWriter(
table=self.table,
partition=partition,
bucket=bucket,
- max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+ max_seq_number=0,
)
elif self.table.is_primary_key_table:
return KeyValueDataWriter(
table=self.table,
partition=partition,
bucket=bucket,
- max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
- )
+ max_seq_number=max_seq_number())
else:
+ seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number()
return AppendOnlyDataWriter(
table=self.table,
partition=partition,
bucket=bucket,
- max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+ max_seq_number=seq_number,
write_cols=self.write_cols
)
@@ -99,32 +102,28 @@ class FileStoreWrite:
writer.close()
self.data_writers.clear()
- def _seq_number_stats(self) -> dict:
- from pypaimon.manifest.manifest_file_manager import ManifestFileManager
- from pypaimon.manifest.manifest_list_manager import ManifestListManager
- from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
- snapshot_manager = SnapshotManager(self.table)
- manifest_list_manager = ManifestListManager(self.table)
- manifest_file_manager = ManifestFileManager(self.table)
+ def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
+ buckets = self.max_seq_numbers.get(partition)
+ if buckets is None:
+ buckets = self._load_seq_number_stats(partition)
+ self.max_seq_numbers[partition] = buckets
+ return buckets
- latest_snapshot = snapshot_manager.get_latest_snapshot()
- if not latest_snapshot:
- return {}
- manifest_files = manifest_list_manager.read_all(latest_snapshot)
+ def _load_seq_number_stats(self, partition: Tuple) -> dict:
+ read_builder = self.table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ sub_predicates = []
+ for key, value in zip(self.table.partition_keys, partition):
+ sub_predicates.append(predicate_builder.equal(key, value))
+ partition_filter = predicate_builder.and_predicates(sub_predicates)
- file_entries = []
- for manifest_file in manifest_files:
- manifest_entries = manifest_file_manager.read(manifest_file.file_name)
- for entry in manifest_entries:
- if entry.kind == 0:
- file_entries.append(entry)
+ scan = read_builder.with_filter(partition_filter).new_scan()
+ splits = scan.plan().splits()
max_seq_numbers = {}
- for entry in file_entries:
- partition_key = (tuple(entry.partition.values), entry.bucket)
- current_seq_num = entry.file.max_sequence_number
- existing_max = max_seq_numbers.get(partition_key, -1)
+ for split in splits:
+ current_seq_num = max([file.max_sequence_number for file in split.files])
+ existing_max = max_seq_numbers.get(split.bucket, -1)
if current_seq_num > existing_max:
- max_seq_numbers[partition_key] = current_seq_num
+ max_seq_numbers[split.bucket] = current_seq_num
return max_seq_numbers