This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e0ac877179 [python] Filter manifest files by partition predicate in scan (#6419)
e0ac877179 is described below
commit e0ac8771797fc94c6beafbfd27464639125fe996
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 17 22:28:00 2025 +0200
[python] Filter manifest files by partition predicate in scan (#6419)
---
paimon-python/pypaimon/common/predicate.py | 25 +++++++
paimon-python/pypaimon/read/push_down_utils.py | 44 ++++++++++++
paimon-python/pypaimon/read/read_builder.py | 3 +-
.../pypaimon/read/scanner/full_starting_scanner.py | 23 ++++--
.../read/scanner/incremental_starting_scanner.py | 10 ++-
paimon-python/pypaimon/read/table_scan.py | 14 ++--
paimon-python/pypaimon/table/file_store_table.py | 1 +
.../pypaimon/tests/py36/reader_predicate_test.py | 82 ++++++++++++++++++++++
.../pypaimon/tests/reader_predicate_test.py | 82 ++++++++++++++++++++++
paimon-python/pypaimon/write/file_store_commit.py | 2 +-
10 files changed, 262 insertions(+), 24 deletions(-)
diff --git a/paimon-python/pypaimon/common/predicate.py
b/paimon-python/pypaimon/common/predicate.py
index c8a4070c6a..4ca8644f6e 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -24,6 +24,7 @@ import pyarrow
from pyarrow import compute as pyarrow_compute
from pyarrow import dataset as pyarrow_dataset
+from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.internal_row import InternalRow
@@ -34,6 +35,20 @@ class Predicate:
field: Optional[str]
literals: Optional[List[Any]] = None
+ def new_index(self, index: int):
+ return Predicate(
+ method=self.method,
+ index=index,
+ field=self.field,
+ literals=self.literals)
+
+ def new_literals(self, literals: List[Any]):
+ return Predicate(
+ method=self.method,
+ index=self.index,
+ field=self.field,
+ literals=literals)
+
def test(self, record: InternalRow) -> bool:
if self.method == 'equal':
return record.get_field(self.index) == self.literals[0]
@@ -125,6 +140,16 @@ class Predicate:
raise ValueError("Unsupported predicate method:
{}".format(self.method))
+ def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
+ return self.test_by_stats({
+ "min_values": stat.min_values.to_dict(),
+ "max_values": stat.max_values.to_dict(),
+ "null_counts": {
+ stat.min_values.fields[i].name: stat.null_counts[i] for i in
range(len(stat.min_values.fields))
+ },
+ "row_count": row_count,
+ })
+
def test_by_stats(self, stat: Dict) -> bool:
if self.method == 'and':
return all(p.test_by_stats(stat) for p in self.literals)
diff --git a/paimon-python/pypaimon/read/push_down_utils.py
b/paimon-python/pypaimon/read/push_down_utils.py
index 95a99d9005..64e7c238f8 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -21,6 +21,50 @@ from typing import Dict, List, Set
from pypaimon.common.predicate import Predicate
+def to_partition_predicate(input_predicate: 'Predicate', all_fields:
List[str], partition_keys: List[str]):
+ if not input_predicate or not partition_keys:
+ return None
+
+ predicates: list['Predicate'] = _split_and(input_predicate)
+ predicates = [element for element in predicates if
_get_all_fields(element).issubset(partition_keys)]
+ new_predicate = Predicate(
+ method='and',
+ index=None,
+ field=None,
+ literals=predicates
+ )
+
+ part_to_index = {element: idx for idx, element in
enumerate(partition_keys)}
+ mapping: Dict[int, int] = {
+ i: part_to_index.get(all_fields[i], -1)
+ for i in range(len(all_fields))
+ }
+
+ return _change_index(new_predicate, mapping)
+
+
+def _split_and(input_predicate: 'Predicate'):
+ if not input_predicate:
+ return list()
+
+ if input_predicate.method == 'and':
+ return list(input_predicate.literals)
+
+ return [input_predicate]
+
+
+def _change_index(input_predicate: 'Predicate', mapping: Dict[int, int]):
+ if not input_predicate:
+ return None
+
+ if input_predicate.method == 'and' or input_predicate.method == 'or':
+ predicates: list['Predicate'] = input_predicate.literals
+ new_predicates = [_change_index(element, mapping) for element in
predicates]
+ return input_predicate.new_literals(new_predicates)
+
+ return input_predicate.new_index(mapping[input_predicate.index])
+
+
def extract_predicate_to_list(result: list, input_predicate: 'Predicate',
keys: List[str]):
if not input_predicate or not keys:
return
diff --git a/paimon-python/pypaimon/read/read_builder.py
b/paimon-python/pypaimon/read/read_builder.py
index 30f824a698..6fc8026c43 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -52,8 +52,7 @@ class ReadBuilder:
return TableScan(
table=self.table,
predicate=self._predicate,
- limit=self._limit,
- read_type=self.read_type()
+ limit=self._limit
)
def new_read(self) -> TableRead:
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 73915a0412..a363a6fb6f 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -25,25 +25,25 @@ from pypaimon.manifest.manifest_file_manager import
ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
- extract_predicate_to_list)
+ extract_predicate_to_list,
+ to_partition_predicate)
from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.read.split import Split
-from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
class FullStartingScanner(StartingScanner):
- def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int], read_type: List[DataField]):
+ def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int]):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.predicate = predicate
self.limit = limit
- self.read_type = read_type
self.snapshot_manager = SnapshotManager(table)
self.manifest_list_manager = ManifestListManager(table)
@@ -82,15 +82,26 @@ class FullStartingScanner(StartingScanner):
splits = self._apply_push_down_limit(splits)
return Plan(splits)
- def plan_files(self) -> List[ManifestEntry]:
+ def _read_manifest_files(self) -> List[ManifestFileMeta]:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
if not latest_snapshot:
return []
manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
+ partition_predicate = to_partition_predicate(self.predicate,
self.table.field_names, self.table.partition_keys)
+
+ def test_predicate(file: ManifestFileMeta) -> bool:
+ if not partition_predicate:
+ return True
+ return partition_predicate.test_by_simple_stats(
+ file.partition_stats,
+ file.num_added_files + file.num_deleted_files)
+ return [file for file in manifest_files if test_predicate(file)]
+
+ def plan_files(self) -> List[ManifestEntry]:
+ manifest_files = self._read_manifest_files()
deleted_entries = set()
added_entries = []
- # TODO: filter manifest files by predicate
for manifest_file in manifest_files:
manifest_entries =
self.manifest_file_manager.read(manifest_file.file_name,
lambda row:
self._bucket_filter(row))
diff --git
a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
index f1a9cd03bc..ead58d260a 100644
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -20,14 +20,13 @@ from typing import List, Optional
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
-from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
class IncrementalStartingScanner(FullStartingScanner):
def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int],
- read_type: List[DataField], start: int, end: int):
- super().__init__(table, predicate, limit, read_type)
+ start: int, end: int):
+ super().__init__(table, predicate, limit)
self.startingSnapshotId = start
self.endingSnapshotId = end
@@ -55,8 +54,7 @@ class IncrementalStartingScanner(FullStartingScanner):
@staticmethod
def between_timestamps(table, predicate: Optional[Predicate], limit:
Optional[int],
- read_type: List[DataField], start_timestamp: int,
- end_timestamp: int) -> 'IncrementalStartingScanner':
+ start_timestamp: int, end_timestamp: int) ->
'IncrementalStartingScanner':
"""
Create an IncrementalStartingScanner for snapshots between two
timestamps.
"""
@@ -74,4 +72,4 @@ class IncrementalStartingScanner(FullStartingScanner):
latest_snapshot = snapshot_manager.get_latest_snapshot()
end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if
latest_snapshot else -1)
- return IncrementalStartingScanner(table, predicate, limit, read_type,
start_id, end_id)
+ return IncrementalStartingScanner(table, predicate, limit, start_id,
end_id)
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index 93a7babb30..22994d4bb2 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-from typing import List, Optional
+from typing import Optional
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
@@ -27,21 +27,18 @@ from pypaimon.read.scanner.full_starting_scanner import
FullStartingScanner
from pypaimon.read.scanner.incremental_starting_scanner import \
IncrementalStartingScanner
from pypaimon.read.scanner.starting_scanner import StartingScanner
-from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
class TableScan:
"""Implementation of TableScan for native Python reading."""
- def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int],
- read_type: List[DataField]):
+ def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int]):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.predicate = predicate
self.limit = limit
- self.read_type = read_type
self.starting_scanner = self._create_starting_scanner()
def plan(self) -> Plan:
@@ -67,10 +64,9 @@ class TableScan:
if (start_timestamp == end_timestamp or start_timestamp >
latest_snapshot.time_millis
or end_timestamp < earliest_snapshot.time_millis):
return EmptyStartingScanner()
- return IncrementalStartingScanner.between_timestamps(self.table,
self.predicate, self.limit, self.read_type,
-
start_timestamp,
- end_timestamp)
- return FullStartingScanner(self.table, self.predicate, self.limit,
self.read_type)
+ return IncrementalStartingScanner.between_timestamps(self.table,
self.predicate, self.limit,
+
start_timestamp, end_timestamp)
+ return FullStartingScanner(self.table, self.predicate, self.limit)
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) ->
'TableScan':
self.starting_scanner.with_shard(idx_of_this_subtask,
number_of_para_subtasks)
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 57ccbf83bb..f0186b1657 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -46,6 +46,7 @@ class FileStoreTable(Table):
self.table_schema = table_schema
self.fields = table_schema.fields
+ self.field_names = [field.name for field in table_schema.fields]
self.field_dict = {field.name: field for field in self.fields}
self.primary_keys = table_schema.primary_keys
self.partition_keys = table_schema.partition_keys
diff --git a/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
new file mode 100644
index 0000000000..e772205413
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/reader_predicate_test.py
@@ -0,0 +1,82 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon import Schema
+from pypaimon.read.split import Split
+
+
+class ReaderPredicateTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', False)
+
+ cls.pa_schema = pa.schema([
+ ('a', pa.int64()),
+ ('pt', pa.int64())
+ ])
+ schema = Schema.from_pyarrow_schema(cls.pa_schema,
partition_keys=['pt'])
+ cls.catalog.create_table('default.test_reader_predicate', schema,
False)
+ cls.table = cls.catalog.get_table('default.test_reader_predicate')
+
+ data1 = pa.Table.from_pydict({
+ 'a': [1, 2],
+ 'pt': [1001, 1002]}, schema=cls.pa_schema)
+ write_builder = cls.table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ data2 = pa.Table.from_pydict({
+ 'a': [3, 4],
+ 'pt': [1003, 1004]}, schema=cls.pa_schema)
+ write_builder = cls.table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def test_partition_predicate(self):
+ predicate_builder =
self.table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.equal('pt', 1003)
+ read_builder = self.table.new_read_builder()
+ read_builder.with_filter(predicate)
+ splits: list[Split] = read_builder.new_scan().plan().splits()
+ self.assertEqual(len(splits), 1)
+ self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
diff --git a/paimon-python/pypaimon/tests/reader_predicate_test.py
b/paimon-python/pypaimon/tests/reader_predicate_test.py
new file mode 100644
index 0000000000..e772205413
--- /dev/null
+++ b/paimon-python/pypaimon/tests/reader_predicate_test.py
@@ -0,0 +1,82 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon import Schema
+from pypaimon.read.split import Split
+
+
+class ReaderPredicateTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', False)
+
+ cls.pa_schema = pa.schema([
+ ('a', pa.int64()),
+ ('pt', pa.int64())
+ ])
+ schema = Schema.from_pyarrow_schema(cls.pa_schema,
partition_keys=['pt'])
+ cls.catalog.create_table('default.test_reader_predicate', schema,
False)
+ cls.table = cls.catalog.get_table('default.test_reader_predicate')
+
+ data1 = pa.Table.from_pydict({
+ 'a': [1, 2],
+ 'pt': [1001, 1002]}, schema=cls.pa_schema)
+ write_builder = cls.table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ data2 = pa.Table.from_pydict({
+ 'a': [3, 4],
+ 'pt': [1003, 1004]}, schema=cls.pa_schema)
+ write_builder = cls.table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def test_partition_predicate(self):
+ predicate_builder =
self.table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.equal('pt', 1003)
+ read_builder = self.table.new_read_builder()
+ read_builder.with_filter(predicate)
+ splits: list[Split] = read_builder.new_scan().plan().splits()
+ self.assertEqual(len(splits), 1)
+ self.assertEqual(splits[0].partition.to_dict().get("pt"), 1003)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 97804bbf6b..c2fe33105b 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -102,7 +102,7 @@ class FileStoreCommit:
f"in {msg.partition} does not belong to
this partition")
commit_entries = []
- current_entries = FullStartingScanner(self.table, partition_filter,
None, []).plan_files()
+ current_entries = FullStartingScanner(self.table, partition_filter,
None).plan_files()
for entry in current_entries:
entry.kind = 1
commit_entries.append(entry)