This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 53bbba9b56 [python] Split read should discard predicate for other fields
53bbba9b56 is described below
commit 53bbba9b5641598a59611a6f2b781e89c8bcedeb
Author: JingsongLi <[email protected]>
AuthorDate: Thu Oct 23 16:13:53 2025 +0800
[python] Split read should discard predicate for other fields
---
paimon-python/pypaimon/common/predicate.py | 21 +++++------
paimon-python/pypaimon/read/split_read.py | 41 +++++++++++++---------
.../pypaimon/tests/schema_evolution_read_test.py | 10 +++---
3 files changed, 39 insertions(+), 33 deletions(-)
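For context: when a split contains data files written under an older schema, a predicate may reference a field that does not exist in a given file. Previously the reader skipped such files entirely; with this change the predicate is trimmed down to the fields the file actually has, which can only widen the result, i.e. conservative filtering. Below is a minimal sketch of that trimming idea using a simplified stand-in for the Predicate class (Pred, trim_by_fields, and field_name are illustrative names, not the pypaimon API):

from dataclasses import dataclass, field
from typing import List, Optional, Set

@dataclass
class Pred:
    method: str                        # 'and', 'or', or a leaf such as 'equal'
    field_name: Optional[str] = None   # leaves reference a single field
    children: List['Pred'] = field(default_factory=list)

def trim_by_fields(p: Pred, fields: Set[str]) -> Optional[Pred]:
    # Keep only the parts of the predicate the file can evaluate. Dropping
    # an 'and' conjunct can only let more rows through, so the trimmed
    # predicate is a safe, conservative filter for that file.
    if p.method == 'and':
        kept = [t for t in (trim_by_fields(c, fields) for c in p.children) if t]
        if not kept:
            return None                # nothing evaluable: push no filter down
        return kept[0] if len(kept) == 1 else Pred('and', children=kept)
    if p.method == 'or':
        # A disjunction is only safe if every branch survives the trim.
        trimmed = [trim_by_fields(c, fields) for c in p.children]
        if any(t is None for t in trimmed):
            return None
        return Pred('or', children=trimmed)
    return p if p.field_name in fields else None

For example, trimming (user_id > 4) AND (behavior IS NOT NULL) by the field set {'user_id', 'item_id', 'dt'} keeps only the user_id conjunct.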
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 89c82c9de2..9ae2cdfce3 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -94,10 +94,10 @@ class Predicate:
def to_arrow(self) -> Any:
if self.method == 'and':
- return reduce(lambda x, y: (x[0] & y[0], x[1] | y[1]),
+ return reduce(lambda x, y: x & y,
[p.to_arrow() for p in self.literals])
if self.method == 'or':
- return reduce(lambda x, y: (x[0] | y[0], x[1] | y[1]),
+ return reduce(lambda x, y: x | y,
[p.to_arrow() for p in self.literals])
if self.method == 'startsWith':
@@ -108,11 +108,10 @@ class Predicate:
# Ensure the field is cast to string type
string_field = field_ref.cast(pyarrow.string())
result = pyarrow_compute.starts_with(string_field, pattern)
- return result, {self.field}
+ return result
except Exception:
# Fallback to True
- return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(), {self.field})
+ return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
if self.method == 'endsWith':
pattern = self.literals[0]
# For PyArrow compatibility
@@ -121,11 +120,10 @@ class Predicate:
# Ensure the field is cast to string type
string_field = field_ref.cast(pyarrow.string())
result = pyarrow_compute.ends_with(string_field, pattern)
- return result, {self.field}
+ return result
except Exception:
# Fallback to True
- return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(), {self.field})
+ return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
if self.method == 'contains':
pattern = self.literals[0]
# For PyArrow compatibility
@@ -134,16 +132,15 @@ class Predicate:
# Ensure the field is cast to string type
string_field = field_ref.cast(pyarrow.string())
result = pyarrow_compute.match_substring(string_field, pattern)
- return result, {self.field}
+ return result
except Exception:
# Fallback to True
- return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(), {self.field})
+ return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
field = pyarrow_dataset.field(self.field)
tester = Predicate.testers.get(self.method)
if tester:
- return tester.test_by_arrow(field, self.literals), {self.field}
+ return tester.test_by_arrow(field, self.literals)
raise ValueError("Unsupported predicate method: {}".format(self.method))
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index fec59e4a04..5d59f942bd 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
import os
from abc import ABC, abstractmethod
from functools import partial
-from typing import List, Optional, Tuple, Any
+from typing import List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
@@ -60,8 +60,7 @@ class SplitRead(ABC):
self.table: FileStoreTable = table
self.predicate = predicate
- predicate_tuple = self._push_down_predicate()
- self.push_down_predicate, self.predicate_fields = predicate_tuple if predicate_tuple else (None, None)
+ self.push_down_predicate = self._push_down_predicate()
self.split = split
self.value_arity = len(read_type)
@@ -69,31 +68,25 @@ class SplitRead(ABC):
self.read_fields = read_type
if isinstance(self, MergeFileSplitRead):
self.read_fields = self._create_key_value_fields(read_type)
+ self.schema_id_2_fields = {}
- def _push_down_predicate(self) -> Any:
+ def _push_down_predicate(self) -> Optional[Predicate]:
if self.predicate is None:
return None
elif self.table.is_primary_key_table:
pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
if not pk_predicate:
return None
- return pk_predicate.to_arrow()
+ return pk_predicate
else:
- return self.predicate.to_arrow()
+ return self.predicate
@abstractmethod
def create_reader(self) -> RecordReader:
"""Create a record reader for the given split."""
def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, read_fields: List[str]):
- schema = self.table.schema_manager.get_schema(file.schema_id)
- schema_field_names = set(field.name for field in schema.fields)
- if self.table.is_primary_key_table:
- schema_field_names.add('_SEQUENCE_NUMBER')
- schema_field_names.add('_VALUE_KIND')
- if self.predicate_fields and self.predicate_fields - schema_field_names:
- return None
- read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
+ (read_file_fields, read_arrow_predicate) = self._get_fields_and_predicate(file.schema_id, read_fields)
file_path = file.file_path
_, extension = os.path.splitext(file_path)
@@ -102,14 +95,14 @@ class SplitRead(ABC):
format_reader: RecordBatchReader
if file_format == CoreOptions.FILE_FORMAT_AVRO:
format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields,
- self.read_fields, self.push_down_predicate)
+ self.read_fields, read_arrow_predicate)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields,
- self.read_fields, self.push_down_predicate, blob_as_descriptor)
+ self.read_fields, read_arrow_predicate, blob_as_descriptor)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
- read_file_fields, self.push_down_predicate)
+ read_file_fields, read_arrow_predicate)
else:
raise ValueError(f"Unexpected file format: {file_format}")
@@ -122,6 +115,20 @@ class SplitRead(ABC):
return DataFileBatchReader(format_reader, index_mapping, partition_info, None, self.table.table_schema.fields)
+ def _get_fields_and_predicate(self, schema_id: int, read_fields):
+ key = (schema_id, tuple(read_fields))
+ if key not in self.schema_id_2_fields:
+ schema = self.table.schema_manager.get_schema(schema_id)
+ schema_field_names = set(field.name for field in schema.fields)
+ if self.table.is_primary_key_table:
+ schema_field_names.add('_SEQUENCE_NUMBER')
+ schema_field_names.add('_VALUE_KIND')
+ read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
+ read_predicate = trim_predicate_by_fields(self.push_down_predicate, read_file_fields)
+ read_arrow_predicate = read_predicate.to_arrow() if read_predicate else None
+ self.schema_id_2_fields[key] = (read_file_fields, read_arrow_predicate)
+ return self.schema_id_2_fields[key]
+
@abstractmethod
def _get_all_data_fields(self):
"""Get all data fields"""
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index dde1f2c15f..046f84487d 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -195,11 +195,13 @@ class SchemaEvolutionReadTest(unittest.TestCase):
table_read = read_builder.new_read()
actual = table_read.to_arrow(splits)
+ # 'behavior' is not included in the file. To filter conservatively,
+ # we discard the filtering criteria for 'behavior'.
expected = pa.Table.from_pydict({
- 'user_id': [5, 8, 6],
- 'item_id': [1005, 1008, 1006],
- 'dt': ["p2", "p2", "p1"],
- 'behavior': ["e", "h", "f"],
+ 'user_id': [1, 2, 4, 3, 5, 8, 6],
+ 'item_id': [1001, 1002, 1004, 1003, 1005, 1008, 1006],
+ 'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p1"],
+ 'behavior': [None, None, None, None, "e", "h", "f"],
}, schema=pa_schema)
self.assertEqual(expected, actual)
# user_id filter
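To make the revised expectation concrete: rows written before 'behavior' existed are no longer skipped when a behavior filter is pushed down; the behavior conjunct is discarded for those files and the rows surface with nulls. A tiny illustration with made-up data:

import pyarrow as pa

old_file = pa.table({'user_id': [1, 2, 4, 3]})  # written before 'behavior' was added
new_file = pa.table({'user_id': [5, 8, 6], 'behavior': ['e', 'h', 'f']})
# A filter on 'behavior' is evaluated only against new_file; old_file is
# read in full, and its 'behavior' values come back as null after schema
# evolution, matching the expected table above.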