This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 53bbba9b56 [python] Split read should discard predicate for other fields
53bbba9b56 is described below

commit 53bbba9b5641598a59611a6f2b781e89c8bcedeb
Author: JingsongLi <[email protected]>
AuthorDate: Thu Oct 23 16:13:53 2025 +0800

    [python] Split read should discard predicate for other fields
---
 paimon-python/pypaimon/common/predicate.py         | 21 +++++------
 paimon-python/pypaimon/read/split_read.py          | 41 +++++++++++++---------
 .../pypaimon/tests/schema_evolution_read_test.py   | 10 +++---
 3 files changed, 39 insertions(+), 33 deletions(-)
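
Background for the change: after schema evolution, an old data file can predate
some of the columns a query filters on. Skipping such a file outright (the
previous behavior) silently drops rows, so the conservative fix is to trim away
just the predicate parts that mention missing fields and still read the file.
A minimal self-contained sketch of that trimming rule (simplified stand-in
types, not the actual trim_predicate_by_fields in pypaimon):

    from dataclasses import dataclass, field
    from typing import List, Optional, Set

    @dataclass
    class Pred:
        method: str                       # 'and', 'or', or a leaf test such as 'equal'
        field_name: Optional[str] = None  # referenced field for a leaf predicate
        children: List['Pred'] = field(default_factory=list)

    def trim(p: Pred, available: Set[str]) -> Optional[Pred]:
        """Drop the parts of a predicate that reference missing fields."""
        if p.method == 'and':
            kept = [t for t in (trim(c, available) for c in p.children) if t]
            # Dropping an AND conjunct only widens the result set, so it is safe.
            return Pred('and', children=kept) if kept else None
        if p.method == 'or':
            trimmed = [trim(c, available) for c in p.children]
            # An OR must survive whole: losing one branch could wrongly
            # exclude rows, so if any branch is untrimmable the OR is dropped.
            return Pred('or', children=trimmed) if all(trimmed) else None
        return p if p.field_name in available else None

    # (user_id > 3) AND (behavior == 'e') over a file lacking 'behavior':
    pred = Pred('and', children=[Pred('gt', 'user_id'), Pred('eq', 'behavior')])
    print(trim(pred, {'user_id', 'item_id'}))  # only the user_id leaf survives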

diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 89c82c9de2..9ae2cdfce3 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -94,10 +94,10 @@ class Predicate:
 
     def to_arrow(self) -> Any:
         if self.method == 'and':
-            return reduce(lambda x, y: (x[0] & y[0], x[1] | y[1]),
+            return reduce(lambda x, y: x & y,
                           [p.to_arrow() for p in self.literals])
         if self.method == 'or':
-            return reduce(lambda x, y: (x[0] | y[0], x[1] | y[1]),
+            return reduce(lambda x, y: x | y,
                           [p.to_arrow() for p in self.literals])
 
         if self.method == 'startsWith':
@@ -108,11 +108,10 @@ class Predicate:
                 # Ensure the field is cast to string type
                 string_field = field_ref.cast(pyarrow.string())
                 result = pyarrow_compute.starts_with(string_field, pattern)
-                return result, {self.field}
+                return result
             except Exception:
                 # Fallback to True
-                return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(),
-                        {self.field})
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         if self.method == 'endsWith':
             pattern = self.literals[0]
             # For PyArrow compatibility
@@ -121,11 +120,10 @@ class Predicate:
                 # Ensure the field is cast to string type
                 string_field = field_ref.cast(pyarrow.string())
                 result = pyarrow_compute.ends_with(string_field, pattern)
-                return result, {self.field}
+                return result
             except Exception:
                 # Fallback to True
-                return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(),
-                        {self.field})
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         if self.method == 'contains':
             pattern = self.literals[0]
             # For PyArrow compatibility
@@ -134,16 +132,15 @@ class Predicate:
                 # Ensure the field is cast to string type
                 string_field = field_ref.cast(pyarrow.string())
                 result = pyarrow_compute.match_substring(string_field, pattern)
-                return result, {self.field}
+                return result
             except Exception:
                 # Fallback to True
-                return (pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null(),
-                        {self.field})
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
 
         field = pyarrow_dataset.field(self.field)
         tester = Predicate.testers.get(self.method)
         if tester:
-            return tester.test_by_arrow(field, self.literals), {self.field}
+            return tester.test_by_arrow(field, self.literals)
 
         raise ValueError("Unsupported predicate method: 
{}".format(self.method))
 
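With the field-set bookkeeping moved out of predicate.py, to_arrow() now
returns a bare pyarrow dataset expression. Such expressions compose directly
with & and |, which is what the simplified reduce calls above rely on. A quick
illustration against the public pyarrow API:

    import pyarrow.dataset as ds
    from functools import reduce

    exprs = [ds.field('user_id') > 3, ds.field('dt') == 'p2']
    combined = reduce(lambda x, y: x & y, exprs)
    print(combined)  # roughly: ((user_id > 3) and (dt == "p2"))
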
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index fec59e4a04..5d59f942bd 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple, Any
+from typing import List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -60,8 +60,7 @@ class SplitRead(ABC):
 
         self.table: FileStoreTable = table
         self.predicate = predicate
-        predicate_tuple = self._push_down_predicate()
-        self.push_down_predicate, self.predicate_fields = predicate_tuple if predicate_tuple else (None, None)
+        self.push_down_predicate = self._push_down_predicate()
         self.split = split
         self.value_arity = len(read_type)
 
@@ -69,31 +68,25 @@ class SplitRead(ABC):
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
+        self.schema_id_2_fields = {}
 
-    def _push_down_predicate(self) -> Any:
+    def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
             return None
         elif self.table.is_primary_key_table:
            pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
             if not pk_predicate:
                 return None
-            return pk_predicate.to_arrow()
+            return pk_predicate
         else:
-            return self.predicate.to_arrow()
+            return self.predicate
 
     @abstractmethod
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""
 
    def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, read_fields: List[str]):
-        schema = self.table.schema_manager.get_schema(file.schema_id)
-        schema_field_names = set(field.name for field in schema.fields)
-        if self.table.is_primary_key_table:
-            schema_field_names.add('_SEQUENCE_NUMBER')
-            schema_field_names.add('_VALUE_KIND')
-        if self.predicate_fields and self.predicate_fields - schema_field_names:
-            return None
-        read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
+        (read_file_fields, read_arrow_predicate) = self._get_fields_and_predicate(file.schema_id, read_fields)
 
         file_path = file.file_path
         _, extension = os.path.splitext(file_path)
@@ -102,14 +95,14 @@ class SplitRead(ABC):
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
            format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, self.push_down_predicate)
+                                             self.read_fields, read_arrow_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
            blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
            format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, self.push_down_predicate, blob_as_descriptor)
+                                             self.read_fields, read_arrow_predicate, blob_as_descriptor)
        elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
            format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
-                                                read_file_fields, self.push_down_predicate)
+                                                read_file_fields, read_arrow_predicate)
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
@@ -122,6 +115,20 @@ class SplitRead(ABC):
            return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
                                        self.table.table_schema.fields)
 
+    def _get_fields_and_predicate(self, schema_id: int, read_fields):
+        key = (schema_id, tuple(read_fields))
+        if key not in self.schema_id_2_fields:
+            schema = self.table.schema_manager.get_schema(schema_id)
+            schema_field_names = set(field.name for field in schema.fields)
+            if self.table.is_primary_key_table:
+                schema_field_names.add('_SEQUENCE_NUMBER')
+                schema_field_names.add('_VALUE_KIND')
+            read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
+            read_predicate = trim_predicate_by_fields(self.push_down_predicate, read_file_fields)
+            read_arrow_predicate = read_predicate.to_arrow() if read_predicate else None
+            self.schema_id_2_fields[key] = (read_file_fields, read_arrow_predicate)
+        return self.schema_id_2_fields[key]
+
     @abstractmethod
     def _get_all_data_fields(self):
         """Get all data fields"""
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index dde1f2c15f..046f84487d 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -195,11 +195,13 @@ class SchemaEvolutionReadTest(unittest.TestCase):
 
         table_read = read_builder.new_read()
         actual = table_read.to_arrow(splits)
+        # 'behavior' is not included in the file. In order to filter more conservatively, we choose to discard the
+        # filtering criteria for 'behavior'
         expected = pa.Table.from_pydict({
-            'user_id': [5, 8, 6],
-            'item_id': [1005, 1008, 1006],
-            'dt': ["p2", "p2", "p1"],
-            'behavior': ["e", "h", "f"],
+            'user_id': [1, 2, 4, 3, 5, 8, 6],
+            'item_id': [1001, 1002, 1004, 1003, 1005, 1008, 1006],
+            'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p1"],
+            'behavior': [None, None, None, None, "e", "h", "f"],
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
         # user_id filter

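The first four expected rows read 'behavior' back as null because those
records were written before the column was added; since the predicate on
'behavior' is discarded for such files, they now surface instead of being
silently skipped. Applying that predicate anyway would drop every null row,
which is exactly the over-filtering the change avoids (a quick check, assuming
the same column values as the test):

    import pyarrow as pa
    import pyarrow.dataset as ds

    t = pa.table({'user_id': [1, 2, 4, 3, 5, 8, 6],
                  'behavior': [None, None, None, None, "e", "h", "f"]})
    # Nulls never satisfy the comparison, so only the "e" row survives.
    print(ds.dataset(t).to_table(filter=ds.field('behavior') == 'e').num_rows)  # 1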