This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 08ff9ba223f83511f244e6097e66ae74cb75d718 Author: JingsongLi <[email protected]> AuthorDate: Wed Oct 22 16:01:40 2025 +0800 [Python] max_workers at least 8 for manifest_file_manager.read_entries_parallel --- paimon-python/pypaimon/common/core_options.py | 2 +- .../pypaimon/manifest/manifest_file_manager.py | 37 ++++++++-------------- .../pypaimon/manifest/simple_stats_evolutions.py | 3 +- .../pypaimon/read/scanner/full_starting_scanner.py | 8 +++-- .../read/scanner/incremental_starting_scanner.py | 10 ++---- 5 files changed, 24 insertions(+), 36 deletions(-) diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py index be7eff98d2..aab339e269 100644 --- a/paimon-python/pypaimon/common/core_options.py +++ b/paimon-python/pypaimon/common/core_options.py @@ -33,7 +33,7 @@ class CoreOptions(str, Enum): BUCKET = "bucket" BUCKET_KEY = "bucket-key" WAREHOUSE = "warehouse" - MANIFEST_READ_THREADS = "manifest_read_threads" + SCAN_MANIFEST_PARALLELISM = "scan.manifest.parallelism" # File format options FILE_FORMAT = "file.format" FILE_FORMAT_ORC = "orc" diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 98180e7b41..b635f9e49c 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -17,7 +17,7 @@ ################################################################################ from concurrent.futures import ThreadPoolExecutor from io import BytesIO -from typing import List, Tuple, Set +from typing import List import fastavro @@ -44,34 +44,23 @@ class ManifestFileManager: self.primary_keys_fields = self.table.primary_keys_fields self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields - def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_file_filter=None, - manifest_entry_filter=None, drop_stats=True, max_workers=8) 
-> List[ManifestEntry]: - - def _process_single_manifest(manifest_file: ManifestFileMeta) -> Tuple[List[ManifestEntry], Set[tuple]]: - local_added = [] - local_deleted_keys = set() - if manifest_file_filter and not manifest_file_filter(manifest_file): - return local_added, local_deleted_keys - manifest_entries = self.read(manifest_file.file_name, manifest_entry_filter, drop_stats) - for entry in manifest_entries: - if entry.kind == 0: - local_added.append(entry) - else: - key = (tuple(entry.partition.values), entry.bucket, entry.file.file_name) - local_deleted_keys.add(key) - local_final_added = [ - entry for entry in local_added - if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in local_deleted_keys - ] - return local_final_added, local_deleted_keys + def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None, + drop_stats=True, max_workers=8) -> List[ManifestEntry]: + + def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]: + return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats) deleted_entry_keys = set() added_entries = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_results = executor.map(_process_single_manifest, manifest_files) - for added, deleted_keys in future_results: - added_entries.extend(added) - deleted_entry_keys.update(deleted_keys) + for entries in future_results: + for entry in entries: + if entry.kind == 0: + added_entries.append(entry) + else: + key = (tuple(entry.partition.values), entry.bucket, entry.file.file_name) + deleted_entry_keys.add(key) final_entries = [ entry for entry in added_entries diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py index 373e333cd9..0b99acab21 100644 --- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py +++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py @@ -53,7 
+53,8 @@ class SimpleStatsEvolutions: self.evolutions[data_schema_id] = evolution return evolution - def _create_index_cast_mapping(self, table_fields: List[DataField], + @staticmethod + def _create_index_cast_mapping(table_fields: List[DataField], data_fields: List[DataField]) -> Dict[str, Optional[List[int]]]: """ Create index and cast mapping between table fields and data fields. diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 1b21619309..cacf3ce343 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -96,10 +96,14 @@ class FullStartingScanner(StartingScanner): if not latest_snapshot: return [] manifest_files = self.manifest_list_manager.read_all(latest_snapshot) + return self.read_manifest_entries(manifest_files) - max_workers = int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() or 4) * 2)) + def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: + max_workers = int(self.table.options.get(CoreOptions.SCAN_MANIFEST_PARALLELISM, os.cpu_count() or 8)) + if max_workers < 8: + max_workers = 8 + manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)] return self.manifest_file_manager.read_entries_parallel(manifest_files, - self._filter_manifest_file, self._filter_manifest_entry, max_workers=max_workers) diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py index 0aa1274b7e..56c186188a 100644 --- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py @@ -15,10 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. """ -import os from typing import List, Optional -from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner @@ -43,13 +41,9 @@ class IncrementalStartingScanner(FullStartingScanner): file_entries = [] for snapshot in snapshots_in_range: - # Get manifest files for this snapshot manifest_files = self.manifest_list_manager.read_delta(snapshot) - max_workers = int(self.table.options.get(CoreOptions.MANIFEST_READ_THREADS, (os.cpu_count() or 4) * 2)) - file_entries.extend(self.manifest_file_manager.read_entries_parallel(manifest_files, - self._filter_manifest_file, - self._filter_manifest_entry, - max_workers=max_workers)) + entries = self.read_manifest_entries(manifest_files) + file_entries.extend(entries) return file_entries @staticmethod
