This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d09c4f1d9ce7da29f0489e0c62c1442af0fadf75
Author: JingsongLi <[email protected]>
AuthorDate: Thu Oct 23 16:13:53 2025 +0800

    [python] Split read should discard predicate for other fields
---
 paimon-python/pypaimon/common/predicate.py         | 21 +++++------
 paimon-python/pypaimon/read/split_read.py          | 41 +++++++++++++---------
 .../pypaimon/tests/schema_evolution_read_test.py   | 10 +++---
 3 files changed, 39 insertions(+), 33 deletions(-)

diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 89c82c9de2..9ae2cdfce3 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -94,10 +94,10 @@ class Predicate:
 
     def to_arrow(self) -> Any:
         if self.method == 'and':
-            return reduce(lambda x, y: (x[0] & y[0], x[1] | y[1]),
+            return reduce(lambda x, y: x & y,
                           [p.to_arrow() for p in self.literals])
         if self.method == 'or':
-            return reduce(lambda x, y: (x[0] | y[0], x[1] | y[1]),
+            return reduce(lambda x, y: x | y,
                           [p.to_arrow() for p in self.literals])
 
         if self.method == 'startsWith':
@@ -108,11 +108,10 @@ class Predicate:
                 # Ensure the field is cast to string type
                 string_field = field_ref.cast(pyarrow.string())
                 result = pyarrow_compute.starts_with(string_field, pattern)
-                return result, {self.field}
+                return result
             except Exception:
                 # Fallback to True
-                return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(),
-                        {self.field})
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         if self.method == 'endsWith':
             pattern = self.literals[0]
             # For PyArrow compatibility
@@ -121,11 +120,10 @@ class Predicate:
                 # Ensure the field is cast to string type
                 string_field = field_ref.cast(pyarrow.string())
                 result = pyarrow_compute.ends_with(string_field, pattern)
-                return result, {self.field}
+                return result
             except Exception:
                 # Fallback to True
-                return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(),
-                        {self.field})
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         if self.method == 'contains':
             pattern = self.literals[0]
             # For PyArrow compatibility
@@ -134,16 +132,15 @@ class Predicate:
                 # Ensure the field is cast to string type
                 string_field = field_ref.cast(pyarrow.string())
                 result = pyarrow_compute.match_substring(string_field, pattern)
-                return result, {self.field}
+                return result
             except Exception:
                 # Fallback to True
-                return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(),
-                        {self.field})
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
 
         field = pyarrow_dataset.field(self.field)
         tester = Predicate.testers.get(self.method)
         if tester:
-            return tester.test_by_arrow(field, self.literals), {self.field}
+            return tester.test_by_arrow(field, self.literals)
 
         raise ValueError("Unsupported predicate method: {}".format(self.method))
 
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index fec59e4a04..5d59f942bd 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@ import os
 
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple, Any
+from typing import List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -60,8 +60,7 @@ class SplitRead(ABC):
 
         self.table: FileStoreTable = table
         self.predicate = predicate
-        predicate_tuple = self._push_down_predicate()
-        self.push_down_predicate, self.predicate_fields = predicate_tuple if predicate_tuple else (None, None)
+        self.push_down_predicate = self._push_down_predicate()
         self.split = split
 
         self.value_arity = len(read_type)
@@ -69,31 +68,25 @@ class SplitRead(ABC):
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
+        self.schema_id_2_fields = {}
 
-    def _push_down_predicate(self) -> Any:
+    def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
             return None
         elif self.table.is_primary_key_table:
             pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
             if not pk_predicate:
                 return None
-            return pk_predicate.to_arrow()
+            return pk_predicate
         else:
-            return self.predicate.to_arrow()
+            return self.predicate
 
     @abstractmethod
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""
 
     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, read_fields: List[str]):
-        schema = self.table.schema_manager.get_schema(file.schema_id)
-        schema_field_names = set(field.name for field in schema.fields)
-        if self.table.is_primary_key_table:
-            schema_field_names.add('_SEQUENCE_NUMBER')
-            schema_field_names.add('_VALUE_KIND')
-        if self.predicate_fields and self.predicate_fields - schema_field_names:
-            return None
-        read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
+        (read_file_fields, read_arrow_predicate) = self._get_fields_and_predicate(file.schema_id, read_fields)
 
         file_path = file.file_path
         _, extension = os.path.splitext(file_path)
@@ -102,14 +95,14 @@ class SplitRead(ABC):
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
             format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, self.push_down_predicate)
+                                             self.read_fields, read_arrow_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
             blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, self.push_down_predicate, blob_as_descriptor)
+                                             self.read_fields, read_arrow_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
-                                                read_file_fields, self.push_down_predicate)
+                                                read_file_fields, read_arrow_predicate)
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
@@ -122,6 +115,20 @@ class SplitRead(ABC):
         return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
                                    self.table.table_schema.fields)
 
+    def _get_fields_and_predicate(self, schema_id: int, read_fields):
+        key = (schema_id, tuple(read_fields))
+        if key not in self.schema_id_2_fields:
+            schema = self.table.schema_manager.get_schema(schema_id)
+            schema_field_names = set(field.name for field in schema.fields)
+            if self.table.is_primary_key_table:
+                schema_field_names.add('_SEQUENCE_NUMBER')
+                schema_field_names.add('_VALUE_KIND')
+            read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
+            read_predicate = trim_predicate_by_fields(self.push_down_predicate, read_file_fields)
+            read_arrow_predicate = read_predicate.to_arrow() if read_predicate else None
+            self.schema_id_2_fields[key] = (read_file_fields, read_arrow_predicate)
+        return self.schema_id_2_fields[key]
+
     @abstractmethod
     def _get_all_data_fields(self):
         """Get all data fields"""
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index dde1f2c15f..046f84487d 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -195,11 +195,13 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         table_read = read_builder.new_read()
         actual = table_read.to_arrow(splits)
 
+        # 'behavior' is not included in the file. In order to filter more conservatively, we choose to discard the
+        # filtering criteria for 'behavior'.
         expected = pa.Table.from_pydict({
-            'user_id': [5, 8, 6],
-            'item_id': [1005, 1008, 1006],
-            'dt': ["p2", "p2", "p1"],
-            'behavior': ["e", "h", "f"],
+            'user_id': [1, 2, 4, 3, 5, 8, 6],
+            'item_id': [1001, 1002, 1004, 1003, 1005, 1008, 1006],
+            'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p1"],
+            'behavior': [None, None, None, None, "e", "h", "f"],
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
         # user_id filter
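The heart of the change: when a split contains data files written under an
older schema, a pushed-down filter that references a field missing from such
a file is now discarded for that file (previously the whole file was skipped),
so only the applicable conjuncts are applied and the pre-evolution rows come
back with nulls in the newer column. Below is a minimal standalone sketch of
that trimming idea in plain PyArrow; it is an illustration under assumed
names (trim_filter_to_schema is hypothetical), not the Paimon API:

    import pyarrow as pa
    import pyarrow.dataset as ds

    def trim_filter_to_schema(conjuncts, file_field_names):
        # Keep only the AND-ed conjuncts whose field exists in this file's
        # schema. Dropping a conjunct can only let extra rows through; it
        # can never lose a matching row.
        kept = [expr for field, expr in conjuncts if field in file_field_names]
        if not kept:
            return None  # no applicable filter: read the file unfiltered
        result = kept[0]
        for expr in kept[1:]:
            result = result & expr
        return result

    # A data file written before the 'behavior' column was added:
    old_file = pa.table({'user_id': [1, 2, 3], 'item_id': [1001, 1002, 1003]})

    conjuncts = [
        ('user_id', ds.field('user_id') > 0),
        ('behavior', ds.field('behavior') == 'e'),  # absent from old_file
    ]
    trimmed = trim_filter_to_schema(conjuncts, set(old_file.schema.names))
    # The untrimmed expression would fail here, since it references a field
    # the file does not have; the trimmed one reads all three rows.
    print(ds.dataset(old_file).to_table(filter=trimmed).num_rows)  # -> 3

This is also why the test expectation above grows from three rows to seven:
the four rows whose files lack 'behavior' now surface with behavior=None
instead of being dropped along with their files.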
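A second detail of the patch is the new schema_id_2_fields dict: a split
routinely holds many files sharing one schema id, so the trimmed field list
and the converted Arrow predicate are computed once per
(schema_id, read_fields) pair and then reused. A simplified sketch of that
memoization pattern, with the schema lookup and the trim/convert step
abstracted behind callables (all names here are illustrative, not Paimon's):

    from typing import Callable, Dict, List, Optional, Set, Tuple

    class FieldsAndPredicateCache:
        def __init__(self,
                     schema_fields: Callable[[int], Set[str]],
                     trim_and_convert: Callable[[List[str]], Optional[object]]):
            self._schema_fields = schema_fields        # schema_id -> field names
            self._trim_and_convert = trim_and_convert  # fields -> Arrow predicate
            self._cache: Dict[Tuple[int, Tuple[str, ...]],
                              Tuple[List[str], Optional[object]]] = {}

        def get(self, schema_id: int, read_fields: List[str]):
            key = (schema_id, tuple(read_fields))  # lists are unhashable; tuple them
            if key not in self._cache:
                names = self._schema_fields(schema_id)
                read_file_fields = [f for f in read_fields if f in names]
                self._cache[key] = (read_file_fields,
                                    self._trim_and_convert(read_file_fields))
            return self._cache[key]

    # Schema 0 predates 'behavior'; schema 1 has it.
    cache = FieldsAndPredicateCache(
        schema_fields=lambda sid: {'user_id', 'item_id'} if sid == 0
        else {'user_id', 'item_id', 'behavior'},
        trim_and_convert=lambda fields: None,  # trimming/conversion elided
    )
    print(cache.get(0, ['user_id', 'behavior']))  # (['user_id'], None)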
