This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 08ff9ba223f83511f244e6097e66ae74cb75d718
Author: JingsongLi <[email protected]>
AuthorDate: Wed Oct 22 16:01:40 2025 +0800

    [Python] Use max_workers of at least 8 in
manifest_file_manager.read_entries_parallel
---
 paimon-python/pypaimon/common/core_options.py      |  2 +-
 .../pypaimon/manifest/manifest_file_manager.py     | 37 ++++++++--------------
 .../pypaimon/manifest/simple_stats_evolutions.py   |  3 +-
 .../pypaimon/read/scanner/full_starting_scanner.py |  8 +++--
 .../read/scanner/incremental_starting_scanner.py   | 10 ++----
 5 files changed, 24 insertions(+), 36 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py 
b/paimon-python/pypaimon/common/core_options.py
index be7eff98d2..aab339e269 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -33,7 +33,7 @@ class CoreOptions(str, Enum):
     BUCKET = "bucket"
     BUCKET_KEY = "bucket-key"
     WAREHOUSE = "warehouse"
-    MANIFEST_READ_THREADS = "manifest_read_threads"
+    SCAN_MANIFEST_PARALLELISM = "scan.manifest.parallelism"
     # File format options
     FILE_FORMAT = "file.format"
     FILE_FORMAT_ORC = "orc"
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py 
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 98180e7b41..b635f9e49c 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -17,7 +17,7 @@
 
################################################################################
 from concurrent.futures import ThreadPoolExecutor
 from io import BytesIO
-from typing import List, Tuple, Set
+from typing import List
 
 import fastavro
 
@@ -44,34 +44,23 @@ class ManifestFileManager:
         self.primary_keys_fields = self.table.primary_keys_fields
         self.trimmed_primary_keys_fields = 
self.table.trimmed_primary_keys_fields
 
-    def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], 
manifest_file_filter=None,
-                              manifest_entry_filter=None, drop_stats=True, 
max_workers=8) -> List[ManifestEntry]:
-
-        def _process_single_manifest(manifest_file: ManifestFileMeta) -> 
Tuple[List[ManifestEntry], Set[tuple]]:
-            local_added = []
-            local_deleted_keys = set()
-            if manifest_file_filter and not 
manifest_file_filter(manifest_file):
-                return local_added, local_deleted_keys
-            manifest_entries = self.read(manifest_file.file_name, 
manifest_entry_filter, drop_stats)
-            for entry in manifest_entries:
-                if entry.kind == 0:
-                    local_added.append(entry)
-                else:
-                    key = (tuple(entry.partition.values), entry.bucket, 
entry.file.file_name)
-                    local_deleted_keys.add(key)
-            local_final_added = [
-                entry for entry in local_added
-                if (tuple(entry.partition.values), entry.bucket, 
entry.file.file_name) not in local_deleted_keys
-            ]
-            return local_final_added, local_deleted_keys
+    def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], 
manifest_entry_filter=None,
+                              drop_stats=True, max_workers=8) -> 
List[ManifestEntry]:
+
+        def _process_single_manifest(manifest_file: ManifestFileMeta) -> 
List[ManifestEntry]:
+            return self.read(manifest_file.file_name, manifest_entry_filter, 
drop_stats)
 
         deleted_entry_keys = set()
         added_entries = []
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
             future_results = executor.map(_process_single_manifest, 
manifest_files)
-            for added, deleted_keys in future_results:
-                added_entries.extend(added)
-                deleted_entry_keys.update(deleted_keys)
+            for entries in future_results:
+                for entry in entries:
+                    if entry.kind == 0:
+                        added_entries.append(entry)
+                    else:
+                        key = (tuple(entry.partition.values), entry.bucket, 
entry.file.file_name)
+                        deleted_entry_keys.add(key)
 
         final_entries = [
             entry for entry in added_entries
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py 
b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
index 373e333cd9..0b99acab21 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -53,7 +53,8 @@ class SimpleStatsEvolutions:
         self.evolutions[data_schema_id] = evolution
         return evolution
 
-    def _create_index_cast_mapping(self, table_fields: List[DataField],
+    @staticmethod
+    def _create_index_cast_mapping(table_fields: List[DataField],
                                    data_fields: List[DataField]) -> Dict[str, 
Optional[List[int]]]:
         """
         Create index and cast mapping between table fields and data fields.
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 1b21619309..cacf3ce343 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -96,10 +96,14 @@ class FullStartingScanner(StartingScanner):
         if not latest_snapshot:
             return []
         manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
+        return self.read_manifest_entries(manifest_files)
 
-        max_workers = 
int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() 
or 4) * 2))
+    def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> 
List[ManifestEntry]:
+        max_workers = 
int(self.table.options.get(CoreOptions.SCAN_MANIFEST_PARALLELISM, 
os.cpu_count() or 8))
+        if max_workers < 8:
+            max_workers = 8
+        manifest_files = [entry for entry in manifest_files if 
self._filter_manifest_file(entry)]
         return self.manifest_file_manager.read_entries_parallel(manifest_files,
-                                                                
self._filter_manifest_file,
                                                                 
self._filter_manifest_entry,
                                                                 
max_workers=max_workers)
 
diff --git 
a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
index 0aa1274b7e..56c186188a 100644
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -15,10 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-import os
 from typing import List, Optional
 
-from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
@@ -43,13 +41,9 @@ class IncrementalStartingScanner(FullStartingScanner):
         file_entries = []
 
         for snapshot in snapshots_in_range:
-            # Get manifest files for this snapshot
             manifest_files = self.manifest_list_manager.read_delta(snapshot)
-            max_workers = 
int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() 
or 4) * 2))
-            
file_entries.extend(self.manifest_file_manager.read_entries_parallel(manifest_files,
-                                                                               
  self._filter_manifest_file,
-                                                                               
  self._filter_manifest_entry,
-                                                                               
  max_workers=max_workers))
+            entries = self.read_manifest_entries(manifest_files)
+            file_entries.extend(entries)
         return file_entries
 
     @staticmethod

Reply via email to