This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8012ff7deb [python] Support filter by _ROW_ID for data evolution (#7252)
8012ff7deb is described below

commit 8012ff7deb53a5c5848957d606cf95ee75b10a8e
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 11 11:57:57 2026 +0800

    [python] Support filter by _ROW_ID for data evolution (#7252)
---
 paimon-python/pypaimon/globalindex/range.py        | 16 ++++
 paimon-python/pypaimon/read/push_down_utils.py     | 36 +++++++++
 .../pypaimon/read/scanner/file_scanner.py          | 87 ++++++++++++++++++++-
 .../pypaimon/tests/data_evolution_test.py          | 88 ++++++++++++++++++++++
 paimon-python/pypaimon/tests/range_test.py         | 86 +++++++++++++++++++++
 .../pypaimon/tests/reader_predicate_test.py        | 24 ++++++
 6 files changed, 335 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/globalindex/range.py b/paimon-python/pypaimon/globalindex/range.py
index 19b9b40e94..0d65e98974 100644
--- a/paimon-python/pypaimon/globalindex/range.py
+++ b/paimon-python/pypaimon/globalindex/range.py
@@ -75,6 +75,22 @@ class Range:
 
         return result
 
+    @staticmethod
+    def to_ranges(values: List[int]) -> List['Range']:
+        if not values:
+            return []
+        sorted_ids = sorted(values)
+        result = []
+        range_start = sorted_ids[0]
+        range_end = range_start
+        for current in sorted_ids[1:]:
+            if current != range_end + 1:
+                result.append(Range(range_start, range_end))
+                range_start = current
+            range_end = current
+        result.append(Range(range_start, range_end))
+        return result
+
     @staticmethod
     def intersect(start1: int, end1: int, start2: int, end2: int) -> bool:
         """Check if two ranges intersect."""
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 7ad7e53acc..47f21f3869 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -88,3 +88,39 @@ def _get_all_fields(predicate: Predicate) -> Set[str]:
         for sub_predicate in predicate.literals:
             involved_fields.update(_get_all_fields(sub_predicate))
     return involved_fields
+
+
+def remove_row_id_filter(predicate: Predicate) -> Optional[Predicate]:
+    from pypaimon.table.special_fields import SpecialFields
+
+    if not predicate:
+        return None
+    if predicate.field == SpecialFields.ROW_ID.name:
+        return None
+    if predicate.method == "and":
+        parts = _split_and(predicate)
+        non_row_id = [
+            p for p in parts
+            if _get_all_fields(p) != {SpecialFields.ROW_ID.name}
+        ]
+        if not non_row_id:
+            return None
+        filtered = []
+        for p in non_row_id:
+            r = remove_row_id_filter(p)
+            if r is None:
+                return None
+            filtered.append(r)
+        return PredicateBuilder.and_predicates(filtered)
+    if predicate.method == "or":
+        new_children = []
+        for c in predicate.literals or []:
+            r = remove_row_id_filter(c)
+            if r is not None:
+                new_children.append(r)
+        if not new_children:
+            return None
+        if len(new_children) == 1:
+            return new_children[0]
+        return PredicateBuilder.or_predicates(new_children)
+    return predicate
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index b5d13e561d..8167a743dd 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -31,7 +31,10 @@ from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
+from pypaimon.read.push_down_utils import (
+    remove_row_id_filter,
+    trim_and_transform_predicate,
+)
 from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
 from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
 from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
@@ -41,6 +44,53 @@ from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
 
 
+def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> Optional[List]:
+    from pypaimon.globalindex.range import Range
+    from pypaimon.table.special_fields import SpecialFields
+
+    if predicate is None:
+        return None
+
+    def visit(p: Predicate):
+        if p.method == 'and':
+            result = None
+            for child in p.literals:
+                sub = visit(child)
+                if sub is None:
+                    continue
+                result = Range.and_(result, sub) if result is not None else sub
+                if not result:
+                    return result
+            return result
+        if p.method == 'or':
+            parts = []
+            for child in p.literals:
+                sub = visit(child)
+                if sub is None:
+                    return None
+                parts.extend(sub)
+            if not parts:
+                return []
+            return Range.sort_and_merge_overlap(parts, merge=True, adjacent=True)
+        if p.field != SpecialFields.ROW_ID.name:
+            return None
+        if p.method == 'equal':
+            if not p.literals:
+                return []
+            return Range.to_ranges([int(p.literals[0])])
+        if p.method == 'in':
+            if not p.literals:
+                return []
+            return Range.to_ranges([int(x) for x in p.literals])
+        if p.method == 'between':
+            if not p.literals or len(p.literals) < 2:
+                return []
+            return [Range(int(p.literals[0]), int(p.literals[1]))]
+        return None
+
+    return visit(predicate)
+
+
 def _filter_manifest_files_by_row_ranges(
         manifest_files: List[ManifestFileMeta],
         row_ranges: List) -> List[ManifestFileMeta]:
@@ -89,6 +139,31 @@ def _filter_manifest_files_by_row_ranges(
     return filtered_files
 
 
+def _filter_manifest_entries_by_row_ranges(
+        entries: List[ManifestEntry],
+        row_ranges: List) -> List[ManifestEntry]:
+    from pypaimon.globalindex.range import Range
+
+    if not row_ranges:
+        return []
+
+    filtered = []
+    for entry in entries:
+        first_row_id = entry.file.first_row_id
+        if first_row_id is None:
+            filtered.append(entry)
+            continue
+        file_range = Range(
+            first_row_id,
+            first_row_id + entry.file.row_count - 1
+        )
+        for r in row_ranges:
+            if file_range.overlaps(r):
+                filtered.append(entry)
+                break
+    return filtered
+
+
 class FileScanner:
     def __init__(
         self,
@@ -103,6 +178,7 @@ class FileScanner:
         self.table: FileStoreTable = table
         self.manifest_scanner = manifest_scanner
         self.predicate = predicate
+        self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
         self.limit = limit
         self.vector_search = vector_search
 
@@ -196,6 +272,8 @@ class FileScanner:
             row_ranges = global_index_result.results().to_range_list()
             if isinstance(global_index_result, ScoredGlobalIndexResult):
                 score_getter = global_index_result.score_getter()
+        if row_ranges is None and self.predicate is not None:
+            row_ranges = _row_ranges_from_predicate(self.predicate)
 
         manifest_files = self.manifest_scanner()
 
@@ -205,6 +283,9 @@ class FileScanner:
 
         entries = self.read_manifest_entries(manifest_files)
 
+        if row_ranges is not None:
+            entries = _filter_manifest_entries_by_row_ranges(entries, row_ranges)
+
         return entries, DataEvolutionSplitGenerator(
             self.table,
             self.target_split_size,
@@ -353,6 +434,8 @@ class FileScanner:
         else:
             if not self.predicate:
                 return True
+            if self.predicate_for_stats is None:
+                return True
             if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                 stats_fields = entry.file.write_cols
             else:
@@ -362,7 +445,7 @@ class FileScanner:
                 entry.file.row_count,
                 stats_fields
             )
-            return self.predicate.test_by_simple_stats(
+            return self.predicate_for_stats.test_by_simple_stats(
                 evolved_stats,
                 entry.file.row_count
             )
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index a9f0de508b..cfb09a0caf 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -18,6 +18,7 @@ limitations under the License.
 import os
 import tempfile
 import unittest
+from types import SimpleNamespace
 
 import pyarrow as pa
 
@@ -587,6 +588,93 @@ class DataEvolutionTest(unittest.TestCase):
             '_SEQUENCE_NUMBER must be non-nullable per SpecialFields',
         )
 
+        rb_with_row_id = table.new_read_builder().with_projection(['f0', 'f1', 'f2', '_ROW_ID'])
+        pb = rb_with_row_id.new_predicate_builder()
+        rb_eq0 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 0))
+        result_eq0 = rb_eq0.new_read().to_arrow(rb_eq0.new_scan().plan().splits())
+        self.assertEqual(result_eq0, pa.Table.from_pydict(
+            {'f0': [1], 'f1': ['a'], 'f2': ['b']}, schema=simple_pa_schema))
+        rb_eq1 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 1))
+        result_eq1 = rb_eq1.new_read().to_arrow(rb_eq1.new_scan().plan().splits())
+        self.assertEqual(result_eq1, pa.Table.from_pydict(
+            {'f0': [2], 'f1': ['c'], 'f2': ['d']}, schema=simple_pa_schema))
+        rb_in = table.new_read_builder().with_filter(pb.is_in('_ROW_ID', [0, 1]))
+        result_in = rb_in.new_read().to_arrow(rb_in.new_scan().plan().splits())
+        self.assertEqual(result_in, expect)
+
+    def test_filter_by_row_id(self):
+        simple_pa_schema = pa.schema([('f0', pa.int32())])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
+        )
+        self.catalog.create_table('default.test_row_id_filter_empty_and_or', schema, False)
+        table = self.catalog.get_table('default.test_row_id_filter_empty_and_or')
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: _ROW_ID 0, 1 with f0=1, 2
+        w = write_builder.new_write().with_write_type(['f0'])
+        c = write_builder.new_commit()
+        w.write_arrow(pa.Table.from_pydict(
+            {'f0': [1, 2]}, schema=pa.schema([('f0', pa.int32())])))
+        cmts = w.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w.close()
+        c.close()
+
+        # Commit 2: _ROW_ID 2, 3 with f0=101, 102
+        w = write_builder.new_write().with_write_type(['f0'])
+        c = write_builder.new_commit()
+        w.write_arrow(pa.Table.from_pydict(
+            {'f0': [101, 102]}, schema=pa.schema([('f0', pa.int32())])))
+        cmts = w.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 2
+        c.commit(cmts)
+        w.close()
+        c.close()
+
+        rb_with_row_id = table.new_read_builder().with_projection(['f0', '_ROW_ID'])
+        pb = rb_with_row_id.new_predicate_builder()
+
+        # 1. Non-existent _ROW_ID -> empty
+        rb_eq999 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 999))
+        result_eq999 = rb_eq999.new_read().to_arrow(rb_eq999.new_scan().plan().splits())
+        self.assertEqual(len(result_eq999), 0, "Non-existent _ROW_ID should return empty")
+
+        # 2. AND: _ROW_ID=0 AND f0=1 -> 1 row
+        rb_and = table.new_read_builder().with_filter(
+            pb.and_predicates([pb.equal('_ROW_ID', 0), pb.equal('f0', 1)])
+        )
+        result_and = rb_and.new_read().to_arrow(rb_and.new_scan().plan().splits())
+        self.assertEqual(len(result_and), 1)
+        self.assertEqual(result_and['f0'][0].as_py(), 1)
+
+        # 3. OR: _ROW_ID=0 OR f0>100 -> at least row with _ROW_ID=0 and all f0>100
+        rb_or = table.new_read_builder().with_filter(
+            pb.or_predicates([pb.equal('_ROW_ID', 0), pb.greater_than('f0', 100)])
+        )
+        result_or = rb_or.new_read().to_arrow(rb_or.new_scan().plan().splits())
+        f0_vals = set(result_or['f0'][i].as_py() for i in range(len(result_or)))
+        self.assertGreaterEqual(len(result_or), 3, "OR should return _ROW_ID=0 row and f0>100 rows")
+        self.assertIn(1, f0_vals, "_ROW_ID=0 row has f0=1")
+        self.assertIn(101, f0_vals)
+        self.assertIn(102, f0_vals)
+
+    def test_filter_manifest_entries_by_row_ranges(self):
+        from pypaimon.read.scanner.file_scanner import _filter_manifest_entries_by_row_ranges
+
+        entry_0 = SimpleNamespace(file=SimpleNamespace(first_row_id=0, row_count=1))
+        entries = [entry_0]
+        row_ranges = []
+
+        filtered = _filter_manifest_entries_by_row_ranges(entries, row_ranges)
+        self.assertEqual(filtered, [], "empty row_ranges must return no entries, not all entries")
+
     def test_more_data(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),
diff --git a/paimon-python/pypaimon/tests/range_test.py b/paimon-python/pypaimon/tests/range_test.py
new file mode 100644
index 0000000000..f3cbea9f2e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/range_test.py
@@ -0,0 +1,86 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.globalindex.range import Range
+from pypaimon.read.scanner.file_scanner import _row_ranges_from_predicate
+from pypaimon.table.special_fields import SpecialFields
+
+
+class RangeTest(unittest.TestCase):
+
+    def test_to_ranges(self):
+        assert Range.to_ranges([]) == []
+        assert Range.to_ranges([5]) == [Range(5, 5)]
+        assert Range.to_ranges([1, 2, 3]) == [Range(1, 3)]
+        assert Range.to_ranges([1, 3, 5]) == [
+            Range(1, 1), Range(3, 3), Range(5, 5)
+        ]
+        assert Range.to_ranges([1, 1, 2]) == [Range(1, 1), Range(1, 2)]
+
+    def test_row_ranges_from_predicate(self):
+        assert _row_ranges_from_predicate(None) is None
+
+        pred_eq = Predicate('equal', 0, SpecialFields.ROW_ID.name, [5])
+        assert _row_ranges_from_predicate(pred_eq) == [Range(5, 5)]
+
+        pred_in = Predicate('in', 0, SpecialFields.ROW_ID.name, [10])
+        assert _row_ranges_from_predicate(pred_in) == [Range(10, 10)]
+        pred_in_multi = Predicate('in', 0, SpecialFields.ROW_ID.name, [1, 2, 3, 5, 6])
+        assert _row_ranges_from_predicate(pred_in_multi) == [Range(1, 3), Range(5, 6)]
+
+        pred_between = Predicate('between', 0, SpecialFields.ROW_ID.name, [10, 20])
+        result = _row_ranges_from_predicate(pred_between)
+        assert result is not None and result == [Range(10, 20)]
+
+        pred_other = Predicate('equal', 0, 'other_field', [5])
+        assert _row_ranges_from_predicate(pred_other) is None
+
+        pred_gt = Predicate('greaterThan', 0, SpecialFields.ROW_ID.name, [10])
+        assert _row_ranges_from_predicate(pred_gt) is None
+
+        assert _row_ranges_from_predicate(
+            Predicate('equal', 0, SpecialFields.ROW_ID.name, [])
+        ) == []
+        assert _row_ranges_from_predicate(
+            Predicate('in', 0, SpecialFields.ROW_ID.name, [])
+        ) == []
+        assert _row_ranges_from_predicate(
+            Predicate('between', 0, SpecialFields.ROW_ID.name, [10])
+        ) == []
+
+        pred_eq5 = Predicate('equal', 0, SpecialFields.ROW_ID.name, [5])
+        pred_between_1_10 = Predicate('between', 0, SpecialFields.ROW_ID.name, [1, 10])
+        pred_and = Predicate('and', None, None, [pred_eq5, pred_between_1_10])
+        assert _row_ranges_from_predicate(pred_and) == [Range(5, 5)]
+        pred_eq3 = Predicate('equal', 0, SpecialFields.ROW_ID.name, [3])
+        pred_and_no = Predicate('and', None, None, [pred_eq5, pred_eq3])
+        assert _row_ranges_from_predicate(pred_and_no) == []
+
+        pred_or = Predicate('or', None, None, [pred_eq5, pred_eq3])
+        assert _row_ranges_from_predicate(pred_or) == [Range(3, 3), Range(5, 5)]
+        pred_between_1_5 = Predicate('between', 0, SpecialFields.ROW_ID.name, [1, 5])
+        pred_between_3_7 = Predicate('between', 0, SpecialFields.ROW_ID.name, [3, 7])
+        pred_or_overlap = Predicate('or', None, None, [pred_between_1_5, pred_between_3_7])
+        assert _row_ranges_from_predicate(pred_or_overlap) == [Range(1, 7)]
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py
index 4c52fcdc41..9fcc18df22 100644
--- a/paimon-python/pypaimon/tests/reader_predicate_test.py
+++ b/paimon-python/pypaimon/tests/reader_predicate_test.py
@@ -25,8 +25,10 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory
 from pypaimon import Schema
+from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.read import push_down_utils
 from pypaimon.read.split import Split
+from pypaimon.schema.data_types import AtomicType, DataField
 
 
 class ReaderPredicateTest(unittest.TestCase):
@@ -91,3 +93,25 @@ class ReaderPredicateTest(unittest.TestCase):
         self.assertEqual(len(pred.literals), 2)
         self.assertEqual(pred.literals[0].field, 'pt')
         self.assertEqual(pred.literals[1].field, 'pt')
+
+    def test_remove_row_id_filter(self):
+        fields = [
+            DataField(0, '_ROW_ID', AtomicType('BIGINT')),
+            DataField(1, 'f0', AtomicType('INT')),
+        ]
+        pb = PredicateBuilder(fields)
+        and_pred = pb.and_predicates([pb.equal('_ROW_ID', 1), pb.greater_than('f0', 5)])
+        result = push_down_utils.remove_row_id_filter(and_pred)
+        self.assertIsNotNone(result)
+        self.assertEqual(result.field, 'f0')
+        self.assertEqual(result.method, 'greaterThan')
+        or_mixed = pb.or_predicates([pb.equal('_ROW_ID', 1), pb.greater_than('f0', 5)])
+        result = push_down_utils.remove_row_id_filter(or_mixed)
+        self.assertIsNotNone(result, "OR: strip _ROW_ID child, keep f0>5 (same as Java)")
+        self.assertEqual(result.field, 'f0')
+        self.assertEqual(result.method, 'greaterThan')
+        or_no_row_id = pb.or_predicates([pb.greater_than('f0', 5), pb.less_than('f0', 10)])
+        result = push_down_utils.remove_row_id_filter(or_no_row_id)
+        self.assertIsNotNone(result)
+        self.assertEqual(result.method, 'or')
+        self.assertEqual(len(result.literals), 2)

Reply via email to