This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d673ba5db7 [Python] filter_manifest_entry should not evolve primary keys
d673ba5db7 is described below
commit d673ba5db7fdcbe7e38f0f6da1a952d8ae46b112
Author: JingsongLi <[email protected]>
AuthorDate: Tue Oct 21 21:47:50 2025 +0800
[Python] filter_manifest_entry should not evolve primary keys
---
.../pypaimon/read/scanner/full_starting_scanner.py | 35 +++++++++++-----------
paimon-python/pypaimon/table/row/projected_row.py | 1 -
paimon-python/pypaimon/write/batch_table_write.py | 6 ++--
3 files changed, 19 insertions(+), 23 deletions(-)
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index b94364db99..dc1b178135 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -235,29 +235,28 @@ class FullStartingScanner(StartingScanner):
# Apply evolution to stats
if self.table.is_primary_key_table:
- predicate = self.primary_key_predicate
- stats = entry.file.key_stats
- stats_fields = None
+ if not self.primary_key_predicate:
+ return True
+ return self.primary_key_predicate.test_by_simple_stats(
+ entry.file.key_stats,
+ entry.file.row_count
+ )
else:
- predicate = self.predicate
- stats = entry.file.value_stats
+ if not self.predicate:
+ return True
if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
stats_fields = entry.file.write_cols
else:
stats_fields = entry.file.value_stats_cols
- if not predicate:
- return True
- evolved_stats = evolution.evolution(
- stats,
- entry.file.row_count,
- stats_fields
- )
-
- # Test predicate against evolved stats
- return predicate.test_by_simple_stats(
- evolved_stats,
- entry.file.row_count
- )
+ evolved_stats = evolution.evolution(
+ entry.file.value_stats,
+ entry.file.row_count,
+ stats_fields
+ )
+ return self.predicate.test_by_simple_stats(
+ evolved_stats,
+ entry.file.row_count
+ )
def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
partitioned_files = defaultdict(list)
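Note: test_by_simple_stats implements standard min/max file pruning: a file is kept unless its column statistics prove the predicate cannot match any row. The hunk above skips schema evolution for the key stats because primary key columns cannot change after table creation, so their stats already line up with the current schema; only value stats still need evolving. A minimal self-contained sketch of the pruning idea (ColumnStats and the test function below are illustrative, not the real pypaimon classes):

    from dataclasses import dataclass
    from typing import Any

    @dataclass
    class ColumnStats:
        min_value: Any
        max_value: Any
        null_count: int

    def may_match_greater_than(stats: ColumnStats, literal: Any, row_count: int) -> bool:
        # Keep the file if some row *might* satisfy "col > literal".
        if stats.null_count == row_count:
            return False  # every value is null; the comparison can never hold
        return stats.max_value > literal

    # A file whose column spans [5, 10] may contain rows with col > 7 ...
    assert may_match_greater_than(ColumnStats(5, 10, 0), 7, 100) is True
    # ... but provably has none with col > 12, so the scanner prunes it.
    assert may_match_greater_than(ColumnStats(5, 10, 0), 12, 100) is False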
diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py
index d7a1cc6f40..dff63c372c 100644
--- a/paimon-python/pypaimon/table/row/projected_row.py
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -46,7 +46,6 @@ class ProjectedRow(InternalRow):
def get_field(self, pos: int) -> Any:
"""Returns the value at the given position."""
if self.index_mapping[pos] < 0:
- # TODO move this logical to hive
return None
return self.row.get_field(self.index_mapping[pos])
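Note: ProjectedRow resolves reads through an index mapping from projected positions to positions in the underlying row; a negative mapping marks a projected field with no source column, which reads as None. A standalone sketch of that lookup (the list-backed row here is a simplification of the real InternalRow):

    from typing import Any, List

    class ProjectedRowSketch:
        def __init__(self, index_mapping: List[int], row: List[Any]):
            self.index_mapping = index_mapping  # projected pos -> source pos
            self.row = row

        def get_field(self, pos: int) -> Any:
            # A negative mapping means the projected field has no source
            # column in the underlying row, so it reads as None.
            if self.index_mapping[pos] < 0:
                return None
            return self.row[self.index_mapping[pos]]

    row = ProjectedRowSketch([2, -1, 0], ["a", "b", "c"])
    assert row.get_field(0) == "c"
    assert row.get_field(1) is None  # no source column
    assert row.get_field(2) == "a"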
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
index a71e9c0503..f8d0660bfa 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -36,14 +36,12 @@ class BatchTableWrite:
self.row_key_extractor = self.table.create_row_key_extractor()
self.batch_committed = False
- def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
- # TODO: support row_kind
+ def write_arrow(self, table: pa.Table):
batches_iterator = table.to_batches()
for batch in batches_iterator:
self.write_arrow_batch(batch)
- def write_arrow_batch(self, data: pa.RecordBatch, row_kind: List[int] = None):
- # TODO: support row_kind
+ def write_arrow_batch(self, data: pa.RecordBatch):
self._validate_pyarrow_schema(data.schema)
partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)
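Note: after this change callers pass only the Arrow data; the never-implemented row_kind parameter is gone. A hypothetical usage sketch (the builder-style calls below follow the batch-write pattern pypaimon mirrors from the Java API; catalog and table setup are elided, and `table` is assumed to already exist):

    import pyarrow as pa

    data = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    writer.write_arrow(data)                        # a whole pa.Table, batch by batch
    writer.write_arrow_batch(data.to_batches()[0])  # or a single pa.RecordBatch
    commit_messages = writer.prepare_commit()
    write_builder.new_commit().commit(commit_messages)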