This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dff6fa2c10 [python] refactor FullStartingScanner to append and pk split generator (#7042)
dff6fa2c10 is described below

commit dff6fa2c1057e705d30c1e8bcb852709b75a3f17
Author: jerry <[email protected]>
AuthorDate: Wed Jan 14 23:25:56 2026 +0800

    [python] refactor FullStartingScanner to append and pk split generator (#7042)
---
 .../pypaimon/globalindex/indexed_split.py          |   5 -
 .../read/scanner/append_table_split_generator.py   | 173 ++++++
 .../read/scanner/data_evolution_split_generator.py | 396 +++++++++++++
 .../pypaimon/read/scanner/full_starting_scanner.py | 637 ++-------------------
 .../scanner/primary_key_table_split_generator.py   | 126 ++++
 .../pypaimon/read/scanner/split_generator.py       | 246 ++++++++
 paimon-python/pypaimon/read/sliced_split.py        | 110 ++++
 paimon-python/pypaimon/read/split.py               |   4 +-
 paimon-python/pypaimon/read/split_read.py          |  17 +-
 9 files changed, 1117 insertions(+), 597 deletions(-)
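
For orientation before the patch body: all three new generators size their splits with the same greedy, order-preserving packing that the removed FullStartingScanner._pack_for_ordered used (see the deletions at the end of this patch). A minimal standalone sketch of that packing, assuming a per-item weight function supplied by the caller (illustration only, not the committed code):

    def pack_for_ordered(items, weight_func, target_weight):
        """Greedy, order-preserving packing: start a new bin whenever adding
        the next item would push the current bin past target_weight."""
        packed, bin_items, bin_weight = [], [], 0
        for item in items:
            weight = weight_func(item)
            if bin_weight + weight > target_weight and bin_items:
                packed.append(list(bin_items))
                bin_items.clear()
                bin_weight = 0
            bin_items.append(item)
            bin_weight += weight
        if bin_items:
            packed.append(bin_items)
        return packed

    # File sizes 5, 5, 6, 2 with a target weight of 10 -> [[5, 5], [6, 2]]
    print(pack_for_ordered([5, 5, 6, 2], lambda size: size, 10))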

diff --git a/paimon-python/pypaimon/globalindex/indexed_split.py b/paimon-python/pypaimon/globalindex/indexed_split.py
index 6cdee70b41..0203923fc7 100644
--- a/paimon-python/pypaimon/globalindex/indexed_split.py
+++ b/paimon-python/pypaimon/globalindex/indexed_split.py
@@ -88,11 +88,6 @@ class IndexedSplit(SplitBase):
         """Delegate to data_split."""
         return self._data_split.file_size
 
-    @property
-    def shard_file_idx_map(self):
-        """Delegate to data_split."""
-        return self._data_split.shard_file_idx_map
-
     @property
     def raw_convertible(self):
         """Delegate to data_split."""
diff --git a/paimon-python/pypaimon/read/scanner/append_table_split_generator.py b/paimon-python/pypaimon/read/scanner/append_table_split_generator.py
new file mode 100644
index 0000000000..775771eed1
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/append_table_split_generator.py
@@ -0,0 +1,173 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict
+from typing import List, Dict, Tuple
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
+from pypaimon.read.split import Split
+from pypaimon.read.sliced_split import SlicedSplit
+
+
+class AppendTableSplitGenerator(AbstractSplitGenerator):
+    """
+    Split generator for append-only tables.
+    """
+
+    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+        partitioned_files = defaultdict(list)
+        for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+        plan_start_pos = 0
+        plan_end_pos = 0
+
+        if self.start_pos_of_this_subtask is not None:
+            # shard data range: [plan_start_pos, plan_end_pos)
+            partitioned_files, plan_start_pos, plan_end_pos = \
+                self.__filter_by_slice(
+                    partitioned_files,
+                    self.start_pos_of_this_subtask,
+                    self.end_pos_of_this_subtask
+                )
+        elif self.idx_of_this_subtask is not None:
+            partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files)
+
+        def weight_func(f: DataFileMeta) -> int:
+            return max(f.file_size, self.open_file_cost)
+
+        splits = []
+        for key, file_entries_list in partitioned_files.items():
+            if not file_entries_list:
+                continue
+
+            data_files: List[DataFileMeta] = [e.file for e in file_entries_list]
+
+            packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
+                data_files, weight_func, self.target_split_size
+            )
+            splits += self._build_split_from_pack(
+                packed_files, file_entries_list, False
+            )
+
+        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+            splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
+
+        return splits
+
+    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
+        sliced_splits = []
+        file_end_pos = 0  # end row position of current file in all splits data
+
+        for split in splits:
+            shard_file_idx_map = self.__compute_split_file_idx_map(
+                plan_start_pos, plan_end_pos, split, file_end_pos
+            )
+            file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
+            del shard_file_idx_map[self.NEXT_POS_KEY]
+            
+            if shard_file_idx_map:
+                sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
+            else:
+                sliced_splits.append(split)
+
+        return sliced_splits
+
+    @staticmethod
+    def __filter_by_slice(
+        partitioned_files: defaultdict,
+        start_pos: int,
+        end_pos: int
+    ) -> tuple:
+        plan_start_pos = 0
+        plan_end_pos = 0
+        entry_end_pos = 0  # end row position of current file in all data
+        splits_start_pos = 0
+        filtered_partitioned_files = defaultdict(list)
+
+        # Iterate through all file entries to find files that overlap with current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            for entry in file_entries:
+                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row position after current file
+
+                # If current file is completely after shard range, stop iteration
+                if entry_begin_pos >= end_pos:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_pos <= start_pos:
+                    continue
+                if entry_begin_pos <= start_pos < entry_end_pos:
+                    splits_start_pos = entry_begin_pos
+                    plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record relative end position
+                if entry_begin_pos < end_pos <= entry_end_pos:
+                    plan_end_pos = end_pos - splits_start_pos
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, plan_start_pos, plan_end_pos
+
+    def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
+        """
+        Filter file entries by shard. Only keep the files within the range, which means
+        that only the starting and ending files need to be further divided subsequently.
+        """
+        # Calculate total rows
+        total_row = sum(
+            entry.file.row_count
+            for file_entries in partitioned_files.values()
+            for entry in file_entries
+        )
+
+        # Calculate shard range using shared helper
+        start_pos, end_pos = self._compute_shard_range(total_row)
+
+        return self.__filter_by_slice(partitioned_files, start_pos, end_pos)
+
+    @staticmethod
+    def __compute_split_file_idx_map(
+        plan_start_pos: int,
+        plan_end_pos: int,
+        split: Split,
+        file_end_pos: int
+    ) -> Dict[str, Tuple[int, int]]:
+        """
+        Compute file index map for a split, determining which rows to read from each file.
+
+        """
+        shard_file_idx_map = {}
+        
+        for file in split.files:
+            file_begin_pos = file_end_pos  # Starting row position of current file in all data
+            file_end_pos += file.row_count  # Update to row position after current file
+
+            # Use shared helper to compute file range
+            file_range = AppendTableSplitGenerator._compute_file_range(
+                plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
+            )
+            
+            if file_range is not None:
+                shard_file_idx_map[file.file_name] = file_range
+
+        shard_file_idx_map[AppendTableSplitGenerator.NEXT_POS_KEY] = file_end_pos
+        return shard_file_idx_map
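
The _compute_shard_range helper called above comes from the shared AbstractSplitGenerator base class in split_generator.py, which only appears in the diffstat of this excerpt. A standalone sketch of the balanced distribution it presumably implements, reconstructed from the _append_only_filter_by_shard code removed from full_starting_scanner.py later in this patch (illustration only, not the committed helper):

    def compute_shard_range(total_rows, subtask_idx, num_subtasks):
        """Balanced [start, end) row range for one subtask: the first
        total_rows % num_subtasks subtasks each take one extra row."""
        base = total_rows // num_subtasks
        remainder = total_rows % num_subtasks
        if subtask_idx < remainder:
            start = subtask_idx * (base + 1)
            rows = base + 1
        else:
            start = remainder * (base + 1) + (subtask_idx - remainder) * base
            rows = base
        return start, start + rows

    # 10 rows over 3 subtasks -> [(0, 4), (4, 7), (7, 10)]
    print([compute_shard_range(10, i, 3) for i in range(3)])
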
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
new file mode 100644
index 0000000000..10847b12b2
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -0,0 +1,396 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict
+from typing import List, Optional, Dict, Tuple
+
+from pypaimon.globalindex.indexed_split import IndexedSplit
+from pypaimon.globalindex.range import Range
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
+from pypaimon.read.split import Split
+from pypaimon.read.sliced_split import SlicedSplit
+
+
+class DataEvolutionSplitGenerator(AbstractSplitGenerator):
+    """
+    Split generator for data evolution tables.
+    """
+
+    def __init__(
+        self,
+        table,
+        target_split_size: int,
+        open_file_cost: int,
+        deletion_files_map=None,
+        row_ranges: Optional[List] = None,
+        score_getter=None
+    ):
+        super().__init__(table, target_split_size, open_file_cost, deletion_files_map)
+        self.row_ranges = row_ranges
+        self.score_getter = score_getter
+
+    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+        """
+        Create splits for data evolution tables.
+        """
+        def sort_key(manifest_entry: ManifestEntry) -> tuple:
+            first_row_id = (
+                manifest_entry.file.first_row_id
+                if manifest_entry.file.first_row_id is not None
+                else float('-inf')
+            )
+            is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0
+            max_seq = manifest_entry.file.max_sequence_number
+            return first_row_id, is_blob, -max_seq
+
+        sorted_entries = sorted(file_entries, key=sort_key)
+
+        partitioned_files = defaultdict(list)
+        for entry in sorted_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+        plan_start_pos = 0
+        plan_end_pos = 0
+
+        if self.start_pos_of_this_subtask is not None:
+            # shard data range: [plan_start_pos, plan_end_pos)
+            partitioned_files, plan_start_pos, plan_end_pos = \
+                self._filter_by_row_range(
+                    partitioned_files,
+                    self.start_pos_of_this_subtask,
+                    self.end_pos_of_this_subtask
+                )
+        elif self.idx_of_this_subtask is not None:
+            # shard data range: [plan_start_pos, plan_end_pos)
+            partitioned_files, plan_start_pos, plan_end_pos = self._filter_by_shard(partitioned_files)
+
+        def weight_func(file_list: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+        splits = []
+        for key, sorted_entries_list in partitioned_files.items():
+            if not sorted_entries_list:
+                continue
+
+            data_files: List[DataFileMeta] = [e.file for e in sorted_entries_list]
+
+            # Split files by firstRowId for data evolution
+            split_by_row_id = self._split_by_row_id(data_files)
+
+            # Pack the split groups for optimal split sizes
+            packed_files = self._pack_for_ordered(
+                split_by_row_id, weight_func, self.target_split_size
+            )
+
+            # Flatten the packed files and build splits
+            flatten_packed_files: List[List[DataFileMeta]] = [
+                [file for sub_pack in pack for file in sub_pack]
+                for pack in packed_files
+            ]
+
+            splits += self._build_split_from_pack(
+                flatten_packed_files, sorted_entries_list, False
+            )
+
+        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
+            splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
+
+        # Wrap splits with IndexedSplit if row_ranges is provided
+        if self.row_ranges:
+            splits = self._wrap_to_indexed_splits(splits)
+
+        return splits
+
+    def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
+        """
+        Wrap splits with SlicedSplit to add file-level slicing information.
+        """
+        sliced_splits = []
+        file_end_pos = 0  # end row position of current file in all splits data
+
+        for split in splits:
+            # Compute file index map for both data and blob files
+            # Blob files share the same row position tracking as data files
+            shard_file_idx_map = self._compute_split_file_idx_map(
+                plan_start_pos, plan_end_pos, split, file_end_pos
+            )
+            file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
+            del shard_file_idx_map[self.NEXT_POS_KEY]
+            
+            if shard_file_idx_map:
+                sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
+            else:
+                sliced_splits.append(split)
+
+        return sliced_splits
+
+    def _filter_by_row_range(
+        self,
+        partitioned_files: defaultdict,
+        start_pos: int,
+        end_pos: int
+    ) -> tuple:
+        """
+        Filter file entries by row range for data evolution tables.
+        """
+        plan_start_pos = 0
+        plan_end_pos = 0
+        entry_end_pos = 0  # end row position of current file in all data
+        splits_start_pos = 0
+        filtered_partitioned_files = defaultdict(list)
+
+        # Iterate through all file entries to find files that overlap with current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            blob_added = False  # If it is true, all blobs corresponding to this data file are added
+            for entry in file_entries:
+                if self._is_blob_file(entry.file.file_name):
+                    if blob_added:
+                        filtered_entries.append(entry)
+                    continue
+                blob_added = False
+                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
+                entry_end_pos += entry.file.row_count  # Update to row position after current file
+
+                # If current file is completely after shard range, stop iteration
+                if entry_begin_pos >= end_pos:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_pos <= start_pos:
+                    continue
+                if entry_begin_pos <= start_pos < entry_end_pos:
+                    splits_start_pos = entry_begin_pos
+                    plan_start_pos = start_pos - entry_begin_pos
+                # If shard end position is within current file, record relative end position
+                if entry_begin_pos < end_pos <= entry_end_pos:
+                    plan_end_pos = end_pos - splits_start_pos
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+                blob_added = True
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, plan_start_pos, plan_end_pos
+
+    def _filter_by_shard(self, partitioned_files: defaultdict) -> tuple:
+        """
+        Filter file entries by shard for data evolution tables.
+        """
+        # Calculate total rows (excluding blob files)
+        total_row = sum(
+            entry.file.row_count
+            for file_entries in partitioned_files.values()
+            for entry in file_entries
+            if not self._is_blob_file(entry.file.file_name)
+        )
+
+        # Calculate shard range using shared helper
+        start_pos, end_pos = self._compute_shard_range(total_row)
+
+        return self._filter_by_row_range(partitioned_files, start_pos, end_pos)
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+        """
+        Split files by row ID for data evolution tables.
+        """
+        split_by_row_id = []
+
+        # Filter blob files to only include those within the row ID range of non-blob files
+        sorted_files = self._filter_blob(files)
+
+        # Split files by firstRowId
+        last_row_id = -1
+        check_row_id_start = 0
+        current_split = []
+
+        for file in sorted_files:
+            first_row_id = file.first_row_id
+            if first_row_id is None:
+                # Files without firstRowId are treated as individual splits
+                split_by_row_id.append([file])
+                continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+                if current_split:
+                    split_by_row_id.append(current_split)
+
+                # Validate that files don't overlap
+                if first_row_id < check_row_id_start:
+                    file_names = [f.file_name for f in sorted_files]
+                    raise ValueError(
+                        f"There are overlapping files in the split: {file_names}, "
+                        f"the wrong file is: {file.file_name}"
+                    )
+
+                current_split = []
+                last_row_id = first_row_id
+                check_row_id_start = first_row_id + file.row_count
+
+            current_split.append(file)
+
+        if current_split:
+            split_by_row_id.append(current_split)
+
+        return split_by_row_id
+
+    def _compute_split_file_idx_map(
+        self,
+        plan_start_pos: int,
+        plan_end_pos: int,
+        split: Split,
+        file_end_pos: int
+    ) -> Dict[str, Tuple[int, int]]:
+        """
+        Compute file index map for a split, determining which rows to read from each file.
+        For data files, the range is calculated based on the file's position in the cumulative row space.
+        For blob files (which may be rolled), the range is calculated based on each file's first_row_id.
+        """
+        shard_file_idx_map = {}
+        
+        # Find the first non-blob file to determine the row range for this split
+        data_file = None
+        for file in split.files:
+            if not self._is_blob_file(file.file_name):
+                data_file = file
+                break
+        
+        if data_file is None:
+            # No data file, skip this split
+            shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+            return shard_file_idx_map
+
+        # Calculate the row range based on the data file position
+        file_begin_pos = file_end_pos
+        file_end_pos += data_file.row_count
+        data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0
+
+        # Determine the row range for the data file in this split using shared helper
+        data_file_range = self._compute_file_range(
+            plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count
+        )
+        
+        # Apply ranges to each file in the split
+        for file in split.files:
+            if self._is_blob_file(file.file_name):
+                # For blob files, calculate range based on their first_row_id
+                if data_file_range is None:
+                    # Data file is completely within shard, blob files should also be
+                    continue
+                elif data_file_range == (-1, -1):
+                    # Data file is completely outside shard, blob files should be skipped
+                    shard_file_idx_map[file.file_name] = (-1, -1)
+                else:
+                    # Calculate blob file's position relative to data file's first_row_id
+                    blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
+                    # Blob's position relative to data file start
+                    blob_rel_start = blob_first_row_id - data_file_first_row_id
+                    blob_rel_end = blob_rel_start + file.row_count
+                    
+                    # Shard range relative to data file start
+                    shard_start = data_file_range[0]
+                    shard_end = data_file_range[1]
+                    
+                    # Intersect blob's range with shard range
+                    intersect_start = max(blob_rel_start, shard_start)
+                    intersect_end = min(blob_rel_end, shard_end)
+                    
+                    if intersect_start >= intersect_end:
+                        # Blob file is completely outside shard range
+                        shard_file_idx_map[file.file_name] = (-1, -1)
+                    elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+                        # Blob file is completely within shard range, no slicing needed
+                        pass
+                    else:
+                        # Convert to file-local indices
+                        local_start = intersect_start - blob_rel_start
+                        local_end = intersect_end - blob_rel_start
+                        shard_file_idx_map[file.file_name] = (local_start, local_end)
+            else:
+                # Data file
+                if data_file_range is not None:
+                    shard_file_idx_map[file.file_name] = data_file_range
+
+        shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+        return shard_file_idx_map
+
+    def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
+        """
+        Wrap splits with IndexedSplit for row range filtering.
+        """
+        indexed_splits = []
+        for split in splits:
+            # Calculate file ranges for this split
+            file_ranges = []
+            for file in split.files:
+                first_row_id = file.first_row_id
+                if first_row_id is not None:
+                    file_ranges.append(Range(
+                        first_row_id,
+                        first_row_id + file.row_count - 1
+                    ))
+
+            if not file_ranges:
+                # No row IDs, keep original split
+                indexed_splits.append(split)
+                continue
+
+            # Merge file ranges
+            file_ranges = Range.merge_sorted_as_possible(file_ranges)
+
+            # Intersect with row_ranges from global index
+            expected = Range.and_(file_ranges, self.row_ranges)
+
+            if not expected:
+                # No intersection, skip this split
+                continue
+
+            # Create scores array if score_getter is provided
+            scores = None
+            if self.score_getter is not None:
+                scores = []
+                for r in expected:
+                    for row_id in range(r.from_, r.to + 1):
+                        score = self.score_getter(row_id)
+                        scores.append(score if score is not None else 0.0)
+
+            indexed_splits.append(IndexedSplit(split, expected, scores))
+
+        return indexed_splits
+
+    @staticmethod
+    def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
+        """
+        Filter blob files to only include those within row ID range of non-blob files.
+        """
+        result = []
+        row_id_start = -1
+        row_id_end = -1
+
+        for file in files:
+            if not DataEvolutionSplitGenerator._is_blob_file(file.file_name):
+                if file.first_row_id is not None:
+                    row_id_start = file.first_row_id
+                    row_id_end = file.first_row_id + file.row_count
+                result.append(file)
+            else:
+                if file.first_row_id is not None and row_id_start != -1:
+                    if row_id_start <= file.first_row_id < row_id_end:
+                        result.append(file)
+
+        return result
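
Both generators above delegate per-file slicing to a shared _compute_file_range helper that is not visible in this excerpt. Judging from the call sites and the removed _compute_split_file_idx_map in full_starting_scanner.py, its contract is roughly: None when the file lies entirely inside the shard range, (-1, -1) when it lies entirely outside, and a file-local (start, end) pair for a partial overlap. A hypothetical standalone version, for illustration only:

    def compute_file_range(plan_start, plan_end, file_begin, row_count):
        """Rows of one file to read for a shard covering [plan_start, plan_end),
        where the file occupies [file_begin, file_begin + row_count)."""
        file_end = file_begin + row_count
        if file_end <= plan_start or file_begin >= plan_end:
            return (-1, -1)  # no overlap: skip the file entirely
        start = max(plan_start, file_begin) - file_begin
        end = min(plan_end, file_end) - file_begin
        if start == 0 and end == row_count:
            return None  # fully covered: read the whole file, no slicing entry
        return (start, end)  # partial overlap: file-local row range

    # A 100-row file starting at row 250, shard range [200, 300) -> (0, 50)
    print(compute_file_range(200, 300, 250, 100))
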
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 3d07d0b6cd..e4fdb5d16b 100755
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -16,29 +16,26 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 import os
-from collections import defaultdict
-from typing import Callable, List, Optional, Dict, Set
+from typing import List, Optional, Dict, Set
 
 from pypaimon.common.predicate import Predicate
-from pypaimon.globalindex import VectorSearchGlobalIndexResult, Range
-from pypaimon.globalindex.indexed_split import IndexedSplit
+from pypaimon.globalindex import VectorSearchGlobalIndexResult
 from pypaimon.table.source.deletion_file import DeletionFile
-from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.manifest.index_manifest_file import IndexManifestFile
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
-from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
-from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.plan import Plan
 from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
+from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
+from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
+from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
 from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import Split
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
-from pypaimon.common.options.core_options import MergeEngine
 
 
 class FullStartingScanner(StartingScanner):
@@ -75,7 +72,7 @@ class FullStartingScanner(StartingScanner):
         self.start_pos_of_this_subtask = None
         self.end_pos_of_this_subtask = None
 
-        self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
+        self.only_read_real_buckets = options.bucket() == BucketMode.POSTPONE_BUCKET.value
         self.data_evolution = options.data_evolution_enabled()
         self.deletion_vectors_enabled = options.deletion_vectors_enabled()
 
@@ -91,17 +88,6 @@ class FullStartingScanner(StartingScanner):
         file_entries = self.plan_files()
         if not file_entries:
             return Plan([])
-
-        # Evaluate global index if enabled (for data evolution tables)
-        row_ranges = None
-        score_getter = None
-        if self.data_evolution:
-            global_index_result = self._eval_global_index()
-            if global_index_result is not None:
-                row_ranges = global_index_result.results().to_range_list()
-                if isinstance(global_index_result, VectorSearchGlobalIndexResult):
-                    score_getter = global_index_result.score_getter()
-
         # Get deletion files map if deletion vectors are enabled.
         # {partition-bucket -> {filename -> DeletionFile}}
         deletion_files_map: dict[tuple, dict[str, DeletionFile]] = {}
@@ -113,14 +99,46 @@ class FullStartingScanner(StartingScanner):
                 buckets.add((tuple(entry.partition.values), entry.bucket))
             deletion_files_map = self._scan_dv_index(latest_snapshot, buckets)
 
+        # Create appropriate split generator based on table type
         if self.table.is_primary_key_table:
-            splits = self._create_primary_key_splits(file_entries, deletion_files_map)
+            split_generator = PrimaryKeyTableSplitGenerator(
+                self.table,
+                self.target_split_size,
+                self.open_file_cost,
+                deletion_files_map
+            )
         elif self.data_evolution:
-            splits = self._create_data_evolution_splits(
-                file_entries, deletion_files_map, row_ranges, score_getter
+            global_index_result = self._eval_global_index()
+            row_ranges = None
+            score_getter = None
+            if global_index_result is not None:
+                row_ranges = global_index_result.results().to_range_list()
+                if isinstance(global_index_result, VectorSearchGlobalIndexResult):
+                    score_getter = global_index_result.score_getter()
+            split_generator = DataEvolutionSplitGenerator(
+                self.table,
+                self.target_split_size,
+                self.open_file_cost,
+                deletion_files_map,
+                row_ranges,
+                score_getter
             )
         else:
-            splits = self._create_append_only_splits(file_entries, deletion_files_map)
+            split_generator = AppendTableSplitGenerator(
+                self.table,
+                self.target_split_size,
+                self.open_file_cost,
+                deletion_files_map
+            )
+
+        # Configure sharding if needed
+        if self.idx_of_this_subtask is not None:
+            split_generator.with_shard(self.idx_of_this_subtask, self.number_of_para_subtasks)
+        elif self.start_pos_of_this_subtask is not None:
+            split_generator.with_slice(self.start_pos_of_this_subtask, self.end_pos_of_this_subtask)
+
+        # Generate splits
+        splits = split_generator.create_splits(file_entries)
 
         splits = self._apply_push_down_limit(splits)
         return Plan(splits)
@@ -196,218 +214,32 @@ class FullStartingScanner(StartingScanner):
         return result
 
     def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
-        max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
-        if max_workers < 8:
-            max_workers = 8
+        max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
         manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
-        return self.manifest_file_manager.read_entries_parallel(manifest_files,
-                                                                 self._filter_manifest_entry,
-                                                                 max_workers=max_workers)
+        return self.manifest_file_manager.read_entries_parallel(
+            manifest_files,
+            self._filter_manifest_entry,
+            max_workers=max_workers
+        )
 
-    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
+    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int) -> 'FullStartingScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
-            raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
+            raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks")
         if self.start_pos_of_this_subtask is not None:
             raise Exception("with_shard and with_slice cannot be used simultaneously")
         self.idx_of_this_subtask = idx_of_this_subtask
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
+    def with_slice(self, start_pos: int, end_pos: int) -> 'FullStartingScanner':
         if start_pos >= end_pos:
-            raise Exception("start_pos must be less than end_pos")
+            raise ValueError("start_pos must be less than end_pos")
         if self.idx_of_this_subtask is not None:
             raise Exception("with_slice and with_shard cannot be used simultaneously")
         self.start_pos_of_this_subtask = start_pos
         self.end_pos_of_this_subtask = end_pos
         return self
 
-    @staticmethod
-    def _append_only_filter_by_slice(partitioned_files: defaultdict,
-                                     start_pos: int,
-                                     end_pos: int) -> (defaultdict, int, int):
-        plan_start_pos = 0
-        plan_end_pos = 0
-        entry_end_pos = 0  # end row position of current file in all data
-        splits_start_pos = 0
-        filtered_partitioned_files = defaultdict(list)
-        # Iterate through all file entries to find files that overlap with current shard range
-        for key, file_entries in partitioned_files.items():
-            filtered_entries = []
-            for entry in file_entries:
-                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
-                entry_end_pos += entry.file.row_count  # Update to row position after current file
-
-                # If current file is completely after shard range, stop iteration
-                if entry_begin_pos >= end_pos:
-                    break
-                # If current file is completely before shard range, skip it
-                if entry_end_pos <= start_pos:
-                    continue
-                if entry_begin_pos <= start_pos < entry_end_pos:
-                    splits_start_pos = entry_begin_pos
-                    plan_start_pos = start_pos - entry_begin_pos
-                # If shard end position is within current file, record relative end position
-                if entry_begin_pos < end_pos <= entry_end_pos:
-                    plan_end_pos = end_pos - splits_start_pos
-                # Add files that overlap with shard range to result
-                filtered_entries.append(entry)
-            if filtered_entries:
-                filtered_partitioned_files[key] = filtered_entries
-
-        return filtered_partitioned_files, plan_start_pos, plan_end_pos
-
-    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
-        """
-        Filter file entries by shard. Only keep the files within the range, which means
-        that only the starting and ending files need to be further divided subsequently
-        """
-        total_row = 0
-        # Sort by file creation time to ensure consistent sharding
-        for key, file_entries in partitioned_files.items():
-            for entry in file_entries:
-                total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
-        base_rows_per_shard = total_row // self.number_of_para_subtasks
-        remainder = total_row % self.number_of_para_subtasks
-
-        # Each of the first 'remainder' shards gets one extra row
-        if self.idx_of_this_subtask < remainder:
-            num_row = base_rows_per_shard + 1
-            start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
-        else:
-            num_row = base_rows_per_shard
-            start_pos = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
-        end_pos = start_pos + num_row
-
-        return self._append_only_filter_by_slice(partitioned_files, start_pos, end_pos)
-
-    def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
-                                            start_pos: int,
-                                            end_pos: int) -> (defaultdict, int, int):
-        plan_start_pos = 0
-        plan_end_pos = 0
-        entry_end_pos = 0  # end row position of current file in all data
-        splits_start_pos = 0
-        filtered_partitioned_files = defaultdict(list)
-        # Iterate through all file entries to find files that overlap with current shard range
-        for key, file_entries in partitioned_files.items():
-            filtered_entries = []
-            blob_added = False  # If it is true, all blobs corresponding to this data file are added
-            for entry in file_entries:
-                if self._is_blob_file(entry.file.file_name):
-                    if blob_added:
-                        filtered_entries.append(entry)
-                    continue
-                blob_added = False
-                entry_begin_pos = entry_end_pos  # Starting row position of current file in all data
-                entry_end_pos += entry.file.row_count  # Update to row position after current file
-
-                # If current file is completely after shard range, stop iteration
-                if entry_begin_pos >= end_pos:
-                    break
-                # If current file is completely before shard range, skip it
-                if entry_end_pos <= start_pos:
-                    continue
-                if entry_begin_pos <= start_pos < entry_end_pos:
-                    splits_start_pos = entry_begin_pos
-                    plan_start_pos = start_pos - entry_begin_pos
-                # If shard end position is within current file, record relative end position
-                if entry_begin_pos < end_pos <= entry_end_pos:
-                    plan_end_pos = end_pos - splits_start_pos
-                # Add files that overlap with shard range to result
-                filtered_entries.append(entry)
-                blob_added = True
-            if filtered_entries:
-                filtered_partitioned_files[key] = filtered_entries
-
-        return filtered_partitioned_files, plan_start_pos, plan_end_pos
-
-    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
-        total_row = 0
-        for key, file_entries in partitioned_files.items():
-            for entry in file_entries:
-                if not self._is_blob_file(entry.file.file_name):
-                    total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process using balanced distribution
-        # Distribute remainder evenly among first few shards to avoid last shard overload
-        base_rows_per_shard = total_row // self.number_of_para_subtasks
-        remainder = total_row % self.number_of_para_subtasks
-
-        # Each of the first 'remainder' shards gets one extra row
-        if self.idx_of_this_subtask < remainder:
-            num_row = base_rows_per_shard + 1
-            start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
-        else:
-            num_row = base_rows_per_shard
-            start_pos = (remainder * (base_rows_per_shard + 1) +
-                         (self.idx_of_this_subtask - remainder) * base_rows_per_shard)
-
-        end_pos = start_pos + num_row
-        return self._data_evolution_filter_by_row_range(partitioned_files, start_pos, end_pos)
-
-    def _compute_split_start_end_pos(self, splits: List[Split], plan_start_pos, plan_end_pos):
-        """
-        Find files that needs to be divided for each split
-        :param splits: splits
-        :param plan_start_pos: plan begin row in all splits data
-        :param plan_end_pos: plan end row in all splits data
-        """
-        file_end_pos = 0  # end row position of current file in all splits data
-
-        for split in splits:
-            cur_split_end_pos = file_end_pos
-            # Compute split_file_idx_map for data files
-            file_end_pos = self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
-                                                            split, cur_split_end_pos, False)
-            # Compute split_file_idx_map for blob files
-            if self.data_evolution:
-                self._compute_split_file_idx_map(plan_start_pos, plan_end_pos,
-                                                 split, cur_split_end_pos, True)
-
-    def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: Split,
-                                    file_end_pos: int, is_blob: bool = False):
-        """
-        Traverse all the files in current split, find the starting shard and ending shard files,
-        and add them to shard_file_idx_map;
-        - for data file, only two data files will be divided in all splits.
-        - for blob file, perhaps there will be some unnecessary files in addition to two files(start and end).
-          Add them to shard_file_idx_map as well, because they need to be removed later.
-        """
-        row_cnt = 0
-        for file in split.files:
-            if not is_blob and self._is_blob_file(file.file_name):
-                continue
-            if is_blob and not self._is_blob_file(file.file_name):
-                continue
-            row_cnt += file.row_count
-            file_begin_pos = file_end_pos  # Starting row position of current file in all data
-            file_end_pos += file.row_count  # Update to row position after current file
-            if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
-                split.shard_file_idx_map[file.file_name] = (
-                    plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos)
-            # If shard start position is within current file, record actual start position and relative offset
-            elif file_begin_pos < plan_start_pos < file_end_pos:
-                split.shard_file_idx_map[file.file_name] = (plan_start_pos - file_begin_pos, file.row_count)
-            # If shard end position is within current file, record relative end position
-            elif file_begin_pos < plan_end_pos < file_end_pos:
-                split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos)
-            elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
-                split.shard_file_idx_map[file.file_name] = (-1, -1)
-        return file_end_pos
-
-    def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
-        filtered_entries = []
-        for entry in file_entries:
-            if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask:
-                filtered_entries.append(entry)
-        return filtered_entries
-
     def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
         if self.limit is None:
             return splits
@@ -525,368 +357,3 @@ class FullStartingScanner(StartingScanner):
             deletion_files[data_file_name] = deletion_file
 
         return deletion_files
-
-    def _get_deletion_files_for_split(self, data_files: List[DataFileMeta],
-                                      deletion_files_map: dict,
-                                      partition: GenericRow,
-                                      bucket: int) -> Optional[List[DeletionFile]]:
-        """
-        Get deletion files for the given data files in a split.
-        """
-        if not deletion_files_map:
-            return None
-
-        partition_key = (tuple(partition.values), bucket)
-        file_deletion_map = deletion_files_map.get(partition_key, {})
-
-        if not file_deletion_map:
-            return None
-
-        deletion_files = []
-        for data_file in data_files:
-            deletion_file = file_deletion_map.get(data_file.file_name)
-            if deletion_file:
-                deletion_files.append(deletion_file)
-            else:
-                deletion_files.append(None)
-
-        return deletion_files if any(df is not None for df in deletion_files) else None
-
-    def _create_append_only_splits(
-            self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
-        partitioned_files = defaultdict(list)
-        for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
-
-        if self.start_pos_of_this_subtask is not None:
-            # shard data range: [plan_start_pos, plan_end_pos)
-            partitioned_files, plan_start_pos, plan_end_pos = \
-                self._append_only_filter_by_slice(partitioned_files,
-                                                  self.start_pos_of_this_subtask,
-                                                  self.end_pos_of_this_subtask)
-        elif self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_pos, plan_end_pos = self._append_only_filter_by_shard(partitioned_files)
-
-        def weight_func(f: DataFileMeta) -> int:
-            return max(f.file_size, self.open_file_cost)
-
-        splits = []
-        for key, file_entries in partitioned_files.items():
-            if not file_entries:
-                return []
-
-            data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
-            packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
-                                                                            self.target_split_size)
-            splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
-        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
-            # When files are combined into splits, it is necessary to find files that needs to be divided for each split
-            self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
-        return splits
-
-    def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
-        # null to true to be compatible with old version
-        if data_file_meta.delete_row_count is None:
-            return True
-        return data_file_meta.delete_row_count == 0
-
-    def _create_primary_key_splits(
-            self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
-        if self.idx_of_this_subtask is not None:
-            file_entries = self._primary_key_filter_by_shard(file_entries)
-        partitioned_files = defaultdict(list)
-        for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
-
-        def single_weight_func(f: DataFileMeta) -> int:
-            return max(f.file_size, self.open_file_cost)
-
-        def weight_func(fl: List[DataFileMeta]) -> int:
-            return max(sum(f.file_size for f in fl), self.open_file_cost)
-
-        merge_engine = self.table.options.merge_engine()
-        merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
-
-        splits = []
-        for key, file_entries in partitioned_files.items():
-            if not file_entries:
-                continue
-
-            data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
-            raw_convertible = all(
-                f.level != 0 and self._without_delete_row(f)
-                for f in data_files
-            )
-
-            levels = {f.level for f in data_files}
-            one_level = len(levels) == 1
-
-            use_optimized_path = raw_convertible and (
-                self.deletion_vectors_enabled or merge_engine_first_row or one_level)
-            if use_optimized_path:
-                packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
-                    data_files, single_weight_func, self.target_split_size
-                )
-                splits += self._build_split_from_pack(
-                    packed_files, file_entries, True, deletion_files_map,
-                    use_optimized_path)
-            else:
-                partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
-                sections: List[List[DataFileMeta]] = [
-                    [file for s in sl for file in s.files]
-                    for sl in partition_sort_runs
-                ]
-
-                packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
-                                                                                      self.target_split_size)
-
-                flatten_packed_files: List[List[DataFileMeta]] = [
-                    [file for sub_pack in pack for file in sub_pack]
-                    for pack in packed_files
-                ]
-                splits += self._build_split_from_pack(
-                    flatten_packed_files, file_entries, True,
-                    deletion_files_map, False)
-        return splits
-
-    def _create_data_evolution_splits(
-            self, file_entries: List[ManifestEntry], deletion_files_map: dict = None,
-            row_ranges: List = None, score_getter=None) -> List['Split']:
-        def sort_key(manifest_entry: ManifestEntry) -> tuple:
-            first_row_id = manifest_entry.file.first_row_id if manifest_entry.file.first_row_id is not None else float(
-                '-inf')
-            is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0
-            # For files with same firstRowId, sort by maxSequenceNumber in descending order
-            # (larger sequence number means more recent data)
-            max_seq = manifest_entry.file.max_sequence_number
-            return first_row_id, is_blob, -max_seq
-
-        sorted_entries = sorted(file_entries, key=sort_key)
-
-        partitioned_files = defaultdict(list)
-        for entry in sorted_entries:
-            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
-
-        if self.start_pos_of_this_subtask is not None:
-            # shard data range: [plan_start_pos, plan_end_pos)
-            partitioned_files, plan_start_pos, plan_end_pos = \
-                                                         self.start_pos_of_this_subtask,
-                                                         self.end_pos_of_this_subtask)
self.end_pos_of_this_subtask)
-        elif self.idx_of_this_subtask is not None:
-            # shard data range: [plan_start_pos, plan_end_pos)
-            partitioned_files, plan_start_pos, plan_end_pos = self._data_evolution_filter_by_shard(partitioned_files)
-
-        def weight_func(file_list: List[DataFileMeta]) -> int:
-            return max(sum(f.file_size for f in file_list), self.open_file_cost)
-
-        splits = []
-        for key, sorted_entries in partitioned_files.items():
-            if not sorted_entries:
-                continue
-
-            data_files: List[DataFileMeta] = [e.file for e in sorted_entries]
-
-            # Split files by firstRowId for data evolution
-            split_by_row_id = self._split_by_row_id(data_files)
-
-            # Pack the split groups for optimal split sizes
-            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
-                                                                                  self.target_split_size)
-
-            # Flatten the packed files and build splits
-            flatten_packed_files: List[List[DataFileMeta]] = [
-                [file for sub_pack in pack for file in sub_pack]
-                for pack in packed_files
-            ]
-
-            splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
-
-        if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
-            self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
-
-        # Wrap splits with IndexedSplit if row_ranges is provided
-        if row_ranges:
-            splits = self._wrap_to_indexed_splits(splits, row_ranges, score_getter)
-
-        return splits
-
-    def _wrap_to_indexed_splits(
-        self,
-        splits: List['Split'],
-        row_ranges: List,
-        score_getter
-    ) -> List['Split']:
-
-        indexed_splits = []
-        for split in splits:
-            # Calculate file ranges for this split
-            file_ranges = []
-            for file in split.files:
-                first_row_id = file.first_row_id
-                if first_row_id is not None:
-                    file_ranges.append(Range(
-                        first_row_id,
-                        first_row_id + file.row_count - 1
-                    ))
-
-            if not file_ranges:
-                # No row IDs, keep original split
-                indexed_splits.append(split)
-                continue
-
-            # Merge file ranges
-            file_ranges = Range.merge_sorted_as_possible(file_ranges)
-
-            # Intersect with row_ranges from global index
-            expected = Range.and_(file_ranges, row_ranges)
-
-            if not expected:
-                # No intersection, skip this split
-                continue
-
-            # Create scores array if score_getter is provided
-            scores = None
-            if score_getter is not None:
-                scores = []
-                for r in expected:
-                    for row_id in range(r.from_, r.to + 1):
-                        score = score_getter(row_id)
-                        scores.append(score if score is not None else 0.0)
-
-            indexed_splits.append(IndexedSplit(split, expected, scores))
-
-        return indexed_splits
-
-    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
-        split_by_row_id = []
-
-        # Filter blob files to only include those within the row ID range of non-blob files
-        sorted_files = self._filter_blob(files)
-
-        # Split files by firstRowId
-        last_row_id = -1
-        check_row_id_start = 0
-        current_split = []
-
-        for file in sorted_files:
-            first_row_id = file.first_row_id
-            if first_row_id is None:
-                # Files without firstRowId are treated as individual splits
-                split_by_row_id.append([file])
-                continue
-
-            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
-                if current_split:
-                    split_by_row_id.append(current_split)
-
-                # Validate that files don't overlap
-                if first_row_id < check_row_id_start:
-                    file_names = [f.file_name for f in sorted_files]
-                    raise ValueError(
-                        f"There are overlapping files in the split: {file_names}, "
-                        f"the wrong file is: {file.file_name}"
-                    )
-
-                current_split = []
-                last_row_id = first_row_id
-                check_row_id_start = first_row_id + file.row_count
-
-            current_split.append(file)
-
-        if current_split:
-            split_by_row_id.append(current_split)
-
-        return split_by_row_id
-
-    def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool,
-                               deletion_files_map: dict = None, use_optimized_path: bool = False) -> List['Split']:
-        splits = []
-        for file_group in packed_files:
-            if use_optimized_path:
-                raw_convertible = True
-            elif for_primary_key_split:
-                raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
-            else:
-                raw_convertible = True
-
-            file_paths = []
-            total_file_size = 0
-            total_record_count = 0
-
-            for data_file in file_group:
-                data_file.set_file_path(self.table.table_path, file_entries[0].partition,
-                                        file_entries[0].bucket)
-                file_paths.append(data_file.file_path)
-                total_file_size += data_file.file_size
-                total_record_count += data_file.row_count
-
-            if file_paths:
-                # Get deletion files for this split
-                data_deletion_files = None
-                if deletion_files_map:
-                    data_deletion_files = self._get_deletion_files_for_split(
-                        file_group,
-                        deletion_files_map,
-                        file_entries[0].partition,
-                        file_entries[0].bucket
-                    )
-
-                split = Split(
-                    files=file_group,
-                    partition=file_entries[0].partition,
-                    bucket=file_entries[0].bucket,
-                    file_paths=file_paths,
-                    row_count=total_record_count,
-                    file_size=total_file_size,
-                    raw_convertible=raw_convertible,
-                    data_deletion_files=data_deletion_files
-                )
-                splits.append(split)
-        return splits
-
-    @staticmethod
-    def _pack_for_ordered(items: List, weight_func: Callable, target_weight: int) -> List[List]:
-        packed = []
-        bin_items = []
-        bin_weight = 0
-
-        for item in items:
-            weight = weight_func(item)
-            if bin_weight + weight > target_weight and len(bin_items) > 0:
-                packed.append(list(bin_items))
-                bin_items.clear()
-                bin_weight = 0
-
-            bin_weight += weight
-            bin_items.append(item)
-
-        if len(bin_items) > 0:
-            packed.append(bin_items)
-
-        return packed
-
-    @staticmethod
-    def _is_blob_file(file_name: str) -> bool:
-        return file_name.endswith('.blob')
-
-    @staticmethod
-    def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
-        result = []
-        row_id_start = -1
-        row_id_end = -1
-
-        for file in files:
-            if not FullStartingScanner._is_blob_file(file.file_name):
-                if file.first_row_id is not None:
-                    row_id_start = file.first_row_id
-                    row_id_end = file.first_row_id + file.row_count
-                result.append(file)
-            else:
-                if file.first_row_id is not None and row_id_start != -1:
-                    if row_id_start <= file.first_row_id < row_id_end:
-                        result.append(file)
-
-        return result
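
For context, a minimal, self-contained sketch of the firstRowId grouping that the removed _split_by_row_id implements: a non-blob file with a new first_row_id starts a new group, while blob files in the same row range stay with their group. This sketch is not part of the patch; the file names, row ids and row counts below are made up for illustration.

    # Hypothetical (file_name, first_row_id, row_count) tuples, already sorted by row id.
    files = [
        ("f0.parquet", 0, 100),
        ("f0.blob", 0, 100),      # blob rows in the same range stay with their group
        ("f1.parquet", 100, 50),
        ("f2.parquet", 150, 50),
    ]

    groups, current, last_row_id = [], [], -1
    for name, first_row_id, _row_count in files:
        # A non-blob file with a new first_row_id starts a new group.
        if not name.endswith(".blob") and first_row_id != last_row_id:
            if current:
                groups.append(current)
            current, last_row_id = [], first_row_id
        current.append(name)
    if current:
        groups.append(current)

    print(groups)  # [['f0.parquet', 'f0.blob'], ['f1.parquet'], ['f2.parquet']]
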
diff --git a/paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py b/paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py
new file mode 100644
index 0000000000..5955b6aa94
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/primary_key_table_split_generator.py
@@ -0,0 +1,126 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict
+from typing import List
+
+from pypaimon.common.options.core_options import MergeEngine
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.interval_partition import IntervalPartition
+from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
+from pypaimon.read.split import Split
+
+
+class PrimaryKeyTableSplitGenerator(AbstractSplitGenerator):
+    """
+    Split generator for primary key tables.
+    """
+
+    def __init__(
+            self,
+            table,
+            target_split_size: int,
+            open_file_cost: int,
+            deletion_files_map=None
+    ):
+        super().__init__(table, target_split_size, open_file_cost, deletion_files_map)
+        self.deletion_vectors_enabled = table.options.deletion_vectors_enabled()
+        self.merge_engine = table.options.merge_engine()
+
+    def with_slice(self, start_pos: int, end_pos: int):
+        """Primary key tables do not support slice-based sharding."""
+        raise NotImplementedError(
+            "Primary key tables do not support with_slice(). "
+            "Use with_shard() for bucket-based parallel processing instead."
+        )
+
+    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+        """
+        Create splits for primary key tables.
+        """
+        if self.idx_of_this_subtask is not None:
+            file_entries = self._filter_by_shard(file_entries)
+
+        partitioned_files = defaultdict(list)
+        for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+        def single_weight_func(f: DataFileMeta) -> int:
+            return max(f.file_size, self.open_file_cost)
+
+        def weight_func(fl: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in fl), self.open_file_cost)
+
+        merge_engine_first_row = self.merge_engine == MergeEngine.FIRST_ROW
+
+        splits = []
+        for key, file_entries_list in partitioned_files.items():
+            if not file_entries_list:
+                continue
+
+            data_files: List[DataFileMeta] = [e.file for e in file_entries_list]
+
+            raw_convertible = all(
+                f.level != 0 and self._without_delete_row(f)
+                for f in data_files
+            )
+
+            levels = {f.level for f in data_files}
+            one_level = len(levels) == 1
+
+            use_optimized_path = raw_convertible and (
+                self.deletion_vectors_enabled or merge_engine_first_row or one_level
+            )
+
+            if use_optimized_path:
+                packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
+                    data_files, single_weight_func, self.target_split_size
+                )
+                splits += self._build_split_from_pack(
+                    packed_files, file_entries_list, True, use_optimized_path
+                )
+            else:
+                partition_sort_runs = IntervalPartition(data_files).partition()
+                sections: List[List[DataFileMeta]] = [
+                    [file for s in sl for file in s.files]
+                    for sl in partition_sort_runs
+                ]
+
+                packed_files = self._pack_for_ordered(
+                    sections, weight_func, self.target_split_size
+                )
+
+                flatten_packed_files: List[List[DataFileMeta]] = [
+                    [file for sub_pack in pack for file in sub_pack]
+                    for pack in packed_files
+                ]
+                splits += self._build_split_from_pack(
+                    flatten_packed_files, file_entries_list, True, False
+                )
+
+        return splits
+
+    def _filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
+        """
+        Filter file entries by bucket-based sharding.
+        """
+        filtered_entries = []
+        for entry in file_entries:
+            if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask:
+                filtered_entries.append(entry)
+        return filtered_entries
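
A quick illustration of the bucket-modulo sharding that _filter_by_shard applies above; this is not part of the patch and the bucket ids are made up.

    buckets = [0, 1, 2, 3, 4, 5, 6, 7]   # hypothetical bucket ids of manifest entries
    number_of_para_subtasks = 3

    # Each subtask keeps only the buckets where bucket % subtasks == its own index.
    for idx_of_this_subtask in range(number_of_para_subtasks):
        kept = [b for b in buckets if b % number_of_para_subtasks == idx_of_this_subtask]
        print(idx_of_this_subtask, kept)
    # 0 [0, 3, 6]
    # 1 [1, 4, 7]
    # 2 [2, 5]
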
diff --git a/paimon-python/pypaimon/read/scanner/split_generator.py b/paimon-python/pypaimon/read/scanner/split_generator.py
new file mode 100644
index 0000000000..8f94338900
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/split_generator.py
@@ -0,0 +1,246 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from abc import ABC, abstractmethod
+from typing import Callable, List, Optional, Dict, Tuple
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.split import Split
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.source.deletion_file import DeletionFile
+
+
+class AbstractSplitGenerator(ABC):
+    """
+    Abstract base class for generating splits.
+    """
+    
+    # Special key for tracking file end position in split file index map
+    NEXT_POS_KEY = '_next_pos'
+
+    def __init__(
+        self,
+        table,
+        target_split_size: int,
+        open_file_cost: int,
+        deletion_files_map: Optional[Dict] = None
+    ):
+        self.table = table
+        self.target_split_size = target_split_size
+        self.open_file_cost = open_file_cost
+        self.deletion_files_map = deletion_files_map or {}
+        
+        # Shard configuration
+        self.idx_of_this_subtask = None
+        self.number_of_para_subtasks = None
+        self.start_pos_of_this_subtask = None
+        self.end_pos_of_this_subtask = None
+
+    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int):
+        """Configure sharding for parallel processing."""
+        if idx_of_this_subtask >= number_of_para_subtasks:
+            raise ValueError("idx_of_this_subtask must be less than number_of_para_subtasks")
+        if self.start_pos_of_this_subtask is not None:
+            raise ValueError("with_shard and with_slice cannot be used simultaneously")
+        self.idx_of_this_subtask = idx_of_this_subtask
+        self.number_of_para_subtasks = number_of_para_subtasks
+        return self
+
+    def with_slice(self, start_pos: int, end_pos: int):
+        """Configure slice range for processing."""
+        if start_pos >= end_pos:
+            raise ValueError("start_pos must be less than end_pos")
+        if self.idx_of_this_subtask is not None:
+            raise ValueError("with_slice and with_shard cannot be used simultaneously")
+        self.start_pos_of_this_subtask = start_pos
+        self.end_pos_of_this_subtask = end_pos
+        return self
+
+    @abstractmethod
+    def create_splits(self, file_entries: List[ManifestEntry]) -> List[Split]:
+        """
+        Create splits from manifest entries.
+        """
+        pass
+
+    def _build_split_from_pack(
+        self,
+        packed_files: List[List[DataFileMeta]],
+        file_entries: List[ManifestEntry],
+        for_primary_key_split: bool,
+        use_optimized_path: bool = False
+    ) -> List[Split]:
+        """
+        Build splits from packed files.
+        """
+        splits = []
+        for file_group in packed_files:
+            if use_optimized_path:
+                raw_convertible = True
+            elif for_primary_key_split:
+                raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
+            else:
+                raw_convertible = True
+
+            file_paths = []
+            total_file_size = 0
+            total_record_count = 0
+
+            for data_file in file_group:
+                data_file.set_file_path(
+                    self.table.table_path,
+                    file_entries[0].partition,
+                    file_entries[0].bucket
+                )
+                file_paths.append(data_file.file_path)
+                total_file_size += data_file.file_size
+                total_record_count += data_file.row_count
+
+            if file_paths:
+                # Get deletion files for this split
+                data_deletion_files = None
+                if self.deletion_files_map:
+                    data_deletion_files = self._get_deletion_files_for_split(
+                        file_group,
+                        file_entries[0].partition,
+                        file_entries[0].bucket
+                    )
+
+                split = Split(
+                    files=file_group,
+                    partition=file_entries[0].partition,
+                    bucket=file_entries[0].bucket,
+                    file_paths=file_paths,
+                    row_count=total_record_count,
+                    file_size=total_file_size,
+                    raw_convertible=raw_convertible,
+                    data_deletion_files=data_deletion_files
+                )
+                splits.append(split)
+        return splits
+
+    def _get_deletion_files_for_split(
+        self,
+        data_files: List[DataFileMeta],
+        partition: GenericRow,
+        bucket: int
+    ) -> Optional[List[DeletionFile]]:
+        """Get deletion files for the given data files in a split."""
+        if not self.deletion_files_map:
+            return None
+
+        partition_key = (tuple(partition.values), bucket)
+        file_deletion_map = self.deletion_files_map.get(partition_key, {})
+
+        if not file_deletion_map:
+            return None
+
+        deletion_files = []
+        for data_file in data_files:
+            deletion_file = file_deletion_map.get(data_file.file_name)
+            if deletion_file:
+                deletion_files.append(deletion_file)
+            else:
+                deletion_files.append(None)
+
+        return deletion_files if any(df is not None for df in deletion_files) else None
+
+    @staticmethod
+    def _without_delete_row(data_file_meta: DataFileMeta) -> bool:
+        """Check if a data file has no deleted rows."""
+        if data_file_meta.delete_row_count is None:
+            return True
+        return data_file_meta.delete_row_count == 0
+
+    @staticmethod
+    def _pack_for_ordered(
+        items: List,
+        weight_func: Callable,
+        target_weight: int
+    ) -> List[List]:
+        """Pack items into groups based on target weight."""
+        packed = []
+        bin_items = []
+        bin_weight = 0
+
+        for item in items:
+            weight = weight_func(item)
+            if bin_weight + weight > target_weight and len(bin_items) > 0:
+                packed.append(list(bin_items))
+                bin_items.clear()
+                bin_weight = 0
+
+            bin_weight += weight
+            bin_items.append(item)
+
+        if len(bin_items) > 0:
+            packed.append(bin_items)
+
+        return packed
+
+    def _compute_shard_range(self, total_row: int) -> Tuple[int, int]:
+        """
+        Calculate start and end positions for this shard based on total rows.
+        Uses balanced distribution to avoid last shard overload.
+        """
+        base_rows_per_shard = total_row // self.number_of_para_subtasks
+        remainder = total_row % self.number_of_para_subtasks
+
+        # Each of the first 'remainder' shards gets one extra row
+        if self.idx_of_this_subtask < remainder:
+            num_row = base_rows_per_shard + 1
+            start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1)
+        else:
+            num_row = base_rows_per_shard
+            start_pos = (
+                remainder * (base_rows_per_shard + 1) +
+                (self.idx_of_this_subtask - remainder) * base_rows_per_shard
+            )
+
+        end_pos = start_pos + num_row
+        return start_pos, end_pos
+
+    @staticmethod
+    def _compute_file_range(
+        plan_start_pos: int,
+        plan_end_pos: int,
+        file_begin_pos: int,
+        file_row_count: int
+    ) -> Optional[Tuple[int, int]]:
+        """
+        Compute the row range to read from a file given shard range and file position.
+        Returns None if file is completely within shard range (no slicing needed).
+        Returns (-1, -1) if file is completely outside shard range.
+        """
+        file_end_pos = file_begin_pos + file_row_count
+
+        if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
+            return plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos
+        elif file_begin_pos < plan_start_pos < file_end_pos:
+            return plan_start_pos - file_begin_pos, file_row_count
+        elif file_begin_pos < plan_end_pos < file_end_pos:
+            return 0, plan_end_pos - file_begin_pos
+        elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
+            return -1, -1
+        # File is completely within the shard range
+        return None
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        """Check if a file is a blob file."""
+        return file_name.endswith('.blob')
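
To make the packing behaviour concrete, here is a standalone sketch reusing the same logic as _pack_for_ordered above. It is not part of the patch; the file sizes (in MB), the 128 MB target and the 4 MB minimum weight are made-up values mirroring the max(file_size, open_file_cost) weighting used by the generators.

    from typing import Callable, List

    def pack_for_ordered(items: List, weight_func: Callable, target_weight: int) -> List[List]:
        # Same algorithm as AbstractSplitGenerator._pack_for_ordered in this patch:
        # flush the current bin whenever adding the next item would exceed the target.
        packed, bin_items, bin_weight = [], [], 0
        for item in items:
            weight = weight_func(item)
            if bin_weight + weight > target_weight and bin_items:
                packed.append(list(bin_items))
                bin_items, bin_weight = [], 0
            bin_weight += weight
            bin_items.append(item)
        if bin_items:
            packed.append(bin_items)
        return packed

    sizes = [100, 40, 20, 90, 8, 2]                      # hypothetical file sizes (MB)
    print(pack_for_ordered(sizes, lambda s: max(s, 4), 128))
    # [[100], [40, 20], [90, 8, 2]]
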
diff --git a/paimon-python/pypaimon/read/sliced_split.py b/paimon-python/pypaimon/read/sliced_split.py
new file mode 100644
index 0000000000..ff96798633
--- /dev/null
+++ b/paimon-python/pypaimon/read/sliced_split.py
@@ -0,0 +1,110 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+SlicedSplit wraps a Split with file index ranges for shard/slice processing.
+"""
+
+from typing import List, Dict, Tuple
+
+from pypaimon.read.split import SplitBase
+
+
+class SlicedSplit(SplitBase):
+    """
+    Wrapper for Split that adds file-level slicing information.
+    
+    This is used when a split needs to be further divided at the file level,
+    storing the start and end row indices for each file in shard_file_idx_map.
+    
+    Maps file_name -> (start_row, end_row) where:
+    - start_row: starting row index within the file (inclusive)
+    - end_row: ending row index within the file (exclusive)
+    - (-1, -1): file should be skipped entirely
+    """
+
+    def __init__(
+        self,
+        data_split: 'Split',
+        shard_file_idx_map: Dict[str, Tuple[int, int]]
+    ):
+        self._data_split = data_split
+        self._shard_file_idx_map = shard_file_idx_map
+
+    def data_split(self) -> 'Split':
+        return self._data_split
+
+    def shard_file_idx_map(self) -> Dict[str, Tuple[int, int]]:
+        return self._shard_file_idx_map
+
+    @property
+    def files(self) -> List['DataFileMeta']:
+        return self._data_split.files
+
+    @property
+    def partition(self) -> 'GenericRow':
+        return self._data_split.partition
+
+    @property
+    def bucket(self) -> int:
+        return self._data_split.bucket
+
+    @property
+    def row_count(self) -> int:
+        if not self._shard_file_idx_map:
+            return self._data_split.row_count
+        
+        total_rows = 0
+        for file in self._data_split.files:
+            if file.file_name in self._shard_file_idx_map:
+                start, end = self._shard_file_idx_map[file.file_name]
+                if start != -1 and end != -1:
+                    total_rows += (end - start)
+            else:
+                total_rows += file.row_count
+        
+        return total_rows
+
+    @property
+    def file_paths(self):
+        return self._data_split.file_paths
+
+    @property
+    def file_size(self):
+        return self._data_split.file_size
+
+    @property
+    def raw_convertible(self):
+        return self._data_split.raw_convertible
+
+    @property
+    def data_deletion_files(self):
+        return self._data_split.data_deletion_files
+
+    def __eq__(self, other):
+        if not isinstance(other, SlicedSplit):
+            return False
+        return (self._data_split == other._data_split and
+                self._shard_file_idx_map == other._shard_file_idx_map)
+
+    def __hash__(self):
+        return hash((id(self._data_split), tuple(sorted(self._shard_file_idx_map.items()))))
+
+    def __repr__(self):
+        return (f"SlicedSplit(data_split={self._data_split}, "
+                f"shard_file_idx_map={self._shard_file_idx_map})")
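
A small sketch of how the shard_file_idx_map convention documented above plays out; the loop mirrors SlicedSplit.row_count. It is not part of the patch, and the file names and row counts are hypothetical.

    # file_name -> (start_row, end_row); (-1, -1) means the file is skipped entirely.
    shard_file_idx_map = {
        "data-0.parquet": (500, 1000),   # read rows [500, 1000) only
        "data-1.parquet": (-1, -1),      # outside this shard
    }
    file_row_counts = {"data-0.parquet": 1000, "data-1.parquet": 800, "data-2.parquet": 300}

    total_rows = 0
    for name, rows in file_row_counts.items():
        if name in shard_file_idx_map:
            start, end = shard_file_idx_map[name]
            if (start, end) != (-1, -1):
                total_rows += end - start
        else:
            total_rows += rows               # files not in the map are read in full
    print(total_rows)                        # 500 + 0 + 300 = 800
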
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index f5357b37ea..3daccb75f3 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -17,7 +17,7 @@
 ################################################################################
 
 from abc import ABC, abstractmethod
-from typing import List, Optional, Dict, Tuple
+from typing import List, Optional
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.table.row.generic_row import GenericRow
@@ -71,7 +71,6 @@ class Split(SplitBase):
         file_paths: List[str],
         row_count: int,
         file_size: int,
-        shard_file_idx_map: Optional[Dict[str, Tuple[int, int]]] = None,
         raw_convertible: bool = False,
         data_deletion_files: Optional[List[DeletionFile]] = None
     ):
@@ -81,7 +80,6 @@ class Split(SplitBase):
         self._file_paths = file_paths
         self._row_count = row_count
         self._file_size = file_size
-        self.shard_file_idx_map = shard_file_idx_map or {}
         self.raw_convertible = raw_convertible
         self.data_deletion_files = data_deletion_files
 
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5df6ccb96e..47edf63d9a 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -51,6 +51,7 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
+from pypaimon.read.sliced_split import SlicedSplit
 from pypaimon.schema.data_types import DataField
 from pypaimon.table.special_fields import SpecialFields
 
@@ -333,8 +334,12 @@ class RawFileSplitRead(SplitRead):
     def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]:
         read_fields = self._get_final_read_data_fields()
         # If the current file needs to be further divided for reading, use ShardBatchReader
-        if file.file_name in self.split.shard_file_idx_map:
-            (start_pos, end_pos) = self.split.shard_file_idx_map[file.file_name]
+        # Check if this is a SlicedSplit to get shard_file_idx_map
+        shard_file_idx_map = (
+            self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
+        )
+        if file.file_name in shard_file_idx_map:
+            (start_pos, end_pos) = shard_file_idx_map[file.file_name]
             if (start_pos, end_pos) == (-1, -1):
                 return None
             else:
@@ -567,8 +572,12 @@ class DataEvolutionSplitRead(SplitRead):
     def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]:
         """Create a file reader for a single file."""
         # If the current file needs to be further divided for reading, use ShardBatchReader
-        if file.file_name in self.split.shard_file_idx_map:
-            (begin_pos, end_pos) = self.split.shard_file_idx_map[file.file_name]
+        # Check if this is a SlicedSplit to get shard_file_idx_map
+        shard_file_idx_map = (
+            self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
+        )
+        if file.file_name in shard_file_idx_map:
+            (begin_pos, end_pos) = shard_file_idx_map[file.file_name]
             if (begin_pos, end_pos) == (-1, -1):
                 return None
             else:
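
Finally, a minimal sketch of the dispatch these two hunks introduce: a plain Split reads every file in full, while a SlicedSplit narrows or skips individual files. This is not part of the patch; FakeSlicedSplit and choose_range are hypothetical stand-ins, and hasattr is used here in place of the isinstance(self.split, SlicedSplit) check used above.

    class FakeSlicedSplit:
        """Hypothetical stand-in exposing the same accessor as SlicedSplit."""
        def shard_file_idx_map(self):
            return {"f0.parquet": (0, 10), "f1.parquet": (-1, -1)}

    def choose_range(split, file_name):
        # Only sliced splits carry a per-file row-range map.
        shard_map = split.shard_file_idx_map() if hasattr(split, "shard_file_idx_map") else {}
        if file_name in shard_map:
            start_pos, end_pos = shard_map[file_name]
            return None if (start_pos, end_pos) == (-1, -1) else (start_pos, end_pos)
        return "full"  # no entry: read the whole file as before

    split = FakeSlicedSplit()
    print(choose_range(split, "f0.parquet"))  # (0, 10) -> sliced read via ShardBatchReader
    print(choose_range(split, "f1.parquet"))  # None    -> file skipped
    print(choose_range(split, "f2.parquet"))  # 'full'  -> ordinary full-file read
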
