This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit fa4671c59cedb93d73799c7655e87401496efbb9 Author: Jingsong Lee <[email protected]> AuthorDate: Mon Oct 20 10:32:16 2025 +0200 [python] Filter manifest entry by advance to reduce memory (#6428) --- paimon-python/pypaimon/common/predicate_builder.py | 14 +++- .../pypaimon/manifest/manifest_file_manager.py | 4 +- paimon-python/pypaimon/read/push_down_utils.py | 78 +++++--------------- .../pypaimon/read/scanner/full_starting_scanner.py | 83 +++++++--------------- .../read/scanner/incremental_starting_scanner.py | 8 +-- paimon-python/pypaimon/read/table_read.py | 13 ++-- paimon-python/pypaimon/schema/table_schema.py | 3 + paimon-python/pypaimon/table/row/generic_row.py | 26 +++++-- paimon-python/pypaimon/table/row/offset_row.py | 12 ++-- paimon-python/pypaimon/tests/predicates_test.py | 4 +- .../pypaimon/tests/py36/rest_ao_read_write_test.py | 2 +- paimon-python/pypaimon/tests/reader_base_test.py | 2 +- 12 files changed, 95 insertions(+), 154 deletions(-) diff --git a/paimon-python/pypaimon/common/predicate_builder.py b/paimon-python/pypaimon/common/predicate_builder.py index bf668eed50..7b96bb6134 100644 --- a/paimon-python/pypaimon/common/predicate_builder.py +++ b/paimon-python/pypaimon/common/predicate_builder.py @@ -101,8 +101,13 @@ class PredicateBuilder: """Create a between predicate.""" return self._build_predicate('between', field, [included_lower_bound, included_upper_bound]) - def and_predicates(self, predicates: List[Predicate]) -> Predicate: + @staticmethod + def and_predicates(predicates: List[Predicate]) -> Optional[Predicate]: """Create an AND predicate from multiple predicates.""" + if len(predicates) == 0: + return None + if len(predicates) == 1: + return predicates[0] return Predicate( method='and', index=None, @@ -110,8 +115,13 @@ class PredicateBuilder: literals=predicates ) - def or_predicates(self, predicates: List[Predicate]) -> Predicate: + @staticmethod + def or_predicates(predicates: List[Predicate]) -> Optional[Predicate]: """Create an OR predicate from multiple predicates.""" + if len(predicates) == 0: + return None + if len(predicates) == 1: + return predicates[0] return Predicate( method='or', index=None, diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 9893b8659a..07e434dd37 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -41,7 +41,7 @@ class ManifestFileManager: self.primary_key_fields = self.table.table_schema.get_primary_key_fields() self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields() - def read(self, manifest_file_name: str, bucket_filter=None) -> List[ManifestEntry]: + def read(self, manifest_file_name: str, manifest_entry_filter=None) -> List[ManifestEntry]: manifest_file_path = self.manifest_path / manifest_file_name entries = [] @@ -105,7 +105,7 @@ class ManifestFileManager: total_buckets=record['_TOTAL_BUCKETS'], file=file_meta ) - if bucket_filter is not None and not bucket_filter(entry): + if manifest_entry_filter is not None and not manifest_entry_filter(entry): continue entries.append(entry) return entries diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py index 64e7c238f8..43892fb5e6 100644 --- a/paimon-python/pypaimon/read/push_down_utils.py +++ b/paimon-python/pypaimon/read/push_down_utils.py @@ -19,31 +19,29 @@ from typing import Dict, List, Set from pypaimon.common.predicate import Predicate +from pypaimon.common.predicate_builder import PredicateBuilder -def to_partition_predicate(input_predicate: 'Predicate', all_fields: List[str], partition_keys: List[str]): - if not input_predicate or not partition_keys: - return None - - predicates: list['Predicate'] = _split_and(input_predicate) - predicates = [element for element in predicates if _get_all_fields(element).issubset(partition_keys)] - new_predicate = Predicate( - method='and', - index=None, - field=None, - literals=predicates - ) - - part_to_index = {element: idx for idx, element in enumerate(partition_keys)} +def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], fields: List[str]): + new_predicate = filter_predicate_by_fields(input_predicate, fields) + part_to_index = {element: idx for idx, element in enumerate(fields)} mapping: Dict[int, int] = { i: part_to_index.get(all_fields[i], -1) for i in range(len(all_fields)) } - return _change_index(new_predicate, mapping) -def _split_and(input_predicate: 'Predicate'): +def filter_predicate_by_fields(input_predicate: Predicate, fields: List[str]): + if not input_predicate or not fields: + return None + + predicates: list[Predicate] = _split_and(input_predicate) + predicates = [element for element in predicates if _get_all_fields(element).issubset(fields)] + return PredicateBuilder.and_predicates(predicates) + + +def _split_and(input_predicate: Predicate): if not input_predicate: return list() @@ -53,38 +51,19 @@ def _split_and(input_predicate: 'Predicate'): return [input_predicate] -def _change_index(input_predicate: 'Predicate', mapping: Dict[int, int]): +def _change_index(input_predicate: Predicate, mapping: Dict[int, int]): if not input_predicate: return None if input_predicate.method == 'and' or input_predicate.method == 'or': - predicates: list['Predicate'] = input_predicate.literals + predicates: list[Predicate] = input_predicate.literals new_predicates = [_change_index(element, mapping) for element in predicates] return input_predicate.new_literals(new_predicates) return input_predicate.new_index(mapping[input_predicate.index]) -def extract_predicate_to_list(result: list, input_predicate: 'Predicate', keys: List[str]): - if not input_predicate or not keys: - return - - if input_predicate.method == 'and': - for sub_predicate in input_predicate.literals: - extract_predicate_to_list(result, sub_predicate, keys) - return - elif input_predicate.method == 'or': - # condition: involved keys all belong to primary keys - involved_fields = _get_all_fields(input_predicate) - if involved_fields and involved_fields.issubset(keys): - result.append(input_predicate) - return - - if input_predicate.field in keys: - result.append(input_predicate) - - -def _get_all_fields(predicate: 'Predicate') -> Set[str]: +def _get_all_fields(predicate: Predicate) -> Set[str]: if predicate.field is not None: return {predicate.field} involved_fields = set() @@ -92,26 +71,3 @@ def _get_all_fields(predicate: 'Predicate') -> Set[str]: for sub_predicate in predicate.literals: involved_fields.update(_get_all_fields(sub_predicate)) return involved_fields - - -def extract_predicate_to_dict(result: Dict, input_predicate: 'Predicate', keys: List[str]): - if not input_predicate or not keys: - return - - if input_predicate.method == 'and': - for sub_predicate in input_predicate.literals: - extract_predicate_to_dict(result, sub_predicate, keys) - return - elif input_predicate.method == 'or': - # ensure no recursive and/or - if not input_predicate.literals or any(p.field is None for p in input_predicate.literals): - return - # condition: only one key for 'or', and the key belongs to keys - involved_fields = {p.field for p in input_predicate.literals} - field = involved_fields.pop() if len(involved_fields) == 1 else None - if field is not None and field in keys: - result[field].append(input_predicate) - return - - if input_predicate.field in keys: - result[input_predicate.field].append(input_predicate) diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index a363a6fb6f..156b05cdb6 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -20,7 +20,6 @@ from typing import Callable, List, Optional from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate -from pypaimon.common.predicate_builder import PredicateBuilder from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta @@ -28,9 +27,7 @@ from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.plan import Plan -from pypaimon.read.push_down_utils import (extract_predicate_to_dict, - extract_predicate_to_list, - to_partition_predicate) +from pypaimon.read.push_down_utils import (filter_and_transform_predicate) from pypaimon.read.scanner.starting_scanner import StartingScanner from pypaimon.read.split import Split from pypaimon.snapshot.snapshot_manager import SnapshotManager @@ -49,14 +46,11 @@ class FullStartingScanner(StartingScanner): self.manifest_list_manager = ManifestListManager(table) self.manifest_file_manager = ManifestFileManager(table) - pk_conditions = [] - trimmed_pk = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()] - extract_predicate_to_list(pk_conditions, self.predicate, trimmed_pk) - self.primary_key_predicate = PredicateBuilder(self.table.fields).and_predicates(pk_conditions) + self.primary_key_predicate = filter_and_transform_predicate( + self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys()) - partition_conditions = defaultdict(list) - extract_predicate_to_dict(partition_conditions, self.predicate, self.table.partition_keys) - self.partition_key_predicate = partition_conditions + self.partition_key_predicate = filter_and_transform_predicate( + self.predicate, self.table.field_names, self.table.partition_keys) self.target_split_size = 128 * 1024 * 1024 self.open_file_cost = 4 * 1024 * 1024 @@ -82,29 +76,29 @@ class FullStartingScanner(StartingScanner): splits = self._apply_push_down_limit(splits) return Plan(splits) - def _read_manifest_files(self) -> List[ManifestFileMeta]: + def plan_files(self) -> List[ManifestEntry]: latest_snapshot = self.snapshot_manager.get_latest_snapshot() if not latest_snapshot: return [] manifest_files = self.manifest_list_manager.read_all(latest_snapshot) - partition_predicate = to_partition_predicate(self.predicate, self.table.field_names, self.table.partition_keys) + return self.read_manifest_entries(manifest_files) - def test_predicate(file: ManifestFileMeta) -> bool: - if not partition_predicate: + def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: + def filter_manifest_file(file: ManifestFileMeta) -> bool: + if not self.partition_key_predicate: return True - return partition_predicate.test_by_simple_stats( + return self.partition_key_predicate.test_by_simple_stats( file.partition_stats, file.num_added_files + file.num_deleted_files) - return [file for file in manifest_files if test_predicate(file)] - - def plan_files(self) -> List[ManifestEntry]: - manifest_files = self._read_manifest_files() deleted_entries = set() added_entries = [] for manifest_file in manifest_files: - manifest_entries = self.manifest_file_manager.read(manifest_file.file_name, - lambda row: self._bucket_filter(row)) + if not filter_manifest_file(manifest_file): + continue + manifest_entries = self.manifest_file_manager.read( + manifest_file.file_name, + lambda row: self._filter_manifest_entry(row)) for entry in manifest_entries: if entry.kind == 0: added_entries.append(entry) @@ -115,8 +109,6 @@ class FullStartingScanner(StartingScanner): entry for entry in added_entries if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries ] - if self.predicate: - file_entries = self._filter_by_predicate(file_entries) return file_entries def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner': @@ -203,12 +195,6 @@ class FullStartingScanner(StartingScanner): filtered_entries.append(entry) return filtered_entries - def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool: - bucket = entry.bucket - if self.only_read_real_buckets and bucket < 0: - return False - return True - def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]: if self.limit is None: return splits @@ -224,45 +210,26 @@ class FullStartingScanner(StartingScanner): return limited_splits - def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]: - if not self.predicate: - return file_entries - - filtered_files = [] - for file_entry in file_entries: - if self.partition_key_predicate and not self._filter_by_partition(file_entry): - continue - if not self._filter_by_stats(file_entry): - continue - filtered_files.append(file_entry) - - return filtered_files - - def _filter_by_partition(self, file_entry: ManifestEntry) -> bool: - partition_dict = file_entry.partition.to_dict() - for field_name, conditions in self.partition_key_predicate.items(): - partition_value = partition_dict[field_name] - for predicate in conditions: - if not predicate.test_by_value(partition_value): - return False - return True - - def _filter_by_stats(self, file_entry: ManifestEntry) -> bool: - if file_entry.kind != 0: + def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: + if self.only_read_real_buckets and entry.bucket < 0: + return False + if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): return False if self.table.is_primary_key_table: predicate = self.primary_key_predicate - stats = file_entry.file.key_stats + stats = entry.file.key_stats else: predicate = self.predicate - stats = file_entry.file.value_stats + stats = entry.file.value_stats + if not predicate: + return True return predicate.test_by_stats({ "min_values": stats.min_values.to_dict(), "max_values": stats.max_values.to_dict(), "null_counts": { stats.min_values.fields[i].name: stats.null_counts[i] for i in range(len(stats.min_values.fields)) }, - "row_count": file_entry.file.row_count, + "row_count": entry.file.row_count }) def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py index ead58d260a..0139dafc40 100644 --- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py @@ -43,13 +43,7 @@ class IncrementalStartingScanner(FullStartingScanner): for snapshot in snapshots_in_range: # Get manifest files for this snapshot manifest_files = self.manifest_list_manager.read_delta(snapshot) - - # Read all entries from manifest files - for manifest_file in manifest_files: - entries = self.manifest_file_manager.read(manifest_file.file_name) - file_entries.extend(entries) - if self.predicate: - file_entries = self._filter_by_predicate(file_entries) + file_entries.extend(self.read_manifest_entries(manifest_files)) return file_entries @staticmethod diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 4c2c615d89..4bf07d37a5 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -22,8 +22,7 @@ import pyarrow from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate -from pypaimon.common.predicate_builder import PredicateBuilder -from pypaimon.read.push_down_utils import extract_predicate_to_list +from pypaimon.read.push_down_utils import filter_predicate_by_fields from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.split import Split from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead, @@ -113,14 +112,10 @@ class TableRead: if self.predicate is None: return None elif self.table.is_primary_key_table: - result = [] - extract_predicate_to_list(result, self.predicate, self.table.primary_keys) - if result: - # the field index is unused for arrow field - pk_predicates = (PredicateBuilder(self.table.fields).and_predicates(result)).to_arrow() - return pk_predicates - else: + pk_predicate = filter_predicate_by_fields(self.predicate, self.table.primary_keys) + if not pk_predicate: return None + return pk_predicate.to_arrow() else: return self.predicate.to_arrow() diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py index f74d713e9f..d1875a7b0b 100644 --- a/paimon-python/pypaimon/schema/table_schema.py +++ b/paimon-python/pypaimon/schema/table_schema.py @@ -170,3 +170,6 @@ class TableSchema: "this will result in only one record in a partition") field_map = {field.name: field for field in self.fields} return [field_map[name] for name in adjusted if name in field_map] + + def get_trimmed_primary_keys(self) -> List[str]: + return [field.name for field in self.get_trimmed_primary_key_fields()] diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 5f8f9f6d9b..b409d3f2eb 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -23,19 +23,35 @@ from decimal import Decimal from typing import Any, List from pypaimon.schema.data_types import AtomicType, DataField, DataType -from pypaimon.table.row.row_kind import RowKind +from pypaimon.table.row.internal_row import InternalRow, RowKind from pypaimon.table.row.blob import BlobData @dataclass -class GenericRow: - values: List[Any] - fields: List[DataField] - row_kind: RowKind = RowKind.INSERT +class GenericRow(InternalRow): + + def __init__(self, values: List[Any], fields: List[DataField], row_kind: RowKind = RowKind.INSERT): + self.values = values + self.fields = fields + self.row_kind = row_kind def to_dict(self): return {self.fields[i].name: self.values[i] for i in range(len(self.fields))} + def get_field(self, pos: int): + if pos >= len(self.values): + raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}") + return self.values[pos] + + def is_null_at(self, pos: int) -> bool: + return self.get_field(pos) is None + + def get_row_kind(self) -> RowKind: + return self.row_kind + + def __len__(self) -> int: + return len(self.values) + class GenericRowDeserializer: HEADER_SIZE_IN_BITS = 8 diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py index 90f23d2126..9a51a246c9 100644 --- a/paimon-python/pypaimon/table/row/offset_row.py +++ b/paimon-python/pypaimon/table/row/offset_row.py @@ -36,6 +36,12 @@ class OffsetRow(InternalRow): raise ValueError(f"Offset {self.offset} plus arity {self.arity} is out of row length {len(row_tuple)}") return self + def set_row_kind_byte(self, row_kind_byte: int) -> None: + """ + Store RowKind as a byte and instantiate it lazily to avoid performance overhead. + """ + self.row_kind_byte = row_kind_byte + def get_field(self, pos: int): if pos >= self.arity: raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}") @@ -47,11 +53,5 @@ class OffsetRow(InternalRow): def get_row_kind(self) -> RowKind: return RowKind(self.row_kind_byte) - def set_row_kind_byte(self, row_kind_byte: int) -> None: - """ - Store RowKind as a byte and instantiate it lazily to avoid performance overhead. - """ - self.row_kind_byte = row_kind_byte - def __len__(self) -> int: return self.arity diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py index 9ab1cfbb37..a3a0e3229c 100644 --- a/paimon-python/pypaimon/tests/predicates_test.py +++ b/paimon-python/pypaimon/tests/predicates_test.py @@ -419,7 +419,7 @@ class PredicateTest(unittest.TestCase): table_commit.close() # test filter by partition - predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder() + predicate_builder = table.new_read_builder().new_predicate_builder() p1 = predicate_builder.startswith('dt1', "p1") p2 = predicate_builder.is_in('dt1', ["p2"]) p3 = predicate_builder.or_predicates([p1, p2]) @@ -433,7 +433,7 @@ class PredicateTest(unittest.TestCase): self.assertEqual(splits[1].partition.to_dict()["dt2"], 2) # test filter by stats - predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder() + predicate_builder = table.new_read_builder().new_predicate_builder() p1 = predicate_builder.equal('key1', 7) p2 = predicate_builder.is_in('key2', ["e", "f"]) p3 = predicate_builder.or_predicates([p1, p2]) diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index fe6902a5f5..a184f64866 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -180,7 +180,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest): latest_snapshot = SnapshotManager(table).get_latest_snapshot() manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( - manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row)) + manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row)) min_value_stats = manifest_entries[0].file.value_stats.min_values.values max_value_stats = manifest_entries[0].file.value_stats.max_values.values expected_min_values = [col[0].as_py() for col in expect_data] diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 6bb2cdd675..d158c824ef 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -217,7 +217,7 @@ class ReaderBasicTest(unittest.TestCase): latest_snapshot = SnapshotManager(table).get_latest_snapshot() manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( - manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row)) + manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row)) min_value_stats = manifest_entries[0].file.value_stats.min_values.values max_value_stats = manifest_entries[0].file.value_stats.max_values.values expected_min_values = [col[0].as_py() for col in expect_data]
