This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit fa4671c59cedb93d73799c7655e87401496efbb9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Oct 20 10:32:16 2025 +0200

    [python] Filter manifest entries in advance to reduce memory (#6428)
---
 paimon-python/pypaimon/common/predicate_builder.py | 14 +++-
 .../pypaimon/manifest/manifest_file_manager.py     |  4 +-
 paimon-python/pypaimon/read/push_down_utils.py     | 78 +++++---------------
 .../pypaimon/read/scanner/full_starting_scanner.py | 83 +++++++---------------
 .../read/scanner/incremental_starting_scanner.py   |  8 +--
 paimon-python/pypaimon/read/table_read.py          | 13 ++--
 paimon-python/pypaimon/schema/table_schema.py      |  3 +
 paimon-python/pypaimon/table/row/generic_row.py    | 26 +++++--
 paimon-python/pypaimon/table/row/offset_row.py     | 12 ++--
 paimon-python/pypaimon/tests/predicates_test.py    |  4 +-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |  2 +-
 paimon-python/pypaimon/tests/reader_base_test.py   |  2 +-
 12 files changed, 95 insertions(+), 154 deletions(-)

diff --git a/paimon-python/pypaimon/common/predicate_builder.py b/paimon-python/pypaimon/common/predicate_builder.py
index bf668eed50..7b96bb6134 100644
--- a/paimon-python/pypaimon/common/predicate_builder.py
+++ b/paimon-python/pypaimon/common/predicate_builder.py
@@ -101,8 +101,13 @@ class PredicateBuilder:
         """Create a between predicate."""
         return self._build_predicate('between', field, [included_lower_bound, 
included_upper_bound])
 
-    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+    @staticmethod
+    def and_predicates(predicates: List[Predicate]) -> Optional[Predicate]:
         """Create an AND predicate from multiple predicates."""
+        if len(predicates) == 0:
+            return None
+        if len(predicates) == 1:
+            return predicates[0]
         return Predicate(
             method='and',
             index=None,
@@ -110,8 +115,13 @@ class PredicateBuilder:
             literals=predicates
         )
 
-    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+    @staticmethod
+    def or_predicates(predicates: List[Predicate]) -> Optional[Predicate]:
         """Create an OR predicate from multiple predicates."""
+        if len(predicates) == 0:
+            return None
+        if len(predicates) == 1:
+            return predicates[0]
         return Predicate(
             method='or',
             index=None,
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 9893b8659a..07e434dd37 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -41,7 +41,7 @@ class ManifestFileManager:
         self.primary_key_fields = 
self.table.table_schema.get_primary_key_fields()
         self.trimmed_primary_key_fields = 
self.table.table_schema.get_trimmed_primary_key_fields()
 
-    def read(self, manifest_file_name: str, bucket_filter=None) -> 
List[ManifestEntry]:
+    def read(self, manifest_file_name: str, manifest_entry_filter=None) -> 
List[ManifestEntry]:
         manifest_file_path = self.manifest_path / manifest_file_name
 
         entries = []
@@ -105,7 +105,7 @@ class ManifestFileManager:
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
             )
-            if bucket_filter is not None and not bucket_filter(entry):
+            if manifest_entry_filter is not None and not 
manifest_entry_filter(entry):
                 continue
             entries.append(entry)
         return entries
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 64e7c238f8..43892fb5e6 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -19,31 +19,29 @@
 from typing import Dict, List, Set
 
 from pypaimon.common.predicate import Predicate
+from pypaimon.common.predicate_builder import PredicateBuilder
 
 
-def to_partition_predicate(input_predicate: 'Predicate', all_fields: 
List[str], partition_keys: List[str]):
-    if not input_predicate or not partition_keys:
-        return None
-
-    predicates: list['Predicate'] = _split_and(input_predicate)
-    predicates = [element for element in predicates if 
_get_all_fields(element).issubset(partition_keys)]
-    new_predicate = Predicate(
-        method='and',
-        index=None,
-        field=None,
-        literals=predicates
-    )
-
-    part_to_index = {element: idx for idx, element in 
enumerate(partition_keys)}
+def filter_and_transform_predicate(input_predicate: Predicate, all_fields: 
List[str], fields: List[str]):
+    new_predicate = filter_predicate_by_fields(input_predicate, fields)
+    part_to_index = {element: idx for idx, element in enumerate(fields)}
     mapping: Dict[int, int] = {
         i: part_to_index.get(all_fields[i], -1)
         for i in range(len(all_fields))
     }
-
     return _change_index(new_predicate, mapping)
 
 
-def _split_and(input_predicate: 'Predicate'):
+def filter_predicate_by_fields(input_predicate: Predicate, fields: List[str]):
+    if not input_predicate or not fields:
+        return None
+
+    predicates: list[Predicate] = _split_and(input_predicate)
+    predicates = [element for element in predicates if 
_get_all_fields(element).issubset(fields)]
+    return PredicateBuilder.and_predicates(predicates)
+
+
+def _split_and(input_predicate: Predicate):
     if not input_predicate:
         return list()
 
@@ -53,38 +51,19 @@ def _split_and(input_predicate: 'Predicate'):
     return [input_predicate]
 
 
-def _change_index(input_predicate: 'Predicate', mapping: Dict[int, int]):
+def _change_index(input_predicate: Predicate, mapping: Dict[int, int]):
     if not input_predicate:
         return None
 
     if input_predicate.method == 'and' or input_predicate.method == 'or':
-        predicates: list['Predicate'] = input_predicate.literals
+        predicates: list[Predicate] = input_predicate.literals
         new_predicates = [_change_index(element, mapping) for element in 
predicates]
         return input_predicate.new_literals(new_predicates)
 
     return input_predicate.new_index(mapping[input_predicate.index])
 
 
-def extract_predicate_to_list(result: list, input_predicate: 'Predicate', 
keys: List[str]):
-    if not input_predicate or not keys:
-        return
-
-    if input_predicate.method == 'and':
-        for sub_predicate in input_predicate.literals:
-            extract_predicate_to_list(result, sub_predicate, keys)
-        return
-    elif input_predicate.method == 'or':
-        # condition: involved keys all belong to primary keys
-        involved_fields = _get_all_fields(input_predicate)
-        if involved_fields and involved_fields.issubset(keys):
-            result.append(input_predicate)
-        return
-
-    if input_predicate.field in keys:
-        result.append(input_predicate)
-
-
-def _get_all_fields(predicate: 'Predicate') -> Set[str]:
+def _get_all_fields(predicate: Predicate) -> Set[str]:
     if predicate.field is not None:
         return {predicate.field}
     involved_fields = set()
@@ -92,26 +71,3 @@ def _get_all_fields(predicate: 'Predicate') -> Set[str]:
         for sub_predicate in predicate.literals:
             involved_fields.update(_get_all_fields(sub_predicate))
     return involved_fields
-
-
-def extract_predicate_to_dict(result: Dict, input_predicate: 'Predicate', 
keys: List[str]):
-    if not input_predicate or not keys:
-        return
-
-    if input_predicate.method == 'and':
-        for sub_predicate in input_predicate.literals:
-            extract_predicate_to_dict(result, sub_predicate, keys)
-        return
-    elif input_predicate.method == 'or':
-        # ensure no recursive and/or
-        if not input_predicate.literals or any(p.field is None for p in 
input_predicate.literals):
-            return
-        # condition: only one key for 'or', and the key belongs to keys
-        involved_fields = {p.field for p in input_predicate.literals}
-        field = involved_fields.pop() if len(involved_fields) == 1 else None
-        if field is not None and field in keys:
-            result[field].append(input_predicate)
-        return
-
-    if input_predicate.field in keys:
-        result[input_predicate.field].append(input_predicate)
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index a363a6fb6f..156b05cdb6 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -20,7 +20,6 @@ from typing import Callable, List, Optional
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
-from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -28,9 +27,7 @@ from pypaimon.manifest.schema.manifest_entry import 
ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
-                                           extract_predicate_to_list,
-                                           to_partition_predicate)
+from pypaimon.read.push_down_utils import (filter_and_transform_predicate)
 from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import Split
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
@@ -49,14 +46,11 @@ class FullStartingScanner(StartingScanner):
         self.manifest_list_manager = ManifestListManager(table)
         self.manifest_file_manager = ManifestFileManager(table)
 
-        pk_conditions = []
-        trimmed_pk = [field.name for field in 
self.table.table_schema.get_trimmed_primary_key_fields()]
-        extract_predicate_to_list(pk_conditions, self.predicate, trimmed_pk)
-        self.primary_key_predicate = 
PredicateBuilder(self.table.fields).and_predicates(pk_conditions)
+        self.primary_key_predicate = filter_and_transform_predicate(
+            self.predicate, self.table.field_names, 
self.table.table_schema.get_trimmed_primary_keys())
 
-        partition_conditions = defaultdict(list)
-        extract_predicate_to_dict(partition_conditions, self.predicate, 
self.table.partition_keys)
-        self.partition_key_predicate = partition_conditions
+        self.partition_key_predicate = filter_and_transform_predicate(
+            self.predicate, self.table.field_names, self.table.partition_keys)
 
         self.target_split_size = 128 * 1024 * 1024
         self.open_file_cost = 4 * 1024 * 1024
@@ -82,29 +76,29 @@ class FullStartingScanner(StartingScanner):
         splits = self._apply_push_down_limit(splits)
         return Plan(splits)
 
-    def _read_manifest_files(self) -> List[ManifestFileMeta]:
+    def plan_files(self) -> List[ManifestEntry]:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
             return []
         manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
-        partition_predicate = to_partition_predicate(self.predicate, 
self.table.field_names, self.table.partition_keys)
+        return self.read_manifest_entries(manifest_files)
 
-        def test_predicate(file: ManifestFileMeta) -> bool:
-            if not partition_predicate:
+    def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> 
List[ManifestEntry]:
+        def filter_manifest_file(file: ManifestFileMeta) -> bool:
+            if not self.partition_key_predicate:
                 return True
-            return partition_predicate.test_by_simple_stats(
+            return self.partition_key_predicate.test_by_simple_stats(
                 file.partition_stats,
                 file.num_added_files + file.num_deleted_files)
 
-        return [file for file in manifest_files if test_predicate(file)]
-
-    def plan_files(self) -> List[ManifestEntry]:
-        manifest_files = self._read_manifest_files()
         deleted_entries = set()
         added_entries = []
         for manifest_file in manifest_files:
-            manifest_entries = 
self.manifest_file_manager.read(manifest_file.file_name,
-                                                               lambda row: 
self._bucket_filter(row))
+            if not filter_manifest_file(manifest_file):
+                continue
+            manifest_entries = self.manifest_file_manager.read(
+                manifest_file.file_name,
+                lambda row: self._filter_manifest_entry(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
                     added_entries.append(entry)
@@ -115,8 +109,6 @@ class FullStartingScanner(StartingScanner):
             entry for entry in added_entries
             if (tuple(entry.partition.values), entry.bucket, 
entry.file.file_name) not in deleted_entries
         ]
-        if self.predicate:
-            file_entries = self._filter_by_predicate(file_entries)
         return file_entries
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 
'FullStartingScanner':
@@ -203,12 +195,6 @@ class FullStartingScanner(StartingScanner):
                 filtered_entries.append(entry)
         return filtered_entries
 
-    def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
-        bucket = entry.bucket
-        if self.only_read_real_buckets and bucket < 0:
-            return False
-        return True
-
     def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
         if self.limit is None:
             return splits
@@ -224,45 +210,26 @@ class FullStartingScanner(StartingScanner):
 
         return limited_splits
 
-    def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> 
List[ManifestEntry]:
-        if not self.predicate:
-            return file_entries
-
-        filtered_files = []
-        for file_entry in file_entries:
-            if self.partition_key_predicate and not 
self._filter_by_partition(file_entry):
-                continue
-            if not self._filter_by_stats(file_entry):
-                continue
-            filtered_files.append(file_entry)
-
-        return filtered_files
-
-    def _filter_by_partition(self, file_entry: ManifestEntry) -> bool:
-        partition_dict = file_entry.partition.to_dict()
-        for field_name, conditions in self.partition_key_predicate.items():
-            partition_value = partition_dict[field_name]
-            for predicate in conditions:
-                if not predicate.test_by_value(partition_value):
-                    return False
-        return True
-
-    def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
-        if file_entry.kind != 0:
+    def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
+        if self.only_read_real_buckets and entry.bucket < 0:
+            return False
+        if self.partition_key_predicate and not 
self.partition_key_predicate.test(entry.partition):
             return False
         if self.table.is_primary_key_table:
             predicate = self.primary_key_predicate
-            stats = file_entry.file.key_stats
+            stats = entry.file.key_stats
         else:
             predicate = self.predicate
-            stats = file_entry.file.value_stats
+            stats = entry.file.value_stats
+        if not predicate:
+            return True
         return predicate.test_by_stats({
             "min_values": stats.min_values.to_dict(),
             "max_values": stats.max_values.to_dict(),
             "null_counts": {
                 stats.min_values.fields[i].name: stats.null_counts[i] for i in 
range(len(stats.min_values.fields))
             },
-            "row_count": file_entry.file.row_count,
+            "row_count": entry.file.row_count
         })
 
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
index ead58d260a..0139dafc40 100644
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -43,13 +43,7 @@ class IncrementalStartingScanner(FullStartingScanner):
         for snapshot in snapshots_in_range:
             # Get manifest files for this snapshot
             manifest_files = self.manifest_list_manager.read_delta(snapshot)
-
-            # Read all entries from manifest files
-            for manifest_file in manifest_files:
-                entries = 
self.manifest_file_manager.read(manifest_file.file_name)
-                file_entries.extend(entries)
-        if self.predicate:
-            file_entries = self._filter_by_predicate(file_entries)
+            file_entries.extend(self.read_manifest_entries(manifest_files))
         return file_entries
 
     @staticmethod
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 4c2c615d89..4bf07d37a5 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -22,8 +22,7 @@ import pyarrow
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
-from pypaimon.common.predicate_builder import PredicateBuilder
-from pypaimon.read.push_down_utils import extract_predicate_to_list
+from pypaimon.read.push_down_utils import filter_predicate_by_fields
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.split import Split
 from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
@@ -113,14 +112,10 @@ class TableRead:
         if self.predicate is None:
             return None
         elif self.table.is_primary_key_table:
-            result = []
-            extract_predicate_to_list(result, self.predicate, 
self.table.primary_keys)
-            if result:
-                # the field index is unused for arrow field
-                pk_predicates = 
(PredicateBuilder(self.table.fields).and_predicates(result)).to_arrow()
-                return pk_predicates
-            else:
+            pk_predicate = filter_predicate_by_fields(self.predicate, 
self.table.primary_keys)
+            if not pk_predicate:
                 return None
+            return pk_predicate.to_arrow()
         else:
             return self.predicate.to_arrow()
 
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index f74d713e9f..d1875a7b0b 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -170,3 +170,6 @@ class TableSchema:
                 "this will result in only one record in a partition")
         field_map = {field.name: field for field in self.fields}
         return [field_map[name] for name in adjusted if name in field_map]
+
+    def get_trimmed_primary_keys(self) -> List[str]:
+        return [field.name for field in self.get_trimmed_primary_key_fields()]
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index 5f8f9f6d9b..b409d3f2eb 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -23,19 +23,35 @@ from decimal import Decimal
 from typing import Any, List
 
 from pypaimon.schema.data_types import AtomicType, DataField, DataType
-from pypaimon.table.row.row_kind import RowKind
+from pypaimon.table.row.internal_row import InternalRow, RowKind
 from pypaimon.table.row.blob import BlobData
 
 
 @dataclass
-class GenericRow:
-    values: List[Any]
-    fields: List[DataField]
-    row_kind: RowKind = RowKind.INSERT
+class GenericRow(InternalRow):
+
+    def __init__(self, values: List[Any], fields: List[DataField], row_kind: 
RowKind = RowKind.INSERT):
+        self.values = values
+        self.fields = fields
+        self.row_kind = row_kind
 
     def to_dict(self):
         return {self.fields[i].name: self.values[i] for i in 
range(len(self.fields))}
 
+    def get_field(self, pos: int):
+        if pos >= len(self.values):
+            raise IndexError(f"Position {pos} is out of bounds for row arity 
{len(self.values)}")
+        return self.values[pos]
+
+    def is_null_at(self, pos: int) -> bool:
+        return self.get_field(pos) is None
+
+    def get_row_kind(self) -> RowKind:
+        return self.row_kind
+
+    def __len__(self) -> int:
+        return len(self.values)
+
 
 class GenericRowDeserializer:
     HEADER_SIZE_IN_BITS = 8
diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py
index 90f23d2126..9a51a246c9 100644
--- a/paimon-python/pypaimon/table/row/offset_row.py
+++ b/paimon-python/pypaimon/table/row/offset_row.py
@@ -36,6 +36,12 @@ class OffsetRow(InternalRow):
             raise ValueError(f"Offset {self.offset} plus arity {self.arity} is 
out of row length {len(row_tuple)}")
         return self
 
+    def set_row_kind_byte(self, row_kind_byte: int) -> None:
+        """
+        Store RowKind as a byte and instantiate it lazily to avoid performance 
overhead.
+        """
+        self.row_kind_byte = row_kind_byte
+
     def get_field(self, pos: int):
         if pos >= self.arity:
             raise IndexError(f"Position {pos} is out of bounds for row arity 
{self.arity}")
@@ -47,11 +53,5 @@ class OffsetRow(InternalRow):
     def get_row_kind(self) -> RowKind:
         return RowKind(self.row_kind_byte)
 
-    def set_row_kind_byte(self, row_kind_byte: int) -> None:
-        """
-        Store RowKind as a byte and instantiate it lazily to avoid performance 
overhead.
-        """
-        self.row_kind_byte = row_kind_byte
-
     def __len__(self) -> int:
         return self.arity
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 9ab1cfbb37..a3a0e3229c 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -419,7 +419,7 @@ class PredicateTest(unittest.TestCase):
         table_commit.close()
 
         # test filter by partition
-        predicate_builder: PredicateBuilder = 
table.new_read_builder().new_predicate_builder()
+        predicate_builder = table.new_read_builder().new_predicate_builder()
         p1 = predicate_builder.startswith('dt1', "p1")
         p2 = predicate_builder.is_in('dt1', ["p2"])
         p3 = predicate_builder.or_predicates([p1, p2])
@@ -433,7 +433,7 @@ class PredicateTest(unittest.TestCase):
         self.assertEqual(splits[1].partition.to_dict()["dt2"], 2)
 
         # test filter by stats
-        predicate_builder: PredicateBuilder = 
table.new_read_builder().new_predicate_builder()
+        predicate_builder = table.new_read_builder().new_predicate_builder()
         p1 = predicate_builder.equal('key1', 7)
         p2 = predicate_builder.is_in('key2', ["e", "f"])
         p3 = predicate_builder.or_predicates([p1, p2])
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index fe6902a5f5..a184f64866 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -180,7 +180,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         latest_snapshot = SnapshotManager(table).get_latest_snapshot()
         manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
-            manifest_files[0].file_name, lambda row: 
table_scan.starting_scanner._bucket_filter(row))
+            manifest_files[0].file_name, lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row))
         min_value_stats = 
manifest_entries[0].file.value_stats.min_values.values
         max_value_stats = 
manifest_entries[0].file.value_stats.max_values.values
         expected_min_values = [col[0].as_py() for col in expect_data]
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 6bb2cdd675..d158c824ef 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -217,7 +217,7 @@ class ReaderBasicTest(unittest.TestCase):
         latest_snapshot = SnapshotManager(table).get_latest_snapshot()
         manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
-            manifest_files[0].file_name, lambda row: 
table_scan.starting_scanner._bucket_filter(row))
+            manifest_files[0].file_name, lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row))
         min_value_stats = 
manifest_entries[0].file.value_stats.min_values.values
         max_value_stats = 
manifest_entries[0].file.value_stats.max_values.values
         expected_min_values = [col[0].as_py() for col in expect_data]

Reply via email to