This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cef2c5e64b [Python] parallel read manifest entries (#6451)
cef2c5e64b is described below

commit cef2c5e64bcb927d42e0ca2d3a9302cb18e3b4aa
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Oct 22 15:43:22 2025 +0800

    [Python] parallel read manifest entries (#6451)
---
 paimon-python/pypaimon/common/core_options.py      |  1 +
 .../pypaimon/manifest/manifest_file_manager.py     | 39 +++++++++++++++++++-
 .../pypaimon/read/scanner/full_starting_scanner.py | 41 +++++++---------------
 .../read/scanner/incremental_starting_scanner.py   |  8 ++++-
 4 files changed, 59 insertions(+), 30 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index cbf35b33e4..be7eff98d2 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -33,6 +33,7 @@ class CoreOptions(str, Enum):
     BUCKET = "bucket"
     BUCKET_KEY = "bucket-key"
     WAREHOUSE = "warehouse"
+    MANIFEST_READ_THREADS = "manifest_read_threads"
     # File format options
     FILE_FORMAT = "file.format"
     FILE_FORMAT_ORC = "orc"
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 9fc92a4113..98180e7b41 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,14 +15,16 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from concurrent.futures import ThreadPoolExecutor
 from io import BytesIO
-from typing import List
+from typing import List, Tuple, Set
 
 import fastavro
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.generic_row import (GenericRowDeserializer,
                                             GenericRowSerializer)
@@ -42,6 +44,41 @@ class ManifestFileManager:
         self.primary_keys_fields = self.table.primary_keys_fields
         self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
 
+    def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_file_filter=None,
+                              manifest_entry_filter=None, drop_stats=True, max_workers=8) -> List[ManifestEntry]:
+
+        def _process_single_manifest(manifest_file: ManifestFileMeta) -> Tuple[List[ManifestEntry], Set[tuple]]:
+            local_added = []
+            local_deleted_keys = set()
+            if manifest_file_filter and not manifest_file_filter(manifest_file):
+                return local_added, local_deleted_keys
+            manifest_entries = self.read(manifest_file.file_name, manifest_entry_filter, drop_stats)
+            for entry in manifest_entries:
+                if entry.kind == 0:
+                    local_added.append(entry)
+                else:
+                    key = (tuple(entry.partition.values), entry.bucket, entry.file.file_name)
+                    local_deleted_keys.add(key)
+            local_final_added = [
+                entry for entry in local_added
+                if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in local_deleted_keys
+            ]
+            return local_final_added, local_deleted_keys
+
+        deleted_entry_keys = set()
+        added_entries = []
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_results = executor.map(_process_single_manifest, manifest_files)
+            for added, deleted_keys in future_results:
+                added_entries.extend(added)
+                deleted_entry_keys.update(deleted_keys)
+
+        final_entries = [
+            entry for entry in added_entries
+            if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entry_keys
+        ]
+        return final_entries
+
     def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
         manifest_file_path = self.manifest_path / manifest_file_name
 
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index dc1b178135..1b21619309 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import os
 from collections import defaultdict
 from typing import Callable, List, Optional
 
@@ -95,35 +96,12 @@ class FullStartingScanner(StartingScanner):
         if not latest_snapshot:
             return []
         manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
-        return self.read_manifest_entries(manifest_files)
 
-    def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
-        def filter_manifest_file(file: ManifestFileMeta) -> bool:
-            if not self.partition_key_predicate:
-                return True
-            return self.partition_key_predicate.test_by_simple_stats(
-                file.partition_stats,
-                file.num_added_files + file.num_deleted_files)
-
-        deleted_entries = set()
-        added_entries = []
-        for manifest_file in manifest_files:
-            if not filter_manifest_file(manifest_file):
-                continue
-            manifest_entries = self.manifest_file_manager.read(
-                manifest_file.file_name,
-                lambda row: self._filter_manifest_entry(row))
-            for entry in manifest_entries:
-                if entry.kind == 0:
-                    added_entries.append(entry)
-                else:
-                    deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name))
-
-        file_entries = [
-            entry for entry in added_entries
-            if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries
-        ]
-        return file_entries
+        max_workers = int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() or 4) * 2))
+        return self.manifest_file_manager.read_entries_parallel(manifest_files,
+                                                                self._filter_manifest_file,
+                                                                self._filter_manifest_entry,
+                                                                max_workers=max_workers)
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
@@ -224,6 +202,13 @@ class FullStartingScanner(StartingScanner):
 
         return limited_splits
 
+    def _filter_manifest_file(self, file: ManifestFileMeta) -> bool:
+        if not self.partition_key_predicate:
+            return True
+        return self.partition_key_predicate.test_by_simple_stats(
+            file.partition_stats,
+            file.num_added_files + file.num_deleted_files)
+
     def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
         if self.only_read_real_buckets and entry.bucket < 0:
             return False
diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
index 0139dafc40..0aa1274b7e 100644
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -15,8 +15,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import os
 from typing import List, Optional
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
@@ -43,7 +45,11 @@ class IncrementalStartingScanner(FullStartingScanner):
         for snapshot in snapshots_in_range:
             # Get manifest files for this snapshot
             manifest_files = self.manifest_list_manager.read_delta(snapshot)
-            file_entries.extend(self.read_manifest_entries(manifest_files))
+            max_workers = int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() or 4) * 2))
+            file_entries.extend(self.manifest_file_manager.read_entries_parallel(manifest_files,
+                                                                                 self._filter_manifest_file,
+                                                                                 self._filter_manifest_entry,
+                                                                                 max_workers=max_workers))
         return file_entries
 
     @staticmethod

Reply via email to