This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aa11de1090 [Python] optimize codes related to push_down_utils (#6430)
aa11de1090 is described below
commit aa11de1090f55daf10721b87545ab2991ca7b991
Author: ChengHui Chen <[email protected]>
AuthorDate: Mon Oct 20 19:19:41 2025 +0800
[Python] optimize codes related to push_down_utils (#6430)
---
paimon-python/pypaimon/read/push_down_utils.py | 14 +++++++-------
.../pypaimon/read/scanner/full_starting_scanner.py | 6 +++---
paimon-python/pypaimon/read/table_read.py | 4 ++--
paimon-python/pypaimon/tests/py36/reader_predicate_test.py | 11 +++++++++++
paimon-python/pypaimon/tests/reader_predicate_test.py | 11 +++++++++++
5 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 43892fb5e6..f812341149 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -22,9 +22,9 @@ from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
-def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], fields: List[str]):
- new_predicate = filter_predicate_by_fields(input_predicate, fields)
- part_to_index = {element: idx for idx, element in enumerate(fields)}
+def trim_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], trimmed_keys: List[str]):
+ new_predicate = trim_predicate_by_fields(input_predicate, trimmed_keys)
+ part_to_index = {element: idx for idx, element in enumerate(trimmed_keys)}
mapping: Dict[int, int] = {
i: part_to_index.get(all_fields[i], -1)
for i in range(len(all_fields))
@@ -32,12 +32,12 @@ def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[
return _change_index(new_predicate, mapping)
-def filter_predicate_by_fields(input_predicate: Predicate, fields: List[str]):
- if not input_predicate or not fields:
+def trim_predicate_by_fields(input_predicate: Predicate, trimmed_keys: List[str]):
+ if not input_predicate or not trimmed_keys:
return None
predicates: list[Predicate] = _split_and(input_predicate)
- predicates = [element for element in predicates if _get_all_fields(element).issubset(fields)]
+ predicates = [element for element in predicates if _get_all_fields(element).issubset(trimmed_keys)]
return PredicateBuilder.and_predicates(predicates)
@@ -46,7 +46,7 @@ def _split_and(input_predicate: Predicate):
return list()
if input_predicate.method == 'and':
- return list(input_predicate.literals)
+ return [p for element in (input_predicate.literals or []) for p in _split_and(element)]
return [input_predicate]
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 156b05cdb6..f1ade2c4e8 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -27,7 +27,7 @@ from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (filter_and_transform_predicate)
+from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.read.split import Split
from pypaimon.snapshot.snapshot_manager import SnapshotManager
@@ -46,10 +46,10 @@ class FullStartingScanner(StartingScanner):
self.manifest_list_manager = ManifestListManager(table)
self.manifest_file_manager = ManifestFileManager(table)
- self.primary_key_predicate = filter_and_transform_predicate(
+ self.primary_key_predicate = trim_and_transform_predicate(
self.predicate, self.table.field_names,
self.table.table_schema.get_trimmed_primary_keys())
- self.partition_key_predicate = filter_and_transform_predicate(
+ self.partition_key_predicate = trim_and_transform_predicate(
self.predicate, self.table.field_names, self.table.partition_keys)
self.target_split_size = 128 * 1024 * 1024
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 4bf07d37a5..061303862c 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -22,7 +22,7 @@ import pyarrow
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
-from pypaimon.read.push_down_utils import filter_predicate_by_fields
+from pypaimon.read.push_down_utils import trim_predicate_by_fields
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.split import Split
from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
@@ -112,7 +112,7 @@ class TableRead:
if self.predicate is None:
return None
elif self.table.is_primary_key_table:
- pk_predicate = filter_predicate_by_fields(self.predicate, self.table.primary_keys)
+ pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
if not pk_predicate:
return None
return pk_predicate.to_arrow()
diff --git a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
index e772205413..4c52fcdc41 100644
--- a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
from pypaimon import CatalogFactory
from pypaimon import Schema
+from pypaimon.read import push_down_utils
from pypaimon.read.split import Split
@@ -80,3 +81,13 @@ class ReaderPredicateTest(unittest.TestCase):
splits: list[Split] = read_builder.new_scan().plan().splits()
self.assertEqual(len(splits), 1)
self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
+
+ def test_trim_predicate(self):
+ predicate_builder = self.table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.between('pt', 1002, 1003)
+ p2 = predicate_builder.and_predicates([predicate_builder.equal('pt', 1003), predicate_builder.equal('a', 3)])
+ predicate = predicate_builder.and_predicates([p1, p2])
+ pred = push_down_utils.trim_predicate_by_fields(predicate, self.table.partition_keys)
+ self.assertEqual(len(pred.literals), 2)
+ self.assertEqual(pred.literals[0].field, 'pt')
+ self.assertEqual(pred.literals[1].field, 'pt')
diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py b/paimon-python/pypaimon/tests/reader_predicate_test.py
index e772205413..4c52fcdc41 100644
--- a/paimon-python/pypaimon/tests/reader_predicate_test.py
+++ b/paimon-python/pypaimon/tests/reader_predicate_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
from pypaimon import CatalogFactory
from pypaimon import Schema
+from pypaimon.read import push_down_utils
from pypaimon.read.split import Split
@@ -80,3 +81,13 @@ class ReaderPredicateTest(unittest.TestCase):
splits: list[Split] = read_builder.new_scan().plan().splits()
self.assertEqual(len(splits), 1)
self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
+
+ def test_trim_predicate(self):
+ predicate_builder = self.table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.between('pt', 1002, 1003)
+ p2 = predicate_builder.and_predicates([predicate_builder.equal('pt', 1003), predicate_builder.equal('a', 3)])
+ predicate = predicate_builder.and_predicates([p1, p2])
+ pred = push_down_utils.trim_predicate_by_fields(predicate, self.table.partition_keys)
+ self.assertEqual(len(pred.literals), 2)
+ self.assertEqual(pred.literals[0].field, 'pt')
+ self.assertEqual(pred.literals[1].field, 'pt')