This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dff6fa2c10 [python] refactor FullStartingScanner to append and pk split generator (#7042)
dff6fa2c10 is described below
commit dff6fa2c1057e705d30c1e8bcb852709b75a3f17
Author: jerry <[email protected]>
AuthorDate: Wed Jan 14 23:25:56 2026 +0800
[python] refactor FullStartingScanner to append and pk split generator (#7042)
---
.../pypaimon/globalindex/indexed_split.py | 5 -
.../read/scanner/append_table_split_generator.py | 173 ++++++
.../read/scanner/data_evolution_split_generator.py | 396 +++++++++++++
.../pypaimon/read/scanner/full_starting_scanner.py | 637 ++-------------------
.../scanner/primary_key_table_split_generator.py | 126 ++++
.../pypaimon/read/scanner/split_generator.py | 246 ++++++++
paimon-python/pypaimon/read/sliced_split.py | 110 ++++
paimon-python/pypaimon/read/split.py | 4 +-
paimon-python/pypaimon/read/split_read.py | 17 +-
9 files changed, 1117 insertions(+), 597 deletions(-)
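
For context, the refactor replaces FullStartingScanner's per-table-type split methods with dedicated generator classes. Below is a minimal sketch of the resulting usage pattern, mirroring the selection logic added to plan() later in this diff; it is illustrative only, and table, file_entries and the size constants are assumed to exist in the caller:

    from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
    from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
    from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator

    def choose_split_generator(table, target_split_size, open_file_cost, deletion_files_map):
        # Mirrors the branch added to FullStartingScanner.plan() in this commit.
        if table.is_primary_key_table:
            return PrimaryKeyTableSplitGenerator(table, target_split_size, open_file_cost, deletion_files_map)
        if table.options.data_evolution_enabled():
            # DataEvolutionSplitGenerator additionally accepts row_ranges and score_getter
            # for global-index filtering.
            return DataEvolutionSplitGenerator(table, target_split_size, open_file_cost, deletion_files_map)
        return AppendTableSplitGenerator(table, target_split_size, open_file_cost, deletion_files_map)

    # generator = choose_split_generator(table, target_split_size, open_file_cost, {})
    # generator.with_shard(0, 4)   # or with_slice(start, end); the two are mutually exclusive
    # splits = generator.create_splits(file_entries)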
diff --git a/paimon-python/pypaimon/globalindex/indexed_split.py b/paimon-python/pypaimon/globalindex/indexed_split.py
index 6cdee70b41..0203923fc7 100644
--- a/paimon-python/pypaimon/globalindex/indexed_split.py
+++ b/paimon-python/pypaimon/globalindex/indexed_split.py
@@ -88,11 +88,6 @@ class IndexedSplit(SplitBase):
"""Delegate to data_split."""
return self._data_split.file_size
- @property
- def shard_file_idx_map(self):
- """Delegate to data_split."""
- return self._data_split.shard_file_idx_map
-
@property
def raw_convertible(self):
"""Delegate to data_split."""
diff --git a/paimon-python/pypaimon/read/scanner/append_table_split_generator.py b/paimon-python/pypaimon/read/scanner/append_table_split_generator.py
new file mode 100644
index 0000000000..775771eed1
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/append_table_split_generator.py
@@ -0,0 +1,173 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict
+from typing import List, Dict, Tuple
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
+from pypaimon.read.split import Split
+from pypaimon.read.sliced_split import SlicedSplit
+
+
+class AppendTableSplitGenerator(AbstractSplitGenerator):
+ """
+ Split generator for append-only tables.
+ """
+
+ def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+ plan_start_pos = 0
+ plan_end_pos = 0
+
+ if self.start_pos_of_this_subtask is not None:
+ # shard data range: [plan_start_pos, plan_end_pos)
+ partitioned_files, plan_start_pos, plan_end_pos = \
+ self.__filter_by_slice(
+ partitioned_files,
+ self.start_pos_of_this_subtask,
+ self.end_pos_of_this_subtask
+ )
+ elif self.idx_of_this_subtask is not None:
+            partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files)
+
+ def weight_func(f: DataFileMeta) -> int:
+ return max(f.file_size, self.open_file_cost)
+
+ splits = []
+ for key, file_entries_list in partitioned_files.items():
+ if not file_entries_list:
+ continue
+
+            data_files: List[DataFileMeta] = [e.file for e in file_entries_list]
+
+ packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
+ data_files, weight_func, self.target_split_size
+ )
+ splits += self._build_split_from_pack(
+ packed_files, file_entries_list, False
+ )
+
+        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+            splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
+
+ return splits
+
+    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
+ sliced_splits = []
+ file_end_pos = 0 # end row position of current file in all splits data
+
+ for split in splits:
+ shard_file_idx_map = self.__compute_split_file_idx_map(
+ plan_start_pos, plan_end_pos, split, file_end_pos
+ )
+ file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
+ del shard_file_idx_map[self.NEXT_POS_KEY]
+
+ if shard_file_idx_map:
+ sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
+ else:
+ sliced_splits.append(split)
+
+ return sliced_splits
+
+ @staticmethod
+ def __filter_by_slice(
+ partitioned_files: defaultdict,
+ start_pos: int,
+ end_pos: int
+ ) -> tuple:
+ plan_start_pos = 0
+ plan_end_pos = 0
+ entry_end_pos = 0 # end row position of current file in all data
+ splits_start_pos = 0
+ filtered_partitioned_files = defaultdict(list)
+
+        # Iterate through all file entries to find files that overlap with current shard range
+ for key, file_entries in partitioned_files.items():
+ filtered_entries = []
+ for entry in file_entries:
+                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row position after current file
+
+                # If current file is completely after shard range, stop iteration
+ if entry_begin_pos >= end_pos:
+ break
+ # If current file is completely before shard range, skip it
+ if entry_end_pos <= start_pos:
+ continue
+ if entry_begin_pos <= start_pos < entry_end_pos:
+ splits_start_pos = entry_begin_pos
+ plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record relative end position
+ if entry_begin_pos < end_pos <= entry_end_pos:
+ plan_end_pos = end_pos - splits_start_pos
+ # Add files that overlap with shard range to result
+ filtered_entries.append(entry)
+ if filtered_entries:
+ filtered_partitioned_files[key] = filtered_entries
+
+ return filtered_partitioned_files, plan_start_pos, plan_end_pos
+
+ def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
+ """
+        Filter file entries by shard. Only keep the files within the range, which means
+        that only the starting and ending files need to be further divided subsequently.
+ """
+ # Calculate total rows
+ total_row = sum(
+ entry.file.row_count
+ for file_entries in partitioned_files.values()
+ for entry in file_entries
+ )
+
+ # Calculate shard range using shared helper
+ start_pos, end_pos = self._compute_shard_range(total_row)
+
+ return self.__filter_by_slice(partitioned_files, start_pos, end_pos)
+
+ @staticmethod
+ def __compute_split_file_idx_map(
+ plan_start_pos: int,
+ plan_end_pos: int,
+ split: Split,
+ file_end_pos: int
+ ) -> Dict[str, Tuple[int, int]]:
+ """
+        Compute file index map for a split, determining which rows to read from each file.
+
+ """
+ shard_file_idx_map = {}
+
+ for file in split.files:
+            file_begin_pos = file_end_pos  # Starting row position of current file in all data
+            file_end_pos += file.row_count  # Update to row position after current file
+
+ # Use shared helper to compute file range
+ file_range = AppendTableSplitGenerator._compute_file_range(
+ plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
+ )
+
+ if file_range is not None:
+ shard_file_idx_map[file.file_name] = file_range
+
+        shard_file_idx_map[AppendTableSplitGenerator.NEXT_POS_KEY] = file_end_pos
+ return shard_file_idx_map
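
A small worked example of the slicing arithmetic used above. This is a standalone illustration of how a global row range [start_pos, end_pos) is mapped onto per-file (local_start, local_end) pairs; _compute_file_range itself lives in split_generator.py and is only partially shown in this email:

    # Three files of 100 rows each; a slice request for global rows [150, 230).
    files = [("f1", 100), ("f2", 100), ("f3", 100)]
    start_pos, end_pos = 150, 230

    begin = 0
    for name, rows in files:
        end = begin + rows
        if end <= start_pos or begin >= end_pos:
            print(name, "-> outside the slice")
        else:
            local = (max(start_pos, begin) - begin, min(end_pos, end) - begin)
            print(name, "->", local)   # row range to read within this file
        begin = end
    # f1 -> outside the slice
    # f2 -> (50, 100)
    # f3 -> (0, 30)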
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
new file mode 100644
index 0000000000..10847b12b2
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -0,0 +1,396 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict
+from typing import List, Optional, Dict, Tuple
+
+from pypaimon.globalindex.indexed_split import IndexedSplit
+from pypaimon.globalindex.range import Range
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
+from pypaimon.read.split import Split
+from pypaimon.read.sliced_split import SlicedSplit
+
+
+class DataEvolutionSplitGenerator(AbstractSplitGenerator):
+ """
+ Split generator for data evolution tables.
+ """
+
+ def __init__(
+ self,
+ table,
+ target_split_size: int,
+ open_file_cost: int,
+ deletion_files_map=None,
+ row_ranges: Optional[List] = None,
+ score_getter=None
+ ):
+        super().__init__(table, target_split_size, open_file_cost, deletion_files_map)
+ self.row_ranges = row_ranges
+ self.score_getter = score_getter
+
+ def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+ """
+ Create splits for data evolution tables.
+ """
+ def sort_key(manifest_entry: ManifestEntry) -> tuple:
+ first_row_id = (
+ manifest_entry.file.first_row_id
+ if manifest_entry.file.first_row_id is not None
+ else float('-inf')
+ )
+            is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0
+ max_seq = manifest_entry.file.max_sequence_number
+ return first_row_id, is_blob, -max_seq
+
+ sorted_entries = sorted(file_entries, key=sort_key)
+
+ partitioned_files = defaultdict(list)
+ for entry in sorted_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+ plan_start_pos = 0
+ plan_end_pos = 0
+
+ if self.start_pos_of_this_subtask is not None:
+ # shard data range: [plan_start_pos, plan_end_pos)
+ partitioned_files, plan_start_pos, plan_end_pos = \
+ self._filter_by_row_range(
+ partitioned_files,
+ self.start_pos_of_this_subtask,
+ self.end_pos_of_this_subtask
+ )
+ elif self.idx_of_this_subtask is not None:
+ # shard data range: [plan_start_pos, plan_end_pos)
+            partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files)
+
+ def weight_func(file_list: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+ splits = []
+ for key, sorted_entries_list in partitioned_files.items():
+ if not sorted_entries_list:
+ continue
+
+            data_files: List[DataFileMeta] = [e.file for e in sorted_entries_list]
+
+ # Split files by firstRowId for data evolution
+ split_by_row_id = self._split_by_row_id(data_files)
+
+ # Pack the split groups for optimal split sizes
+ packed_files = self._pack_for_ordered(
+ split_by_row_id, weight_func, self.target_split_size
+ )
+
+ # Flatten the packed files and build splits
+ flatten_packed_files: List[List[DataFileMeta]] = [
+ [file for sub_pack in pack for file in sub_pack]
+ for pack in packed_files
+ ]
+
+ splits += self._build_split_from_pack(
+ flatten_packed_files, sorted_entries_list, False
+ )
+
+        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+            splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
+
+ # Wrap splits with IndexedSplit if row_ranges is provided
+ if self.row_ranges:
+ splits = self._wrap_to_indexed_splits(splits)
+
+ return splits
+
+    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
+ """
+ Wrap splits with SlicedSplit to add file-level slicing information.
+ """
+ sliced_splits = []
+ file_end_pos = 0 # end row position of current file in all splits data
+
+ for split in splits:
+ # Compute file index map for both data and blob files
+ # Blob files share the same row position tracking as data files
+ shard_file_idx_map = self._compute_split_file_idx_map(
+ plan_start_pos, plan_end_pos, split, file_end_pos
+ )
+ file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
+ del shard_file_idx_map[self.NEXT_POS_KEY]
+
+ if shard_file_idx_map:
+ sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
+ else:
+ sliced_splits.append(split)
+
+ return sliced_splits
+
+ def _filter_by_row_range(
+ self,
+ partitioned_files: defaultdict,
+ start_pos: int,
+ end_pos: int
+ ) -> tuple:
+ """
+ Filter file entries by row range for data evolution tables.
+ """
+ plan_start_pos = 0
+ plan_end_pos = 0
+ entry_end_pos = 0 # end row position of current file in all data
+ splits_start_pos = 0
+ filtered_partitioned_files = defaultdict(list)
+
+        # Iterate through all file entries to find files that overlap with current shard range
+ for key, file_entries in partitioned_files.items():
+ filtered_entries = []
+            blob_added = False  # If it is true, all blobs corresponding to this data file are added
+ for entry in file_entries:
+ if self._is_blob_file(entry.file.file_name):
+ if blob_added:
+ filtered_entries.append(entry)
+ continue
+ blob_added = False
+                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row position after current file
+
+                # If current file is completely after shard range, stop iteration
+ if entry_begin_pos >= end_pos:
+ break
+ # If current file is completely before shard range, skip it
+ if entry_end_pos <= start_pos:
+ continue
+ if entry_begin_pos <= start_pos < entry_end_pos:
+ splits_start_pos = entry_begin_pos
+ plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record relative end position
+ if entry_begin_pos < end_pos <= entry_end_pos:
+ plan_end_pos = end_pos - splits_start_pos
+ # Add files that overlap with shard range to result
+ filtered_entries.append(entry)
+ blob_added = True
+ if filtered_entries:
+ filtered_partitioned_files[key] = filtered_entries
+
+ return filtered_partitioned_files, plan_start_pos, plan_end_pos
+
+ def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
+ """
+ Filter file entries by shard for data evolution tables.
+ """
+ # Calculate total rows (excluding blob files)
+ total_row = sum(
+ entry.file.row_count
+ for file_entries in partitioned_files.values()
+ for entry in file_entries
+ if not self._is_blob_file(entry.file.file_name)
+ )
+
+ # Calculate shard range using shared helper
+ start_pos, end_pos = self._compute_shard_range(total_row)
+
+ return self._filter_by_row_range(partitioned_files, start_pos, end_pos)
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+ """
+ Split files by row ID for data evolution tables.
+ """
+ split_by_row_id = []
+
+        # Filter blob files to only include those within the row ID range of non-blob files
+ sorted_files = self._filter_blob(files)
+
+ # Split files by firstRowId
+ last_row_id = -1
+ check_row_id_start = 0
+ current_split = []
+
+ for file in sorted_files:
+ first_row_id = file.first_row_id
+ if first_row_id is None:
+ # Files without firstRowId are treated as individual splits
+ split_by_row_id.append([file])
+ continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ # Validate that files don't overlap
+ if first_row_id < check_row_id_start:
+ file_names = [f.file_name for f in sorted_files]
+ raise ValueError(
+                        f"There are overlapping files in the split: {file_names}, "
+ f"the wrong file is: {file.file_name}"
+ )
+
+ current_split = []
+ last_row_id = first_row_id
+ check_row_id_start = first_row_id + file.row_count
+
+ current_split.append(file)
+
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ return split_by_row_id
+
+ def _compute_split_file_idx_map(
+ self,
+ plan_start_pos: int,
+ plan_end_pos: int,
+ split: Split,
+ file_end_pos: int
+ ) -> Dict[str, Tuple[int, int]]:
+ """
+        Compute file index map for a split, determining which rows to read from each file.
+        For data files, the range is calculated based on the file's position in the cumulative row space.
+        For blob files (which may be rolled), the range is calculated based on each file's first_row_id.
+ """
+ shard_file_idx_map = {}
+
+        # Find the first non-blob file to determine the row range for this split
+ data_file = None
+ for file in split.files:
+ if not self._is_blob_file(file.file_name):
+ data_file = file
+ break
+
+ if data_file is None:
+ # No data file, skip this split
+ shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+ return shard_file_idx_map
+
+ # Calculate the row range based on the data file position
+ file_begin_pos = file_end_pos
+ file_end_pos += data_file.row_count
+        data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0
+
+        # Determine the row range for the data file in this split using shared helper
+ data_file_range = self._compute_file_range(
+ plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count
+ )
+
+ # Apply ranges to each file in the split
+ for file in split.files:
+ if self._is_blob_file(file.file_name):
+ # For blob files, calculate range based on their first_row_id
+ if data_file_range is None:
+                    # Data file is completely within shard, so its blob files are also fully included
+ continue
+ elif data_file_range == (-1, -1):
+                    # Data file is completely outside shard, blob files should be skipped
+ shard_file_idx_map[file.file_name] = (-1, -1)
+ else:
+                    # Calculate blob file's position relative to data file's first_row_id
+                    blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
+ # Blob's position relative to data file start
+ blob_rel_start = blob_first_row_id - data_file_first_row_id
+ blob_rel_end = blob_rel_start + file.row_count
+
+ # Shard range relative to data file start
+ shard_start = data_file_range[0]
+ shard_end = data_file_range[1]
+
+ # Intersect blob's range with shard range
+ intersect_start = max(blob_rel_start, shard_start)
+ intersect_end = min(blob_rel_end, shard_end)
+
+ if intersect_start >= intersect_end:
+ # Blob file is completely outside shard range
+ shard_file_idx_map[file.file_name] = (-1, -1)
+                    elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+                        # Blob file is completely within shard range, no slicing needed
+ pass
+ else:
+ # Convert to file-local indices
+ local_start = intersect_start - blob_rel_start
+ local_end = intersect_end - blob_rel_start
+                        shard_file_idx_map[file.file_name] = (local_start, local_end)
+ else:
+ # Data file
+ if data_file_range is not None:
+ shard_file_idx_map[file.file_name] = data_file_range
+
+ shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+ return shard_file_idx_map
+
+ def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
+ """
+ Wrap splits with IndexedSplit for row range filtering.
+ """
+ indexed_splits = []
+ for split in splits:
+ # Calculate file ranges for this split
+ file_ranges = []
+ for file in split.files:
+ first_row_id = file.first_row_id
+ if first_row_id is not None:
+ file_ranges.append(Range(
+ first_row_id,
+ first_row_id + file.row_count - 1
+ ))
+
+ if not file_ranges:
+ # No row IDs, keep original split
+ indexed_splits.append(split)
+ continue
+
+ # Merge file ranges
+ file_ranges = Range.merge_sorted_as_possible(file_ranges)
+
+ # Intersect with row_ranges from global index
+ expected = Range.and_(file_ranges, self.row_ranges)
+
+ if not expected:
+ # No intersection, skip this split
+ continue
+
+ # Create scores array if score_getter is provided
+ scores = None
+ if self.score_getter is not None:
+ scores = []
+ for r in expected:
+ for row_id in range(r.from_, r.to + 1):
+ score = self.score_getter(row_id)
+ scores.append(score if score is not None else 0.0)
+
+ indexed_splits.append(IndexedSplit(split, expected, scores))
+
+ return indexed_splits
+
+ @staticmethod
+ def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
+ """
+        Filter blob files to only include those within row ID range of non-blob files.
+ """
+ result = []
+ row_id_start = -1
+ row_id_end = -1
+
+ for file in files:
+ if not DataEvolutionSplitGenerator._is_blob_file(file.file_name):
+ if file.first_row_id is not None:
+ row_id_start = file.first_row_id
+ row_id_end = file.first_row_id + file.row_count
+ result.append(file)
+ else:
+ if file.first_row_id is not None and row_id_start != -1:
+ if row_id_start <= file.first_row_id < row_id_end:
+ result.append(file)
+
+ return result
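
To make the first_row_id grouping in _split_by_row_id concrete, here is a hypothetical layout; file names and row counts are invented for illustration:

    # (file_name, first_row_id, row_count), already ordered as produced by sort_key():
    # data files first for each first_row_id, blobs rolled from the same row range after them.
    entries = [
        ("data-0.parquet", 0, 1000),
        ("data-0.blob", 0, 1000),
        ("data-1.parquet", 1000, 500),
        ("data-1.blob", 1000, 500),
    ]
    # _split_by_row_id yields two groups, each a data file plus its blob files:
    #   [["data-0.parquet", "data-0.blob"], ["data-1.parquet", "data-1.blob"]]
    # A new group starts whenever a non-blob file's first_row_id differs from the previous one;
    # a non-blob first_row_id that falls before the previous group's end raises ValueError.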
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 3d07d0b6cd..e4fdb5d16b 100755
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -16,29 +16,26 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import os
-from collections import defaultdict
-from typing import Callable, List, Optional, Dict, Set
+from typing import List, Optional, Dict, Set
from pypaimon.common.predicate import Predicate
-from pypaimon.globalindex import VectorSearchGlobalIndexResult, Range
-from pypaimon.globalindex.indexed_split import IndexedSplit
+from pypaimon.globalindex import VectorSearchGlobalIndexResult
from pypaimon.table.source.deletion_file import DeletionFile
-from pypaimon.table.row.generic_row import GenericRow
from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
-from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
-from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
+from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
+from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
+from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.read.split import Split
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
-from pypaimon.common.options.core_options import MergeEngine
class FullStartingScanner(StartingScanner):
@@ -75,7 +72,7 @@ class FullStartingScanner(StartingScanner):
self.start_pos_of_this_subtask = None
self.end_pos_of_this_subtask = None
-        self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
+        self.only_read_real_buckets = options.bucket() == BucketMode.POSTPONE_BUCKET.value
self.data_evolution = options.data_evolution_enabled()
self.deletion_vectors_enabled = options.deletion_vectors_enabled()
@@ -91,17 +88,6 @@ class FullStartingScanner(StartingScanner):
file_entries = self.plan_files()
if not file_entries:
return Plan([])
-
- # Evaluate global index if enabled (for data evolution tables)
- row_ranges = None
- score_getter = None
- if self.data_evolution:
- global_index_result = self._eval_global_index()
- if global_index_result is not None:
- row_ranges = global_index_result.results().to_range_list()
-            if isinstance(global_index_result, VectorSearchGlobalIndexResult):
- score_getter = global_index_result.score_getter()
-
# Get deletion files map if deletion vectors are enabled.
# {partition-bucket -> {filename -> DeletionFile}}
deletion_files_map: dict[tuple, dict[str, DeletionFile]] = {}
@@ -113,14 +99,46 @@ class FullStartingScanner(StartingScanner):
buckets.add((tuple(entry.partition.values), entry.bucket))
deletion_files_map = self._scan_dv_index(latest_snapshot, buckets)
+ # Create appropriate split generator based on table type
if self.table.is_primary_key_table:
-            splits = self._create_primary_key_splits(file_entries, deletion_files_map)
+ split_generator = PrimaryKeyTableSplitGenerator(
+ self.table,
+ self.target_split_size,
+ self.open_file_cost,
+ deletion_files_map
+ )
elif self.data_evolution:
- splits = self._create_data_evolution_splits(
- file_entries, deletion_files_map, row_ranges, score_getter
+ global_index_result = self._eval_global_index()
+ row_ranges = None
+ score_getter = None
+ if global_index_result is not None:
+ row_ranges = global_index_result.results().to_range_list()
+                if isinstance(global_index_result, VectorSearchGlobalIndexResult):
+ score_getter = global_index_result.score_getter()
+ split_generator = DataEvolutionSplitGenerator(
+ self.table,
+ self.target_split_size,
+ self.open_file_cost,
+ deletion_files_map,
+ row_ranges,
+ score_getter
)
else:
-            splits = self._create_append_only_splits(file_entries, deletion_files_map)
+ split_generator = AppendTableSplitGenerator(
+ self.table,
+ self.target_split_size,
+ self.open_file_cost,
+ deletion_files_map
+ )
+
+ # Configure sharding if needed
+ if self.idx_of_this_subtask is not None:
+            split_generator.with_shard(self.idx_of_this_subtask, self.number_of_para_subtasks)
+        elif self.start_pos_of_this_subtask is not None:
+            split_generator.with_slice(self.start_pos_of_this_subtask, self.end_pos_of_this_subtask)
+
+ # Generate splits
+ splits = split_generator.create_splits(file_entries)
splits = self._apply_push_down_limit(splits)
return Plan(splits)
@@ -196,218 +214,32 @@ class FullStartingScanner(StartingScanner):
return result
    def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
-        max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
-        if max_workers < 8:
-            max_workers = 8
+        max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
        manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
-        return self.manifest_file_manager.read_entries_parallel(manifest_files,
-                                                                 self._filter_manifest_entry,
-                                                                 max_workers=max_workers)
+ return self.manifest_file_manager.read_entries_parallel(
+ manifest_files,
+ self._filter_manifest_entry,
+ max_workers=max_workers
+ )
-    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
+    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int) -> 'FullStartingScanner':
if idx_of_this_subtask >= number_of_para_subtasks:
-            raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
+            raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks")
if self.start_pos_of_this_subtask is not None:
            raise Exception("with_shard and with_slice cannot be used simultaneously")
self.idx_of_this_subtask = idx_of_this_subtask
self.number_of_para_subtasks = number_of_para_subtasks
return self
- def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
+    def with_slice(self, start_pos: int, end_pos: int) -> 'FullStartingScanner':
if start_pos >= end_pos:
- raise Exception("start_pos must be less than end_pos")
+ raise ValueError("start_pos must be less than end_pos")
if self.idx_of_this_subtask is not None:
            raise Exception("with_slice and with_shard cannot be used simultaneously")
self.start_pos_of_this_subtask = start_pos
self.end_pos_of_this_subtask = end_pos
return self
- @staticmethod
- def _append_only_filter_by_slice(partitioned_files: defaultdict,
- start_pos: int,
- end_pos: int) -> (defaultdict, int, int):
- plan_start_pos = 0
- plan_end_pos = 0
- entry_end_pos = 0 # end row position of current file in all data
- splits_start_pos = 0
- filtered_partitioned_files = defaultdict(list)
-        # Iterate through all file entries to find files that overlap with current shard range
- for key, file_entries in partitioned_files.items():
- filtered_entries = []
- for entry in file_entries:
-                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
-                entry_end_pos += entry.file.row_count  # Update to row position after current file
-
-                # If current file is completely after shard range, stop iteration
- if entry_begin_pos >= end_pos:
- break
- # If current file is completely before shard range, skip it
- if entry_end_pos <= start_pos:
- continue
- if entry_begin_pos <= start_pos < entry_end_pos:
- splits_start_pos = entry_begin_pos
- plan_start_pos = start_pos - entry_begin_pos
-                # If shard end position is within current file, record relative end position
- if entry_begin_pos < end_pos <= entry_end_pos:
- plan_end_pos = end_pos - splits_start_pos
- # Add files that overlap with shard range to result
- filtered_entries.append(entry)
- if filtered_entries:
- filtered_partitioned_files[key] = filtered_entries
-
- return filtered_partitioned_files, plan_start_pos, plan_end_pos
-
-    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
- """
-        Filter file entries by shard. Only keep the files within the range, which means
-        that only the starting and ending files need to be further divided subsequently
- """
- total_row = 0
- # Sort by file creation time to ensure consistent sharding
- for key, file_entries in partitioned_files.items():
- for entry in file_entries:
- total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
- base_rows_per_shard = total_row // self.number_of_para_subtasks
- remainder = total_row % self.number_of_para_subtasks
-
- # Each of the first 'remainder' shards gets one extra row
- if self.idx_of_this_subtask < remainder:
- num_row = base_rows_per_shard + 1
- start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
- else:
- num_row = base_rows_per_shard
- start_pos = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
- end_pos = start_pos + num_row
-
-        return self._append_only_filter_by_slice(partitioned_files, start_pos, end_pos)
-
-    def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
-                                            start_pos: int,
-                                            end_pos: int) -> (defaultdict, int, int):
- plan_start_pos = 0
- plan_end_pos = 0
- entry_end_pos = 0 # end row position of current file in all data
- splits_start_pos = 0
- filtered_partitioned_files = defaultdict(list)
-        # Iterate through all file entries to find files that overlap with current shard range
- for key, file_entries in partitioned_files.items():
- filtered_entries = []
-            blob_added = False  # If it is true, all blobs corresponding to this data file are added
- for entry in file_entries:
- if self._is_blob_file(entry.file.file_name):
- if blob_added:
- filtered_entries.append(entry)
- continue
- blob_added = False
-                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
-                entry_end_pos += entry.file.row_count  # Update to row position after current file
-
-                # If current file is completely after shard range, stop iteration
- if entry_begin_pos >= end_pos:
- break
- # If current file is completely before shard range, skip it
- if entry_end_pos <= start_pos:
- continue
- if entry_begin_pos <= start_pos < entry_end_pos:
- splits_start_pos = entry_begin_pos
- plan_start_pos = start_pos - entry_begin_pos
-                # If shard end position is within current file, record relative end position
- if entry_begin_pos < end_pos <= entry_end_pos:
- plan_end_pos = end_pos - splits_start_pos
- # Add files that overlap with shard range to result
- filtered_entries.append(entry)
- blob_added = True
- if filtered_entries:
- filtered_partitioned_files[key] = filtered_entries
-
- return filtered_partitioned_files, plan_start_pos, plan_end_pos
-
-    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
- total_row = 0
- for key, file_entries in partitioned_files.items():
- for entry in file_entries:
- if not self._is_blob_file(entry.file.file_name):
- total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
- base_rows_per_shard = total_row // self.number_of_para_subtasks
- remainder = total_row % self.number_of_para_subtasks
-
- # Each of the first 'remainder' shards gets one extra row
- if self.idx_of_this_subtask < remainder:
- num_row = base_rows_per_shard + 1
- start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
- else:
- num_row = base_rows_per_shard
- start_pos = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
- end_pos = start_pos + num_row
-        return self._data_evolution_filter_by_row_range(partitioned_files, start_pos, end_pos)
-
-    def _compute_split_start_end_pos(self, splits: List[Split], plan_start_pos, plan_end_pos):
- """
- Find files that needs to be divided for each split
- :param splits: splits
- :param plan_start_pos: plan begin row in all splits data
- :param plan_end_pos: plan end row in all splits data
- """
- file_end_pos = 0 # end row position of current file in all splits data
-
- for split in splits:
- cur_split_end_pos = file_end_pos
- # Compute split_file_idx_map for data files
-            file_end_pos = self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
-                                                            split, cur_split_end_pos, False)
- # Compute split_file_idx_map for blob files
- if self.data_evolution:
- self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
-                                                 split, cur_split_end_pos, True)
-
-    def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: Split,
-                                    file_end_pos: int, is_blob: bool = False):
- """
-        Traverse all the files in current split, find the starting shard and ending shard files,
-        and add them to shard_file_idx_map;
-        - for data file, only two data files will be divided in all splits.
-        - for blob file, perhaps there will be some unnecessary files in addition to two files(start and end).
-        Add them to shard_file_idx_map as well, because they need to be removed later.
- """
- row_cnt = 0
- for file in split.files:
- if not is_blob and self._is_blob_file(file.file_name):
- continue
- if is_blob and not self._is_blob_file(file.file_name):
- continue
- row_cnt += file.row_count
-            file_begin_pos = file_end_pos  # Starting row position of current file in all data
-            file_end_pos += file.row_count  # Update to row position after current file
- if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
- split.shard_file_idx_map[file.file_name] = (
-                    plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos)
-            # If shard start position is within current file, record actual start position and relative offset
-            elif file_begin_pos < plan_start_pos < file_end_pos:
-                split.shard_file_idx_map[file.file_name] = (plan_start_pos - file_begin_pos, file.row_count)
-            # If shard end position is within current file, record relative end position
-            elif file_begin_pos < plan_end_pos < file_end_pos:
-                split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos)
-            elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
- split.shard_file_idx_map[file.file_name] = (-1, -1)
- return file_end_pos
-
-    def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
- filtered_entries = []
- for entry in file_entries:
-            if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask:
- filtered_entries.append(entry)
- return filtered_entries
-
def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
if self.limit is None:
return splits
@@ -525,368 +357,3 @@ class FullStartingScanner(StartingScanner):
deletion_files[data_file_name] = deletion_file
return deletion_files
-
- def _get_deletion_files_for_split(self, data_files: List[DataFileMeta],
- deletion_files_map: dict,
- partition: GenericRow,
-                                      bucket: int) -> Optional[List[DeletionFile]]:
- """
- Get deletion files for the given data files in a split.
- """
- if not deletion_files_map:
- return None
-
- partition_key = (tuple(partition.values), bucket)
- file_deletion_map = deletion_files_map.get(partition_key, {})
-
- if not file_deletion_map:
- return None
-
- deletion_files = []
- for data_file in data_files:
- deletion_file = file_deletion_map.get(data_file.file_name)
- if deletion_file:
- deletion_files.append(deletion_file)
- else:
- deletion_files.append(None)
-
-        return deletion_files if any(df is not None for df in deletion_files) else None
-
- def _create_append_only_splits(
-            self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
- partitioned_files = defaultdict(list)
- for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
-
- if self.start_pos_of_this_subtask is not None:
- # shard data range: [plan_start_pos, plan_end_pos)
- partitioned_files, plan_start_pos, plan_end_pos = \
- self._append_only_filter_by_slice(partitioned_files,
-                                                  self.start_pos_of_this_subtask,
-                                                  self.end_pos_of_this_subtask)
- elif self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_pos, plan_end_pos = self._append_only_filter_by_shard(partitioned_files)
-
- def weight_func(f: DataFileMeta) -> int:
- return max(f.file_size, self.open_file_cost)
-
- splits = []
- for key, file_entries in partitioned_files.items():
- if not file_entries:
- return []
-
- data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
-            packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
-                                                                            self.target_split_size)
-            splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
-        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
-            # When files are combined into splits, it is necessary to find files that needs to be divided for each split
-            self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
- return splits
-
- def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
- # null to true to be compatible with old version
- if data_file_meta.delete_row_count is None:
- return True
- return data_file_meta.delete_row_count == 0
-
- def _create_primary_key_splits(
-            self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
- if self.idx_of_this_subtask is not None:
- file_entries = self._primary_key_filter_by_shard(file_entries)
- partitioned_files = defaultdict(list)
- for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
-
- def single_weight_func(f: DataFileMeta) -> int:
- return max(f.file_size, self.open_file_cost)
-
- def weight_func(fl: List[DataFileMeta]) -> int:
- return max(sum(f.file_size for f in fl), self.open_file_cost)
-
- merge_engine = self.table.options.merge_engine()
- merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
-
- splits = []
- for key, file_entries in partitioned_files.items():
- if not file_entries:
- continue
-
- data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
- raw_convertible = all(
- f.level != 0 and self._without_delete_row(f)
- for f in data_files
- )
-
- levels = {f.level for f in data_files}
- one_level = len(levels) == 1
-
- use_optimized_path = raw_convertible and (
-                self.deletion_vectors_enabled or merge_engine_first_row or one_level)
- if use_optimized_path:
-                packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
- data_files, single_weight_func, self.target_split_size
- )
- splits += self._build_split_from_pack(
- packed_files, file_entries, True, deletion_files_map,
- use_optimized_path)
- else:
-                partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
- sections: List[List[DataFileMeta]] = [
- [file for s in sl for file in s.files]
- for sl in partition_sort_runs
- ]
-
-                packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
-                                                                                      self.target_split_size)
-
- flatten_packed_files: List[List[DataFileMeta]] = [
- [file for sub_pack in pack for file in sub_pack]
- for pack in packed_files
- ]
- splits += self._build_split_from_pack(
- flatten_packed_files, file_entries, True,
- deletion_files_map, False)
- return splits
-
- def _create_data_evolution_splits(
-            self, file_entries: List[ManifestEntry], deletion_files_map: dict = None,
- row_ranges: List = None, score_getter=None) -> List['Split']:
- def sort_key(manifest_entry: ManifestEntry) -> tuple:
-            first_row_id = manifest_entry.file.first_row_id if manifest_entry.file.first_row_id is not None else float(
- '-inf')
-            is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0
-            # For files with same firstRowId, sort by maxSequenceNumber in descending order
- # (larger sequence number means more recent data)
- max_seq = manifest_entry.file.max_sequence_number
- return first_row_id, is_blob, -max_seq
-
- sorted_entries = sorted(file_entries, key=sort_key)
-
- partitioned_files = defaultdict(list)
- for entry in sorted_entries:
-            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
-
- if self.start_pos_of_this_subtask is not None:
- # shard data range: [plan_start_pos, plan_end_pos)
- partitioned_files, plan_start_pos, plan_end_pos = \
-                self._data_evolution_filter_by_row_range(partitioned_files,
-                                                         self.start_pos_of_this_subtask,
-                                                         self.end_pos_of_this_subtask)
- elif self.idx_of_this_subtask is not None:
- # shard data range: [plan_start_pos, plan_end_pos)
-            partitioned_files, plan_start_pos, plan_end_pos = self._data_evolution_filter_by_shard(partitioned_files)
-
- def weight_func(file_list: List[DataFileMeta]) -> int:
-            return max(sum(f.file_size for f in file_list), self.open_file_cost)
-
- splits = []
- for key, sorted_entries in partitioned_files.items():
- if not sorted_entries:
- continue
-
- data_files: List[DataFileMeta] = [e.file for e in sorted_entries]
-
- # Split files by firstRowId for data evolution
- split_by_row_id = self._split_by_row_id(data_files)
-
- # Pack the split groups for optimal split sizes
-            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
-                                                                                  self.target_split_size)
-
- # Flatten the packed files and build splits
- flatten_packed_files: List[List[DataFileMeta]] = [
- [file for sub_pack in pack for file in sub_pack]
- for pack in packed_files
- ]
-
-            splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
-
-        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
-            self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
-
- # Wrap splits with IndexedSplit if row_ranges is provided
- if row_ranges:
-            splits = self._wrap_to_indexed_splits(splits, row_ranges, score_getter)
-
- return splits
-
- def _wrap_to_indexed_splits(
- self,
- splits: List['Split'],
- row_ranges: List,
- score_getter
- ) -> List['Split']:
-
- indexed_splits = []
- for split in splits:
- # Calculate file ranges for this split
- file_ranges = []
- for file in split.files:
- first_row_id = file.first_row_id
- if first_row_id is not None:
- file_ranges.append(Range(
- first_row_id,
- first_row_id + file.row_count - 1
- ))
-
- if not file_ranges:
- # No row IDs, keep original split
- indexed_splits.append(split)
- continue
-
- # Merge file ranges
- file_ranges = Range.merge_sorted_as_possible(file_ranges)
-
- # Intersect with row_ranges from global index
- expected = Range.and_(file_ranges, row_ranges)
-
- if not expected:
- # No intersection, skip this split
- continue
-
- # Create scores array if score_getter is provided
- scores = None
- if score_getter is not None:
- scores = []
- for r in expected:
- for row_id in range(r.from_, r.to + 1):
- score = score_getter(row_id)
- scores.append(score if score is not None else 0.0)
-
- indexed_splits.append(IndexedSplit(split, expected, scores))
-
- return indexed_splits
-
-    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
- split_by_row_id = []
-
-        # Filter blob files to only include those within the row ID range of non-blob files
- sorted_files = self._filter_blob(files)
-
- # Split files by firstRowId
- last_row_id = -1
- check_row_id_start = 0
- current_split = []
-
- for file in sorted_files:
- first_row_id = file.first_row_id
- if first_row_id is None:
- # Files without firstRowId are treated as individual splits
- split_by_row_id.append([file])
- continue
-
-            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
- if current_split:
- split_by_row_id.append(current_split)
-
- # Validate that files don't overlap
- if first_row_id < check_row_id_start:
- file_names = [f.file_name for f in sorted_files]
- raise ValueError(
-                        f"There are overlapping files in the split: {file_names}, "
- f"the wrong file is: {file.file_name}"
- )
-
- current_split = []
- last_row_id = first_row_id
- check_row_id_start = first_row_id + file.row_count
-
- current_split.append(file)
-
- if current_split:
- split_by_row_id.append(current_split)
-
- return split_by_row_id
-
-    def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool,
-                               deletion_files_map: dict = None, use_optimized_path: bool = False) -> List['Split']:
- splits = []
- for file_group in packed_files:
- if use_optimized_path:
- raw_convertible = True
- elif for_primary_key_split:
-                raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
- else:
- raw_convertible = True
-
- file_paths = []
- total_file_size = 0
- total_record_count = 0
-
- for data_file in file_group:
-                data_file.set_file_path(self.table.table_path, file_entries[0].partition,
-                                        file_entries[0].bucket)
- file_paths.append(data_file.file_path)
- total_file_size += data_file.file_size
- total_record_count += data_file.row_count
-
- if file_paths:
- # Get deletion files for this split
- data_deletion_files = None
- if deletion_files_map:
- data_deletion_files = self._get_deletion_files_for_split(
- file_group,
- deletion_files_map,
- file_entries[0].partition,
- file_entries[0].bucket
- )
-
- split = Split(
- files=file_group,
- partition=file_entries[0].partition,
- bucket=file_entries[0].bucket,
- file_paths=file_paths,
- row_count=total_record_count,
- file_size=total_file_size,
- raw_convertible=raw_convertible,
- data_deletion_files=data_deletion_files
- )
- splits.append(split)
- return splits
-
- @staticmethod
-    def _pack_for_ordered(items: List, weight_func: Callable, target_weight: int) -> List[List]:
- packed = []
- bin_items = []
- bin_weight = 0
-
- for item in items:
- weight = weight_func(item)
- if bin_weight + weight > target_weight and len(bin_items) > 0:
- packed.append(list(bin_items))
- bin_items.clear()
- bin_weight = 0
-
- bin_weight += weight
- bin_items.append(item)
-
- if len(bin_items) > 0:
- packed.append(bin_items)
-
- return packed
-
- @staticmethod
- def _is_blob_file(file_name: str) -> bool:
- return file_name.endswith('.blob')
-
- @staticmethod
- def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
- result = []
- row_id_start = -1
- row_id_end = -1
-
- for file in files:
- if not FullStartingScanner._is_blob_file(file.file_name):
- if file.first_row_id is not None:
- row_id_start = file.first_row_id
- row_id_end = file.first_row_id + file.row_count
- result.append(file)
- else:
- if file.first_row_id is not None and row_id_start != -1:
- if row_id_start <= file.first_row_id < row_id_end:
- result.append(file)
-
- return result
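
The packing helper removed here is re-added unchanged on AbstractSplitGenerator (see split_generator.py below). A quick standalone run of the same greedy, order-preserving algorithm, with weights and target chosen arbitrarily for illustration:

    def pack_for_ordered(items, weight_func, target_weight):
        # Items accumulate into a bin until the next item would exceed the target, then a new bin starts.
        packed, bin_items, bin_weight = [], [], 0
        for item in items:
            weight = weight_func(item)
            if bin_weight + weight > target_weight and bin_items:
                packed.append(list(bin_items))
                bin_items.clear()
                bin_weight = 0
            bin_weight += weight
            bin_items.append(item)
        if bin_items:
            packed.append(bin_items)
        return packed

    print(pack_for_ordered([60, 50, 30, 80, 10], lambda w: w, 100))
    # [[60], [50, 30], [80, 10]]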
diff --git a/paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py b/paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py
new file mode 100644
index 0000000000..5955b6aa94
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py
@@ -0,0 +1,126 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict
+from typing import List
+
+from pypaimon.common.options.core_options import MergeEngine
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.interval_partition import IntervalPartition
+from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
+from pypaimon.read.split import Split
+
+
+class PrimaryKeyTableSplitGenerator(AbstractSplitGenerator):
+ """
+ Split generator for primary key tables.
+ """
+
+ def __init__(
+ self,
+ table,
+ target_split_size: int,
+ open_file_cost: int,
+ deletion_files_map=None
+ ):
+        super().__init__(table, target_split_size, open_file_cost, deletion_files_map)
+        self.deletion_vectors_enabled = table.options.deletion_vectors_enabled()
+ self.merge_engine = table.options.merge_engine()
+
+ def with_slice(self, start_pos: int, end_pos: int):
+ """Primary key tables do not support slice-based sharding."""
+ raise NotImplementedError(
+ "Primary key tables do not support with_slice(). "
+ "Use with_shard() for bucket-based parallel processing instead."
+ )
+
+ def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+ """
+ Create splits for primary key tables.
+ """
+ if self.idx_of_this_subtask is not None:
+ file_entries = self._filter_by_shard(file_entries)
+
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+ def single_weight_func(f: DataFileMeta) -> int:
+ return max(f.file_size, self.open_file_cost)
+
+ def weight_func(fl: List[DataFileMeta]) -> int:
+ return max(sum(f.file_size for f in fl), self.open_file_cost)
+
+ merge_engine_first_row = self.merge_engine == MergeEngine.FIRST_ROW
+
+ splits = []
+ for key, file_entries_list in partitioned_files.items():
+ if not file_entries_list:
+ continue
+
+            data_files: List[DataFileMeta] = [e.file for e in file_entries_list]
+
+ raw_convertible = all(
+ f.level != 0 and self._without_delete_row(f)
+ for f in data_files
+ )
+
+ levels = {f.level for f in data_files}
+ one_level = len(levels) == 1
+
+ use_optimized_path = raw_convertible and (
+                self.deletion_vectors_enabled or merge_engine_first_row or one_level
+ )
+
+ if use_optimized_path:
+                packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
+ data_files, single_weight_func, self.target_split_size
+ )
+ splits += self._build_split_from_pack(
+ packed_files, file_entries_list, True, use_optimized_path
+ )
+ else:
+ partition_sort_runs = IntervalPartition(data_files).partition()
+ sections: List[List[DataFileMeta]] = [
+ [file for s in sl for file in s.files]
+ for sl in partition_sort_runs
+ ]
+
+ packed_files = self._pack_for_ordered(
+ sections, weight_func, self.target_split_size
+ )
+
+ flatten_packed_files: List[List[DataFileMeta]] = [
+ [file for sub_pack in pack for file in sub_pack]
+ for pack in packed_files
+ ]
+ splits += self._build_split_from_pack(
+ flatten_packed_files, file_entries_list, True, False
+ )
+
+ return splits
+
+    def _filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
+ """
+ Filter file entries by bucket-based sharding.
+ """
+ filtered_entries = []
+ for entry in file_entries:
+            if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask:
+ filtered_entries.append(entry)
+ return filtered_entries
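
The bucket-based sharding above is just a modulo filter over bucket ids; a quick illustration, with bucket ids and subtask count made up:

    buckets = [0, 1, 2, 3, 4, 5, 6]
    number_of_para_subtasks = 3
    for idx_of_this_subtask in range(number_of_para_subtasks):
        mine = [b for b in buckets if b % number_of_para_subtasks == idx_of_this_subtask]
        print(idx_of_this_subtask, mine)
    # 0 [0, 3, 6]
    # 1 [1, 4]
    # 2 [2, 5]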
diff --git a/paimon-python/pypaimon/read/scanner/split_generator.py b/paimon-python/pypaimon/read/scanner/split_generator.py
new file mode 100644
index 0000000000..8f94338900
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/split_generator.py
@@ -0,0 +1,246 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from abc import ABC, abstractmethod
+from typing import Callable, List, Optional, Dict, Tuple
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.split import Split
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.source.deletion_file import DeletionFile
+
+
+class AbstractSplitGenerator(ABC):
+ """
+ Abstract base class for generating splits.
+ """
+
+ # Special key for tracking file end position in split file index map
+ NEXT_POS_KEY = '_next_pos'
+
+ def __init__(
+ self,
+ table,
+ target_split_size: int,
+ open_file_cost: int,
+ deletion_files_map: Optional[Dict] = None
+ ):
+ self.table = table
+ self.target_split_size = target_split_size
+ self.open_file_cost = open_file_cost
+ self.deletion_files_map = deletion_files_map or {}
+
+ # Shard configuration
+ self.idx_of_this_subtask = None
+ self.number_of_para_subtasks = None
+ self.start_pos_of_this_subtask = None
+ self.end_pos_of_this_subtask = None
+
+    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int):
+ """Configure sharding for parallel processing."""
+ if idx_of_this_subtask >= number_of_para_subtasks:
+            raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks")
+        if self.start_pos_of_this_subtask is not None:
+            raise ValueError("with_shard and with_slice cannot be used simultaneously")
+ self.idx_of_this_subtask = idx_of_this_subtask
+ self.number_of_para_subtasks = number_of_para_subtasks
+ return self
+
+ def with_slice(self, start_pos: int, end_pos: int):
+ """Configure slice range for processing."""
+ if start_pos >= end_pos:
+ raise ValueError("start_pos must be less than end_pos")
+ if self.idx_of_this_subtask is not None:
+            raise ValueError("with_slice and with_shard cannot be used simultaneously")
+ self.start_pos_of_this_subtask = start_pos
+ self.end_pos_of_this_subtask = end_pos
+ return self
+
+ @abstractmethod
+ def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+ """
+ Create splits from manifest entries.
+ """
+ pass
+
+ def _build_split_from_pack(
+ self,
+ packed_files: List[List[DataFileMeta]],
+ file_entries: List[ManifestEntry],
+ for_primary_key_split: bool,
+ use_optimized_path: bool = False
+ ) -> List[Split]:
+ """
+ Build splits from packed files.
+ """
+ splits = []
+ for file_group in packed_files:
+ if use_optimized_path:
+ raw_convertible = True
+ elif for_primary_key_split:
+ raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
+ else:
+ raw_convertible = True
+
+ file_paths = []
+ total_file_size = 0
+ total_record_count = 0
+
+ for data_file in file_group:
+ data_file.set_file_path(
+ self.table.table_path,
+ file_entries[0].partition,
+ file_entries[0].bucket
+ )
+ file_paths.append(data_file.file_path)
+ total_file_size += data_file.file_size
+ total_record_count += data_file.row_count
+
+ if file_paths:
+ # Get deletion files for this split
+ data_deletion_files = None
+ if self.deletion_files_map:
+ data_deletion_files = self._get_deletion_files_for_split(
+ file_group,
+ file_entries[0].partition,
+ file_entries[0].bucket
+ )
+
+ split = Split(
+ files=file_group,
+ partition=file_entries[0].partition,
+ bucket=file_entries[0].bucket,
+ file_paths=file_paths,
+ row_count=total_record_count,
+ file_size=total_file_size,
+ raw_convertible=raw_convertible,
+ data_deletion_files=data_deletion_files
+ )
+ splits.append(split)
+ return splits
+
+ def _get_deletion_files_for_split(
+ self,
+ data_files: List[DataFileMeta],
+ partition: GenericRow,
+ bucket: int
+ ) -> Optional[List[DeletionFile]]:
+ """Get deletion files for the given data files in a split."""
+ if not self.deletion_files_map:
+ return None
+
+ partition_key = (tuple(partition.values), bucket)
+ file_deletion_map = self.deletion_files_map.get(partition_key, {})
+
+ if not file_deletion_map:
+ return None
+
+ deletion_files = []
+ for data_file in data_files:
+ deletion_file = file_deletion_map.get(data_file.file_name)
+ if deletion_file:
+ deletion_files.append(deletion_file)
+ else:
+ deletion_files.append(None)
+
+ return deletion_files if any(df is not None for df in deletion_files) else None
+
+ @staticmethod
+ def _without_delete_row(data_file_meta: DataFileMeta) -> bool:
+ """Check if a data file has no deleted rows."""
+ if data_file_meta.delete_row_count is None:
+ return True
+ return data_file_meta.delete_row_count == 0
+
+ @staticmethod
+ def _pack_for_ordered(
+ items: List,
+ weight_func: Callable,
+ target_weight: int
+ ) -> List[List]:
+ """Pack items into groups based on target weight."""
+ packed = []
+ bin_items = []
+ bin_weight = 0
+
+ for item in items:
+ weight = weight_func(item)
+ if bin_weight + weight > target_weight and len(bin_items) > 0:
+ packed.append(list(bin_items))
+ bin_items.clear()
+ bin_weight = 0
+
+ bin_weight += weight
+ bin_items.append(item)
+
+ if len(bin_items) > 0:
+ packed.append(bin_items)
+
+ return packed
+
+ def _compute_shard_range(self, total_row: int) -> Tuple[int, int]:
+ """
+ Calculate the start and end row positions for this shard based on the total row count.
+ Rows are distributed evenly, so no shard receives more than one extra row.
+ """
+ base_rows_per_shard = total_row // self.number_of_para_subtasks
+ remainder = total_row % self.number_of_para_subtasks
+
+ # Each of the first 'remainder' shards gets one extra row
+ if self.idx_of_this_subtask < remainder:
+ num_row = base_rows_per_shard + 1
+ start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+ else:
+ num_row = base_rows_per_shard
+ start_pos = (
+ remainder * (base_rows_per_shard + 1) +
+ (self.idx_of_this_subtask - remainder) * base_rows_per_shard
+ )
+
+ end_pos = start_pos + num_row
+ return start_pos, end_pos
+
+ @staticmethod
+ def _compute_file_range(
+ plan_start_pos: int,
+ plan_end_pos: int,
+ file_begin_pos: int,
+ file_row_count: int
+ ) -> Optional[Tuple[int, int]]:
+ """
+ Compute the row range to read from a file given shard range and file position.
+ Returns None if file is completely within shard range (no slicing needed).
+ Returns (-1, -1) if file is completely outside shard range.
+ """
+ file_end_pos = file_begin_pos + file_row_count
+
+ if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
+ return plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos
+ elif file_begin_pos < plan_start_pos < file_end_pos:
+ return plan_start_pos - file_begin_pos, file_row_count
+ elif file_begin_pos < plan_end_pos < file_end_pos:
+ return 0, plan_end_pos - file_begin_pos
+ elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
+ return -1, -1
+ # File is completely within the shard range
+ return None
+
+ @staticmethod
+ def _is_blob_file(file_name: str) -> bool:
+ """Check if a file is a blob file."""
+ return file_name.endswith('.blob')
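For reference, the standalone sketch below (not part of the committed patch) mirrors the shard-range and file-range arithmetic of AbstractSplitGenerator with invented row counts; the function names and numbers are illustrative only.

# Illustrative sketch only -- mirrors _compute_shard_range and _compute_file_range
# above with made-up numbers; it is not part of the committed code.
from typing import Optional, Tuple


def compute_shard_range(total_row: int, idx: int, num_subtasks: int) -> Tuple[int, int]:
    # Balanced split: the first `remainder` shards each take one extra row.
    base, remainder = divmod(total_row, num_subtasks)
    if idx < remainder:
        start = idx * (base + 1)
        return start, start + base + 1
    start = remainder * (base + 1) + (idx - remainder) * base
    return start, start + base


def compute_file_range(plan_start: int, plan_end: int,
                       file_begin: int, file_rows: int) -> Optional[Tuple[int, int]]:
    file_end = file_begin + file_rows
    if file_begin <= plan_start < plan_end <= file_end:
        return plan_start - file_begin, plan_end - file_begin
    elif file_begin < plan_start < file_end:
        return plan_start - file_begin, file_rows
    elif file_begin < plan_end < file_end:
        return 0, plan_end - file_begin
    elif file_end <= plan_start or file_begin >= plan_end:
        return -1, -1          # file entirely outside the shard -> skip
    return None                # file entirely inside the shard -> read it whole


# 10 rows over 3 subtasks -> [(0, 4), (4, 7), (7, 10)]
print([compute_shard_range(10, i, 3) for i in range(3)])

# Shard [4, 7) over two 5-row files starting at global rows 0 and 5:
print(compute_file_range(4, 7, 0, 5))   # (4, 5) -> read the last row of file 1
print(compute_file_range(4, 7, 5, 5))   # (0, 2) -> read the first two rows of file 2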
diff --git a/paimon-python/pypaimon/read/sliced_split.py b/paimon-python/pypaimon/read/sliced_split.py
new file mode 100644
index 0000000000..ff96798633
--- /dev/null
+++ b/paimon-python/pypaimon/read/sliced_split.py
@@ -0,0 +1,110 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+SlicedSplit wraps a Split with file index ranges for shard/slice processing.
+"""
+
+from typing import List, Dict, Tuple
+
+from pypaimon.read.split import SplitBase
+
+
+class SlicedSplit(SplitBase):
+ """
+ Wrapper for Split that adds file-level slicing information.
+
+ This is used when a split needs to be further divided at the file level,
+ storing the start and end row indices for each file in shard_file_idx_map.
+
+ The map is file_name -> (start_row, end_row), where:
+ - start_row: starting row index within the file (inclusive)
+ - end_row: ending row index within the file (exclusive)
+ - (-1, -1): file should be skipped entirely
+ """
+
+ def __init__(
+ self,
+ data_split: 'Split',
+ shard_file_idx_map: Dict[str, Tuple[int, int]]
+ ):
+ self._data_split = data_split
+ self._shard_file_idx_map = shard_file_idx_map
+
+ def data_split(self) -> 'Split':
+ return self._data_split
+
+ def shard_file_idx_map(self) -> Dict[str, Tuple[int, int]]:
+ return self._shard_file_idx_map
+
+ @property
+ def files(self) -> List['DataFileMeta']:
+ return self._data_split.files
+
+ @property
+ def partition(self) -> 'GenericRow':
+ return self._data_split.partition
+
+ @property
+ def bucket(self) -> int:
+ return self._data_split.bucket
+
+ @property
+ def row_count(self) -> int:
+ if not self._shard_file_idx_map:
+ return self._data_split.row_count
+
+ total_rows = 0
+ for file in self._data_split.files:
+ if file.file_name in self._shard_file_idx_map:
+ start, end = self._shard_file_idx_map[file.file_name]
+ if start != -1 and end != -1:
+ total_rows += (end - start)
+ else:
+ total_rows += file.row_count
+
+ return total_rows
+
+ @property
+ def file_paths(self):
+ return self._data_split.file_paths
+
+ @property
+ def file_size(self):
+ return self._data_split.file_size
+
+ @property
+ def raw_convertible(self):
+ return self._data_split.raw_convertible
+
+ @property
+ def data_deletion_files(self):
+ return self._data_split.data_deletion_files
+
+ def __eq__(self, other):
+ if not isinstance(other, SlicedSplit):
+ return False
+ return (self._data_split == other._data_split and
+ self._shard_file_idx_map == other._shard_file_idx_map)
+
+ def __hash__(self):
+ return hash((id(self._data_split), tuple(sorted(self._shard_file_idx_map.items()))))
+
+ def __repr__(self):
+ return (f"SlicedSplit(data_split={self._data_split}, "
+ f"shard_file_idx_map={self._shard_file_idx_map})")
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index f5357b37ea..3daccb75f3 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -17,7 +17,7 @@
################################################################################
from abc import ABC, abstractmethod
-from typing import List, Optional, Dict, Tuple
+from typing import List, Optional
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.table.row.generic_row import GenericRow
@@ -71,7 +71,6 @@ class Split(SplitBase):
file_paths: List[str],
row_count: int,
file_size: int,
- shard_file_idx_map: Optional[Dict[str, Tuple[int, int]]] = None,
raw_convertible: bool = False,
data_deletion_files: Optional[List[DeletionFile]] = None
):
@@ -81,7 +80,6 @@ class Split(SplitBase):
self._file_paths = file_paths
self._row_count = row_count
self._file_size = file_size
- self.shard_file_idx_map = shard_file_idx_map or {}
self.raw_convertible = raw_convertible
self.data_deletion_files = data_deletion_files
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5df6ccb96e..47edf63d9a 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -51,6 +51,7 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
from pypaimon.read.split import Split
+from pypaimon.read.sliced_split import SlicedSplit
from pypaimon.schema.data_types import DataField
from pypaimon.table.special_fields import SpecialFields
@@ -333,8 +334,12 @@ class RawFileSplitRead(SplitRead):
def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]:
read_fields = self._get_final_read_data_fields()
# If the current file needs to be further divided for reading, use ShardBatchReader
- if file.file_name in self.split.shard_file_idx_map:
- (start_pos, end_pos) = self.split.shard_file_idx_map[file.file_name]
+ # Check if this is a SlicedSplit to get shard_file_idx_map
+ shard_file_idx_map = (
+ self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
+ )
+ if file.file_name in shard_file_idx_map:
+ (start_pos, end_pos) = shard_file_idx_map[file.file_name]
if (start_pos, end_pos) == (-1, -1):
return None
else:
@@ -567,8 +572,12 @@ class DataEvolutionSplitRead(SplitRead):
def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]:
"""Create a file reader for a single file."""
# If the current file needs to be further divided for reading, use ShardBatchReader
- if file.file_name in self.split.shard_file_idx_map:
- (begin_pos, end_pos) = self.split.shard_file_idx_map[file.file_name]
+ # Check if this is a SlicedSplit to get shard_file_idx_map
+ shard_file_idx_map = (
+ self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
+ )
+ if file.file_name in shard_file_idx_map:
+ (begin_pos, end_pos) = shard_file_idx_map[file.file_name]
if (begin_pos, end_pos) == (-1, -1):
return None
else:
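Both readers above now resolve the slice map the same way: only a SlicedSplit carries per-file (start, end) ranges, and (-1, -1) means the file is skipped. A minimal sketch of that dispatch, assuming hypothetical create_full_reader / create_sliced_reader factories in place of the real reader construction, is:

# Hedged sketch (not part of the patch): the SlicedSplit dispatch used by both
# readers above; create_full_reader / create_sliced_reader are placeholders.
from typing import Callable, Dict, Optional, Tuple

from pypaimon.read.sliced_split import SlicedSplit


def reader_for_file(split, file_name: str,
                    create_full_reader: Callable[[], object],
                    create_sliced_reader: Callable[[int, int], object]) -> Optional[object]:
    # Only a SlicedSplit carries a per-file slice map; a plain Split reads files whole.
    slice_map: Dict[str, Tuple[int, int]] = (
        split.shard_file_idx_map() if isinstance(split, SlicedSplit) else {}
    )
    if file_name in slice_map:
        start_pos, end_pos = slice_map[file_name]
        if (start_pos, end_pos) == (-1, -1):
            return None                        # file lies entirely outside this shard
        return create_sliced_reader(start_pos, end_pos)
    return create_full_reader()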