This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8012ff7deb [python] Support filter by _ROW_ID for data evolution (#7252)
8012ff7deb is described below
commit 8012ff7deb53a5c5848957d606cf95ee75b10a8e
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 11 11:57:57 2026 +0800
[python] Support filter by _ROW_ID for data evolution (#7252)
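
A minimal usage sketch, assuming a data-evolution table handle obtained from
an existing catalog (the calls mirror the tests added in this change):

    # Filtering on the _ROW_ID system column prunes manifest entries by
    # row-id range before any data files are read.
    rb_with_row_id = table.new_read_builder().with_projection(['f0', '_ROW_ID'])
    pb = rb_with_row_id.new_predicate_builder()
    rb = table.new_read_builder().with_filter(pb.is_in('_ROW_ID', [0, 1]))
    result = rb.new_read().to_arrow(rb.new_scan().plan().splits())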
---
paimon-python/pypaimon/globalindex/range.py | 16 ++++
paimon-python/pypaimon/read/push_down_utils.py | 36 +++++++++
.../pypaimon/read/scanner/file_scanner.py | 87 ++++++++++++++++++++-
.../pypaimon/tests/data_evolution_test.py | 88 ++++++++++++++++++++++
paimon-python/pypaimon/tests/range_test.py | 86 +++++++++++++++++++++
.../pypaimon/tests/reader_predicate_test.py | 24 ++++++
6 files changed, 335 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/globalindex/range.py b/paimon-python/pypaimon/globalindex/range.py
index 19b9b40e94..0d65e98974 100644
--- a/paimon-python/pypaimon/globalindex/range.py
+++ b/paimon-python/pypaimon/globalindex/range.py
@@ -75,6 +75,22 @@ class Range:
        return result
 
+    @staticmethod
+    def to_ranges(values: List[int]) -> List['Range']:
+        if not values:
+            return []
+        sorted_ids = sorted(values)
+        result = []
+        range_start = sorted_ids[0]
+        range_end = range_start
+        for current in sorted_ids[1:]:
+            if current != range_end + 1:
+                result.append(Range(range_start, range_end))
+                range_start = current
+            range_end = current
+        result.append(Range(range_start, range_end))
+        return result
+
     @staticmethod
     def intersect(start1: int, end1: int, start2: int, end2: int) -> bool:
         """Check if two ranges intersect."""
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 7ad7e53acc..47f21f3869 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -88,3 +88,39 @@ def _get_all_fields(predicate: Predicate) -> Set[str]:
        for sub_predicate in predicate.literals:
            involved_fields.update(_get_all_fields(sub_predicate))
    return involved_fields
+
+
+def remove_row_id_filter(predicate: Predicate) -> Optional[Predicate]:
+    from pypaimon.table.special_fields import SpecialFields
+
+    if not predicate:
+        return None
+    if predicate.field == SpecialFields.ROW_ID.name:
+        return None
+    if predicate.method == "and":
+        parts = _split_and(predicate)
+        non_row_id = [
+            p for p in parts
+            if _get_all_fields(p) != {SpecialFields.ROW_ID.name}
+        ]
+        if not non_row_id:
+            return None
+        filtered = []
+        for p in non_row_id:
+            r = remove_row_id_filter(p)
+            if r is None:
+                return None
+            filtered.append(r)
+        return PredicateBuilder.and_predicates(filtered)
+    if predicate.method == "or":
+        new_children = []
+        for c in predicate.literals or []:
+            r = remove_row_id_filter(c)
+            if r is not None:
+                new_children.append(r)
+        if not new_children:
+            return None
+        if len(new_children) == 1:
+            return new_children[0]
+        return PredicateBuilder.or_predicates(new_children)
+    return predicate
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index b5d13e561d..8167a743dd 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -31,7 +31,10 @@ from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
+from pypaimon.read.push_down_utils import (
+    remove_row_id_filter,
+    trim_and_transform_predicate,
+)
 from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
 from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
 from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
@@ -41,6 +44,53 @@ from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
 
 
+def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> Optional[List]:
+    from pypaimon.globalindex.range import Range
+    from pypaimon.table.special_fields import SpecialFields
+
+    if predicate is None:
+        return None
+
+    def visit(p: Predicate):
+        if p.method == 'and':
+            result = None
+            for child in p.literals:
+                sub = visit(child)
+                if sub is None:
+                    continue
+                result = Range.and_(result, sub) if result is not None else sub
+                if not result:
+                    return result
+            return result
+        if p.method == 'or':
+            parts = []
+            for child in p.literals:
+                sub = visit(child)
+                if sub is None:
+                    return None
+                parts.extend(sub)
+            if not parts:
+                return []
+            return Range.sort_and_merge_overlap(parts, merge=True, adjacent=True)
+        if p.field != SpecialFields.ROW_ID.name:
+            return None
+        if p.method == 'equal':
+            if not p.literals:
+                return []
+            return Range.to_ranges([int(p.literals[0])])
+        if p.method == 'in':
+            if not p.literals:
+                return []
+            return Range.to_ranges([int(x) for x in p.literals])
+        if p.method == 'between':
+            if not p.literals or len(p.literals) < 2:
+                return []
+            return [Range(int(p.literals[0]), int(p.literals[1]))]
+        return None
+
+    return visit(predicate)
+
+
 def _filter_manifest_files_by_row_ranges(
         manifest_files: List[ManifestFileMeta],
         row_ranges: List) -> List[ManifestFileMeta]:
@@ -89,6 +139,31 @@ def _filter_manifest_files_by_row_ranges(
    return filtered_files
 
 
+def _filter_manifest_entries_by_row_ranges(
+        entries: List[ManifestEntry],
+        row_ranges: List) -> List[ManifestEntry]:
+    from pypaimon.globalindex.range import Range
+
+    if not row_ranges:
+        return []
+
+    filtered = []
+    for entry in entries:
+        first_row_id = entry.file.first_row_id
+        if first_row_id is None:
+            filtered.append(entry)
+            continue
+        file_range = Range(
+            first_row_id,
+            first_row_id + entry.file.row_count - 1
+        )
+        for r in row_ranges:
+            if file_range.overlaps(r):
+                filtered.append(entry)
+                break
+    return filtered
+
+
 class FileScanner:
     def __init__(
             self,
@@ -103,6 +178,7 @@ class FileScanner:
        self.table: FileStoreTable = table
        self.manifest_scanner = manifest_scanner
        self.predicate = predicate
+        self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
        self.limit = limit
        self.vector_search = vector_search
@@ -196,6 +272,8 @@ class FileScanner:
            row_ranges = global_index_result.results().to_range_list()
            if isinstance(global_index_result, ScoredGlobalIndexResult):
                score_getter = global_index_result.score_getter()
+        if row_ranges is None and self.predicate is not None:
+            row_ranges = _row_ranges_from_predicate(self.predicate)
 
        manifest_files = self.manifest_scanner()
@@ -205,6 +283,9 @@ class FileScanner:
        entries = self.read_manifest_entries(manifest_files)
 
+        if row_ranges is not None:
+            entries = _filter_manifest_entries_by_row_ranges(entries, row_ranges)
+
        return entries, DataEvolutionSplitGenerator(
            self.table,
            self.target_split_size,
@@ -353,6 +434,8 @@ class FileScanner:
        else:
            if not self.predicate:
                return True
+            if self.predicate_for_stats is None:
+                return True
            if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                stats_fields = entry.file.write_cols
            else:
@@ -362,7 +445,7 @@ class FileScanner:
                entry.file.row_count,
                stats_fields
            )
-            return self.predicate.test_by_simple_stats(
+            return self.predicate_for_stats.test_by_simple_stats(
                evolved_stats,
                entry.file.row_count
            )
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index a9f0de508b..cfb09a0caf 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -18,6 +18,7 @@ limitations under the License.
import os
import tempfile
import unittest
+from types import SimpleNamespace
import pyarrow as pa
@@ -587,6 +588,93 @@ class DataEvolutionTest(unittest.TestCase):
            '_SEQUENCE_NUMBER must be non-nullable per SpecialFields',
        )
 
+        rb_with_row_id = table.new_read_builder().with_projection(['f0', 'f1', 'f2', '_ROW_ID'])
+        pb = rb_with_row_id.new_predicate_builder()
+        rb_eq0 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 0))
+        result_eq0 = rb_eq0.new_read().to_arrow(rb_eq0.new_scan().plan().splits())
+        self.assertEqual(result_eq0, pa.Table.from_pydict(
+            {'f0': [1], 'f1': ['a'], 'f2': ['b']}, schema=simple_pa_schema))
+        rb_eq1 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 1))
+        result_eq1 = rb_eq1.new_read().to_arrow(rb_eq1.new_scan().plan().splits())
+        self.assertEqual(result_eq1, pa.Table.from_pydict(
+            {'f0': [2], 'f1': ['c'], 'f2': ['d']}, schema=simple_pa_schema))
+        rb_in = table.new_read_builder().with_filter(pb.is_in('_ROW_ID', [0, 1]))
+        result_in = rb_in.new_read().to_arrow(rb_in.new_scan().plan().splits())
+        self.assertEqual(result_in, expect)
+
+    def test_filter_by_row_id(self):
+        simple_pa_schema = pa.schema([('f0', pa.int32())])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
+        )
+        self.catalog.create_table('default.test_row_id_filter_empty_and_or', schema, False)
+        table = self.catalog.get_table('default.test_row_id_filter_empty_and_or')
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: _ROW_ID 0, 1 with f0=1, 2
+        w = write_builder.new_write().with_write_type(['f0'])
+        c = write_builder.new_commit()
+        w.write_arrow(pa.Table.from_pydict(
+            {'f0': [1, 2]}, schema=pa.schema([('f0', pa.int32())])))
+        cmts = w.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w.close()
+        c.close()
+
+        # Commit 2: _ROW_ID 2, 3 with f0=101, 102
+        w = write_builder.new_write().with_write_type(['f0'])
+        c = write_builder.new_commit()
+        w.write_arrow(pa.Table.from_pydict(
+            {'f0': [101, 102]}, schema=pa.schema([('f0', pa.int32())])))
+        cmts = w.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 2
+        c.commit(cmts)
+        w.close()
+        c.close()
+
+        rb_with_row_id = table.new_read_builder().with_projection(['f0', '_ROW_ID'])
+        pb = rb_with_row_id.new_predicate_builder()
+
+        # 1. Non-existent _ROW_ID -> empty
+        rb_eq999 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 999))
+        result_eq999 = rb_eq999.new_read().to_arrow(rb_eq999.new_scan().plan().splits())
+        self.assertEqual(len(result_eq999), 0, "Non-existent _ROW_ID should return empty")
+
+        # 2. AND: _ROW_ID=0 AND f0=1 -> 1 row
+        rb_and = table.new_read_builder().with_filter(
+            pb.and_predicates([pb.equal('_ROW_ID', 0), pb.equal('f0', 1)])
+        )
+        result_and = rb_and.new_read().to_arrow(rb_and.new_scan().plan().splits())
+        self.assertEqual(len(result_and), 1)
+        self.assertEqual(result_and['f0'][0].as_py(), 1)
+
+        # 3. OR: _ROW_ID=0 OR f0>100 -> at least row with _ROW_ID=0 and all f0>100
+        rb_or = table.new_read_builder().with_filter(
+            pb.or_predicates([pb.equal('_ROW_ID', 0), pb.greater_than('f0', 100)])
+        )
+        result_or = rb_or.new_read().to_arrow(rb_or.new_scan().plan().splits())
+        f0_vals = set(result_or['f0'][i].as_py() for i in range(len(result_or)))
+        self.assertGreaterEqual(len(result_or), 3, "OR should return _ROW_ID=0 row and f0>100 rows")
+        self.assertIn(1, f0_vals, "_ROW_ID=0 row has f0=1")
+        self.assertIn(101, f0_vals)
+        self.assertIn(102, f0_vals)
+
+    def test_filter_manifest_entries_by_row_ranges(self):
+        from pypaimon.read.scanner.file_scanner import _filter_manifest_entries_by_row_ranges
+
+        entry_0 = SimpleNamespace(file=SimpleNamespace(first_row_id=0, row_count=1))
+        entries = [entry_0]
+        row_ranges = []
+
+        filtered = _filter_manifest_entries_by_row_ranges(entries, row_ranges)
+        self.assertEqual(filtered, [], "empty row_ranges must return no entries, not all entries")
+
     def test_more_data(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),
diff --git a/paimon-python/pypaimon/tests/range_test.py b/paimon-python/pypaimon/tests/range_test.py
new file mode 100644
index 0000000000..f3cbea9f2e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/range_test.py
@@ -0,0 +1,86 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.globalindex.range import Range
+from pypaimon.read.scanner.file_scanner import _row_ranges_from_predicate
+from pypaimon.table.special_fields import SpecialFields
+
+
+class RangeTest(unittest.TestCase):
+
+    def test_to_ranges(self):
+        assert Range.to_ranges([]) == []
+        assert Range.to_ranges([5]) == [Range(5, 5)]
+        assert Range.to_ranges([1, 2, 3]) == [Range(1, 3)]
+        assert Range.to_ranges([1, 3, 5]) == [
+            Range(1, 1), Range(3, 3), Range(5, 5)
+        ]
+        assert Range.to_ranges([1, 1, 2]) == [Range(1, 1), Range(1, 2)]
+
+    def test_row_ranges_from_predicate(self):
+        assert _row_ranges_from_predicate(None) is None
+
+        pred_eq = Predicate('equal', 0, SpecialFields.ROW_ID.name, [5])
+        assert _row_ranges_from_predicate(pred_eq) == [Range(5, 5)]
+
+        pred_in = Predicate('in', 0, SpecialFields.ROW_ID.name, [10])
+        assert _row_ranges_from_predicate(pred_in) == [Range(10, 10)]
+        pred_in_multi = Predicate('in', 0, SpecialFields.ROW_ID.name, [1, 2, 3, 5, 6])
+        assert _row_ranges_from_predicate(pred_in_multi) == [Range(1, 3), Range(5, 6)]
+
+        pred_between = Predicate('between', 0, SpecialFields.ROW_ID.name, [10, 20])
+        result = _row_ranges_from_predicate(pred_between)
+        assert result is not None and result == [Range(10, 20)]
+
+        pred_other = Predicate('equal', 0, 'other_field', [5])
+        assert _row_ranges_from_predicate(pred_other) is None
+
+        pred_gt = Predicate('greaterThan', 0, SpecialFields.ROW_ID.name, [10])
+        assert _row_ranges_from_predicate(pred_gt) is None
+
+        assert _row_ranges_from_predicate(
+            Predicate('equal', 0, SpecialFields.ROW_ID.name, [])
+        ) == []
+        assert _row_ranges_from_predicate(
+            Predicate('in', 0, SpecialFields.ROW_ID.name, [])
+        ) == []
+        assert _row_ranges_from_predicate(
+            Predicate('between', 0, SpecialFields.ROW_ID.name, [10])
+        ) == []
+
+        pred_eq5 = Predicate('equal', 0, SpecialFields.ROW_ID.name, [5])
+        pred_between_1_10 = Predicate('between', 0, SpecialFields.ROW_ID.name, [1, 10])
+        pred_and = Predicate('and', None, None, [pred_eq5, pred_between_1_10])
+        assert _row_ranges_from_predicate(pred_and) == [Range(5, 5)]
+        pred_eq3 = Predicate('equal', 0, SpecialFields.ROW_ID.name, [3])
+        pred_and_no = Predicate('and', None, None, [pred_eq5, pred_eq3])
+        assert _row_ranges_from_predicate(pred_and_no) == []
+
+        pred_or = Predicate('or', None, None, [pred_eq5, pred_eq3])
+        assert _row_ranges_from_predicate(pred_or) == [Range(3, 3), Range(5, 5)]
+        pred_between_1_5 = Predicate('between', 0, SpecialFields.ROW_ID.name, [1, 5])
+        pred_between_3_7 = Predicate('between', 0, SpecialFields.ROW_ID.name, [3, 7])
+        pred_or_overlap = Predicate('or', None, None, [pred_between_1_5, pred_between_3_7])
+        assert _row_ranges_from_predicate(pred_or_overlap) == [Range(1, 7)]
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py
index 4c52fcdc41..9fcc18df22 100644
--- a/paimon-python/pypaimon/tests/reader_predicate_test.py
+++ b/paimon-python/pypaimon/tests/reader_predicate_test.py
@@ -25,8 +25,10 @@ import pyarrow as pa
from pypaimon import CatalogFactory
from pypaimon import Schema
+from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.read import push_down_utils
from pypaimon.read.split import Split
+from pypaimon.schema.data_types import AtomicType, DataField
class ReaderPredicateTest(unittest.TestCase):
@@ -91,3 +93,25 @@ class ReaderPredicateTest(unittest.TestCase):
        self.assertEqual(len(pred.literals), 2)
        self.assertEqual(pred.literals[0].field, 'pt')
        self.assertEqual(pred.literals[1].field, 'pt')
+
+    def test_remove_row_id_filter(self):
+        fields = [
+            DataField(0, '_ROW_ID', AtomicType('BIGINT')),
+            DataField(1, 'f0', AtomicType('INT')),
+        ]
+        pb = PredicateBuilder(fields)
+        and_pred = pb.and_predicates([pb.equal('_ROW_ID', 1), pb.greater_than('f0', 5)])
+        result = push_down_utils.remove_row_id_filter(and_pred)
+        self.assertIsNotNone(result)
+        self.assertEqual(result.field, 'f0')
+        self.assertEqual(result.method, 'greaterThan')
+        or_mixed = pb.or_predicates([pb.equal('_ROW_ID', 1), pb.greater_than('f0', 5)])
+        result = push_down_utils.remove_row_id_filter(or_mixed)
+        self.assertIsNotNone(result, "OR: strip _ROW_ID child, keep f0>5 (same as Java)")
+        self.assertEqual(result.field, 'f0')
+        self.assertEqual(result.method, 'greaterThan')
+        or_no_row_id = pb.or_predicates([pb.greater_than('f0', 5), pb.less_than('f0', 10)])
+        result = push_down_utils.remove_row_id_filter(or_no_row_id)
+        self.assertIsNotNone(result)
+        self.assertEqual(result.method, 'or')
+        self.assertEqual(len(result.literals), 2)