This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cef2c5e64b [Python] parallel read manifest entries (#6451)
cef2c5e64b is described below
commit cef2c5e64bcb927d42e0ca2d3a9302cb18e3b4aa
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Oct 22 15:43:22 2025 +0800
[Python] parallel read manifest entries (#6451)
---
paimon-python/pypaimon/common/core_options.py | 1 +
.../pypaimon/manifest/manifest_file_manager.py | 39 +++++++++++++++++++-
.../pypaimon/read/scanner/full_starting_scanner.py | 41 +++++++---------------
.../read/scanner/incremental_starting_scanner.py | 8 ++++-
4 files changed, 59 insertions(+), 30 deletions(-)
diff --git a/paimon-python/pypaimon/common/core_options.py
b/paimon-python/pypaimon/common/core_options.py
index cbf35b33e4..be7eff98d2 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -33,6 +33,7 @@ class CoreOptions(str, Enum):
BUCKET = "bucket"
BUCKET_KEY = "bucket-key"
WAREHOUSE = "warehouse"
+ MANIFEST_READ_THREADS = "manifest_read_threads"
# File format options
FILE_FORMAT = "file.format"
FILE_FORMAT_ORC = "orc"
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 9fc92a4113..98180e7b41 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,14 +15,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
-from typing import List
+from typing import List, Tuple, Set
import fastavro
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.generic_row import (GenericRowDeserializer,
GenericRowSerializer)
@@ -42,6 +44,41 @@ class ManifestFileManager:
self.primary_keys_fields = self.table.primary_keys_fields
self.trimmed_primary_keys_fields =
self.table.trimmed_primary_keys_fields
+ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta],
manifest_file_filter=None,
+ manifest_entry_filter=None, drop_stats=True,
max_workers=8) -> List[ManifestEntry]:
+
+ def _process_single_manifest(manifest_file: ManifestFileMeta) ->
Tuple[List[ManifestEntry], Set[tuple]]:
+ local_added = []
+ local_deleted_keys = set()
+ if manifest_file_filter and not
manifest_file_filter(manifest_file):
+ return local_added, local_deleted_keys
+ manifest_entries = self.read(manifest_file.file_name,
manifest_entry_filter, drop_stats)
+ for entry in manifest_entries:
+ if entry.kind == 0:
+ local_added.append(entry)
+ else:
+ key = (tuple(entry.partition.values), entry.bucket,
entry.file.file_name)
+ local_deleted_keys.add(key)
+ local_final_added = [
+ entry for entry in local_added
+ if (tuple(entry.partition.values), entry.bucket,
entry.file.file_name) not in local_deleted_keys
+ ]
+ return local_final_added, local_deleted_keys
+
+ deleted_entry_keys = set()
+ added_entries = []
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
+ future_results = executor.map(_process_single_manifest,
manifest_files)
+ for added, deleted_keys in future_results:
+ added_entries.extend(added)
+ deleted_entry_keys.update(deleted_keys)
+
+ final_entries = [
+ entry for entry in added_entries
+ if (tuple(entry.partition.values), entry.bucket,
entry.file.file_name) not in deleted_entry_keys
+ ]
+ return final_entries
+
def read(self, manifest_file_name: str, manifest_entry_filter=None,
drop_stats=True) -> List[ManifestEntry]:
manifest_file_path = self.manifest_path / manifest_file_name
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index dc1b178135..1b21619309 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
from collections import defaultdict
from typing import Callable, List, Optional
@@ -95,35 +96,12 @@ class FullStartingScanner(StartingScanner):
if not latest_snapshot:
return []
manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
- return self.read_manifest_entries(manifest_files)
- def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) ->
List[ManifestEntry]:
- def filter_manifest_file(file: ManifestFileMeta) -> bool:
- if not self.partition_key_predicate:
- return True
- return self.partition_key_predicate.test_by_simple_stats(
- file.partition_stats,
- file.num_added_files + file.num_deleted_files)
-
- deleted_entries = set()
- added_entries = []
- for manifest_file in manifest_files:
- if not filter_manifest_file(manifest_file):
- continue
- manifest_entries = self.manifest_file_manager.read(
- manifest_file.file_name,
- lambda row: self._filter_manifest_entry(row))
- for entry in manifest_entries:
- if entry.kind == 0:
- added_entries.append(entry)
- else:
- deleted_entries.add((tuple(entry.partition.values),
entry.bucket, entry.file.file_name))
-
- file_entries = [
- entry for entry in added_entries
- if (tuple(entry.partition.values), entry.bucket,
entry.file.file_name) not in deleted_entries
- ]
- return file_entries
+ max_workers =
int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count()
or 4) * 2))
+ return self.manifest_file_manager.read_entries_parallel(manifest_files,
+
self._filter_manifest_file,
+
self._filter_manifest_entry,
+
max_workers=max_workers)
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) ->
'FullStartingScanner':
if idx_of_this_subtask >= number_of_para_subtasks:
@@ -224,6 +202,13 @@ class FullStartingScanner(StartingScanner):
return limited_splits
+ def _filter_manifest_file(self, file: ManifestFileMeta) -> bool:
+ if not self.partition_key_predicate:
+ return True
+ return self.partition_key_predicate.test_by_simple_stats(
+ file.partition_stats,
+ file.num_added_files + file.num_deleted_files)
+
def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
if self.only_read_real_buckets and entry.bucket < 0:
return False
diff --git
a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
index 0139dafc40..0aa1274b7e 100644
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -15,8 +15,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
from typing import List, Optional
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
@@ -43,7 +45,11 @@ class IncrementalStartingScanner(FullStartingScanner):
for snapshot in snapshots_in_range:
# Get manifest files for this snapshot
manifest_files = self.manifest_list_manager.read_delta(snapshot)
- file_entries.extend(self.read_manifest_entries(manifest_files))
+ max_workers =
int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count()
or 4) * 2))
+
file_entries.extend(self.manifest_file_manager.read_entries_parallel(manifest_files,
+
self._filter_manifest_file,
+
self._filter_manifest_entry,
+
max_workers=max_workers))
return file_entries
@staticmethod