This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c1415be9b [python] Refactor to File Scanner and add 
_filter_manifest_files_by_row_ranges (#7177)
6c1415be9b is described below

commit 6c1415be9b025156e6e815172374da92318bf200
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Feb 2 22:06:18 2026 +0800

    [python] Refactor to File Scanner and add 
_filter_manifest_files_by_row_ranges (#7177)
    
    1. Simplify StartingScanner to FileScanner, just one class.
    2. Introduce `_filter_manifest_files_by_row_ranges` to accelerate queries
with a global index.
---
 paimon-python/pypaimon/globalindex/__init__.py     |   8 +-
 .../globalindex/faiss/faiss_vector_reader.py       |   4 +-
 .../pypaimon/globalindex/vector_search_result.py   |  20 +--
 .../pypaimon/manifest/manifest_file_manager.py     |  16 +--
 .../pypaimon/manifest/manifest_list_manager.py     |   8 +-
 .../pypaimon/manifest/schema/manifest_file_meta.py |   5 +
 .../read/scanner/empty_starting_scanner.py         |  25 ----
 .../{full_starting_scanner.py => file_scanner.py}  | 146 +++++++++++++++------
 .../read/scanner/incremental_starting_scanner.py   |  69 ----------
 .../pypaimon/read/scanner/starting_scanner.py      |  28 ----
 paimon-python/pypaimon/read/table_scan.py          |  66 +++++++---
 paimon-python/pypaimon/tests/binary_row_test.py    |  42 +++---
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |   6 +-
 paimon-python/pypaimon/tests/reader_base_test.py   |  28 ++--
 .../pypaimon/tests/reader_primary_key_test.py      |   4 +-
 .../pypaimon/tests/schema_evolution_read_test.py   |   6 +-
 paimon-python/pypaimon/write/file_store_commit.py  |   4 +-
 17 files changed, 232 insertions(+), 253 deletions(-)

diff --git a/paimon-python/pypaimon/globalindex/__init__.py 
b/paimon-python/pypaimon/globalindex/__init__.py
index 529b90c478..ae8e6328d1 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/__init__.py
@@ -20,8 +20,8 @@ from pypaimon.globalindex.global_index_result import 
GlobalIndexResult
 from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
 from pypaimon.globalindex.vector_search import VectorSearch
 from pypaimon.globalindex.vector_search_result import (
-    VectorSearchGlobalIndexResult,
-    DictBasedVectorSearchResult,
+    ScoredGlobalIndexResult,
+    DictBasedScoredIndexResult,
     ScoreGetter,
 )
 from pypaimon.globalindex.global_index_meta import GlobalIndexMeta, 
GlobalIndexIOMeta
@@ -38,8 +38,8 @@ __all__ = [
     'GlobalIndexReader',
     'FieldRef',
     'VectorSearch',
-    'VectorSearchGlobalIndexResult',
-    'DictBasedVectorSearchResult',
+    'ScoredGlobalIndexResult',
+    'DictBasedScoredIndexResult',
     'ScoreGetter',
     'GlobalIndexMeta',
     'GlobalIndexIOMeta',
diff --git a/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py 
b/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
index c2460d9b9e..3201894d27 100644
--- a/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
+++ b/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
@@ -30,7 +30,7 @@ import numpy as np
 from pypaimon.globalindex.global_index_reader import GlobalIndexReader
 from pypaimon.globalindex.global_index_result import GlobalIndexResult
 from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
-from pypaimon.globalindex.vector_search_result import 
DictBasedVectorSearchResult
+from pypaimon.globalindex.vector_search_result import 
DictBasedScoredIndexResult
 from pypaimon.globalindex.roaring_bitmap import RoaringBitmap64
 from pypaimon.globalindex.faiss.faiss_options import (
     FaissVectorIndexOptions,
@@ -170,7 +170,7 @@ class FaissVectorGlobalIndexReader(GlobalIndexReader):
         if not results:
             return None
         
-        return DictBasedVectorSearchResult(results)
+        return DictBasedScoredIndexResult(results)
 
     def _configure_search_params(self, index: FaissIndex) -> None:
         """Configure search parameters based on index type."""
diff --git a/paimon-python/pypaimon/globalindex/vector_search_result.py 
b/paimon-python/pypaimon/globalindex/vector_search_result.py
index 62ccb85502..e4fdcd6cc8 100644
--- a/paimon-python/pypaimon/globalindex/vector_search_result.py
+++ b/paimon-python/pypaimon/globalindex/vector_search_result.py
@@ -29,7 +29,7 @@ from pypaimon.globalindex.roaring_bitmap import 
RoaringBitmap64
 ScoreGetter = Callable[[int], Optional[float]]
 
 
-class VectorSearchGlobalIndexResult(GlobalIndexResult):
+class ScoredGlobalIndexResult(GlobalIndexResult):
     """
     Vector search global index result for vector index.
     
@@ -41,7 +41,7 @@ class VectorSearchGlobalIndexResult(GlobalIndexResult):
         """Returns a function to get the score for a given row ID."""
         pass
 
-    def offset(self, offset: int) -> 'VectorSearchGlobalIndexResult':
+    def offset(self, offset: int) -> 'ScoredGlobalIndexResult':
         """Returns a new result with row IDs offset by the given amount."""
         if offset == 0:
             return self
@@ -53,14 +53,14 @@ class VectorSearchGlobalIndexResult(GlobalIndexResult):
         for row_id in bitmap:
             offset_bitmap.add(row_id + offset)
         
-        return SimpleVectorSearchGlobalIndexResult(
+        return SimpleScoredGlobalIndexResult(
             offset_bitmap,
             lambda row_id: this_score_getter(row_id - offset)
         )
 
     def or_(self, other: GlobalIndexResult) -> GlobalIndexResult:
         """Returns the union of this result and the other result."""
-        if not isinstance(other, VectorSearchGlobalIndexResult):
+        if not isinstance(other, ScoredGlobalIndexResult):
             return super().or_(other)
         
         this_row_ids = self.results()
@@ -76,18 +76,18 @@ class VectorSearchGlobalIndexResult(GlobalIndexResult):
                 return this_score_getter(row_id)
             return other_score_getter(row_id)
         
-        return SimpleVectorSearchGlobalIndexResult(result_or, 
combined_score_getter)
+        return SimpleScoredGlobalIndexResult(result_or, combined_score_getter)
 
     @staticmethod
     def create(
         supplier: Callable[[], RoaringBitmap64],
         score_getter: ScoreGetter
-    ) -> 'VectorSearchGlobalIndexResult':
+    ) -> 'ScoredGlobalIndexResult':
         """Creates a new VectorSearchGlobalIndexResult from supplier."""
-        return LazyVectorSearchGlobalIndexResult(supplier, score_getter)
+        return LazyScoredGlobalIndexResult(supplier, score_getter)
 
 
-class SimpleVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
+class SimpleScoredGlobalIndexResult(ScoredGlobalIndexResult):
     """Simple implementation of VectorSearchGlobalIndexResult."""
 
     def __init__(self, bitmap: RoaringBitmap64, score_getter_fn: ScoreGetter):
@@ -101,7 +101,7 @@ class 
SimpleVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
         return self._score_getter_fn
 
 
-class LazyVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
+class LazyScoredGlobalIndexResult(ScoredGlobalIndexResult):
     """Lazy implementation of VectorSearchGlobalIndexResult."""
 
     def __init__(self, supplier: Callable[[], RoaringBitmap64], 
score_getter_fn: ScoreGetter):
@@ -118,7 +118,7 @@ class 
LazyVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
         return self._score_getter_fn
 
 
-class DictBasedVectorSearchResult(VectorSearchGlobalIndexResult):
+class DictBasedScoredIndexResult(ScoredGlobalIndexResult):
     """Vector search result backed by a dictionary of row_id -> score."""
 
     def __init__(self, id_to_scores: Dict[int, float]):
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py 
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index f6ae41e3d3..0ed5091825 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -53,15 +53,15 @@ class ManifestFileManager:
         def _process_single_manifest(manifest_file: ManifestFileMeta) -> 
List[ManifestEntry]:
             return self.read(manifest_file.file_name, manifest_entry_filter, 
drop_stats)
 
-        def _entry_identifier(entry: ManifestEntry) -> tuple:
+        def _entry_identifier(e: ManifestEntry) -> tuple:
             return (
-                tuple(entry.partition.values),
-                entry.bucket,
-                entry.file.level,
-                entry.file.file_name,
-                tuple(entry.file.extra_files) if entry.file.extra_files else 
(),
-                entry.file.embedded_index,
-                entry.file.external_path,
+                tuple(e.partition.values),
+                e.bucket,
+                e.file.level,
+                e.file.file_name,
+                tuple(e.file.extra_files) if e.file.extra_files else (),
+                e.file.embedded_index,
+                e.file.external_path,
             )
 
         deleted_entry_keys = set()
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py 
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index a1897fbee7..0bf9931630 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -17,7 +17,7 @@
 
################################################################################
 
 from io import BytesIO
-from typing import List
+from typing import List, Optional
 
 import fastavro
 
@@ -40,7 +40,9 @@ class ManifestListManager:
         self.manifest_path = f"{manifest_path}/manifest"
         self.file_io = self.table.file_io
 
-    def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+    def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
+        if snapshot is None:
+            return []
         manifest_files = []
         base_manifests = self.read(snapshot.base_manifest_list)
         manifest_files.extend(base_manifests)
@@ -79,6 +81,8 @@ class ManifestListManager:
                 num_deleted_files=record['_NUM_DELETED_FILES'],
                 partition_stats=partition_stats,
                 schema_id=record['_SCHEMA_ID'],
+                min_row_id=record['_MIN_ROW_ID'],
+                max_row_id=record['_MAX_ROW_ID'],
             )
             manifest_files.append(manifest_file_meta)
 
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py 
b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index a830be6cc2..ed42dd7ecd 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -18,6 +18,7 @@
 
 from dataclasses import dataclass
 
+from typing import Optional
 from pypaimon.manifest.schema.simple_stats import (PARTITION_STATS_SCHEMA,
                                                    SimpleStats)
 
@@ -31,6 +32,8 @@ class ManifestFileMeta:
     partition_stats: SimpleStats
     schema_id: int
 
+    min_row_id: Optional[int] = None
+    max_row_id: Optional[int] = None
 
 MANIFEST_FILE_META_SCHEMA = {
     "type": "record",
@@ -43,5 +46,7 @@ MANIFEST_FILE_META_SCHEMA = {
         {"name": "_NUM_DELETED_FILES", "type": "long"},
         {"name": "_PARTITION_STATS", "type": PARTITION_STATS_SCHEMA},
         {"name": "_SCHEMA_ID", "type": "long"},
+        {"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": None},
+        {"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": None},
     ]
 }
diff --git a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
deleted file mode 100644
index 9c870aa588..0000000000
--- a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
+++ /dev/null
@@ -1,25 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from pypaimon.read.plan import Plan
-from pypaimon.read.scanner.starting_scanner import StartingScanner
-
-
-class EmptyStartingScanner(StartingScanner):
-
-    def scan(self) -> Plan:
-        return Plan([])
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
similarity index 76%
rename from paimon-python/pypaimon/read/scanner/full_starting_scanner.py
rename to paimon-python/pypaimon/read/scanner/file_scanner.py
index f415ab61ef..0f53ef0cd3 100755
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -16,10 +16,10 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 """
 import os
-from typing import List, Optional, Dict, Set
+from typing import List, Optional, Dict, Set, Callable
 
 from pypaimon.common.predicate import Predicate
-from pypaimon.globalindex import VectorSearchGlobalIndexResult
+from pypaimon.globalindex import ScoredGlobalIndexResult
 from pypaimon.table.source.deletion_file import DeletionFile
 from pypaimon.manifest.index_manifest_file import IndexManifestFile
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
@@ -31,24 +31,73 @@ from pypaimon.read.push_down_utils import 
(trim_and_transform_predicate)
 from pypaimon.read.scanner.append_table_split_generator import 
AppendTableSplitGenerator
 from pypaimon.read.scanner.data_evolution_split_generator import 
DataEvolutionSplitGenerator
 from pypaimon.read.scanner.primary_key_table_split_generator import 
PrimaryKeyTableSplitGenerator
-from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import DataSplit
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
 
 
-class FullStartingScanner(StartingScanner):
+def _filter_manifest_files_by_row_ranges(
+        manifest_files: List[ManifestFileMeta],
+        row_ranges: List) -> List[ManifestFileMeta]:
+    """
+    Filter manifest files by row ranges.
+
+    Only keep manifest files that have min_row_id and max_row_id and overlap 
with the given row ranges.
+
+    Args:
+        manifest_files: List of manifest file metadata
+        row_ranges: List of row ranges to filter by
+
+    Returns:
+        Filtered list of manifest files
+    """
+    from pypaimon.globalindex.range import Range
+
+    filtered_files = []
+    for manifest in manifest_files:
+        min_row_id = manifest.min_row_id
+        max_row_id = manifest.max_row_id
+
+        # If min_row_id or max_row_id is None, we cannot filter, keep the file
+        if min_row_id is None or max_row_id is None:
+            filtered_files.append(manifest)
+            continue
+
+        # Check if manifest row range overlaps with any of the expected row 
ranges
+        manifest_row_range = Range(min_row_id, max_row_id)
+        should_keep = False
+
+        for expected_range in row_ranges:
+            # Check if ranges intersect
+            intersect = Range.intersect(
+                manifest_row_range.from_,
+                manifest_row_range.to,
+                expected_range.from_,
+                expected_range.to)
+            if intersect:
+                should_keep = True
+                break
+
+        if should_keep:
+            filtered_files.append(manifest)
+
+    return filtered_files
+
+
+class FileScanner:
     def __init__(
         self,
         table,
-        predicate: Optional[Predicate],
-        limit: Optional[int],
+        manifest_scanner: Callable[[], List[ManifestFileMeta]],
+        predicate: Optional[Predicate] = None,
+        limit: Optional[int] = None,
         vector_search: Optional['VectorSearch'] = None
     ):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
+        self.manifest_scanner = manifest_scanner
         self.predicate = predicate
         self.limit = limit
         self.vector_search = vector_search
@@ -84,53 +133,39 @@ class FullStartingScanner(StartingScanner):
             self.table.table_schema.id
         )
 
-    def scan(self) -> Plan:
-        file_entries = self.plan_files()
-        if not file_entries:
-            return Plan([])
-        # Get deletion files map if deletion vectors are enabled.
-        # {partition-bucket -> {filename -> DeletionFile}}
-        deletion_files_map: dict[tuple, dict[str, DeletionFile]] = {}
-        if self.deletion_vectors_enabled:
-            latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-            # Extract unique partition-bucket pairs from file entries
-            buckets = set()
-            for entry in file_entries:
-                buckets.add((tuple(entry.partition.values), entry.bucket))
-            deletion_files_map = self._scan_dv_index(latest_snapshot, buckets)
+    def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple, 
Dict[str, DeletionFile]]:
+        if not self.deletion_vectors_enabled:
+            return {}
+        # Extract unique partition-bucket pairs from file entries
+        bucket_files = set()
+        for e in entries:
+            bucket_files.add((tuple(e.partition.values), e.bucket))
+        return 
self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
 
+    def scan(self) -> Plan:
         # Create appropriate split generator based on table type
         if self.table.is_primary_key_table:
+            entries = self.plan_files()
             split_generator = PrimaryKeyTableSplitGenerator(
                 self.table,
                 self.target_split_size,
                 self.open_file_cost,
-                deletion_files_map
+                self._deletion_files_map(entries)
             )
         elif self.data_evolution:
-            global_index_result = self._eval_global_index()
-            row_ranges = None
-            score_getter = None
-            if global_index_result is not None:
-                row_ranges = global_index_result.results().to_range_list()
-                if isinstance(global_index_result, 
VectorSearchGlobalIndexResult):
-                    score_getter = global_index_result.score_getter()
-            split_generator = DataEvolutionSplitGenerator(
-                self.table,
-                self.target_split_size,
-                self.open_file_cost,
-                deletion_files_map,
-                row_ranges,
-                score_getter
-            )
+            entries, split_generator = 
self._create_data_evolution_split_generator()
         else:
+            entries = self.plan_files()
             split_generator = AppendTableSplitGenerator(
                 self.table,
                 self.target_split_size,
                 self.open_file_cost,
-                deletion_files_map
+                self._deletion_files_map(entries)
             )
 
+        if not entries:
+            return Plan([])
+
         # Configure sharding if needed
         if self.idx_of_this_subtask is not None:
             split_generator.with_shard(self.idx_of_this_subtask, 
self.number_of_para_subtasks)
@@ -138,16 +173,41 @@ class FullStartingScanner(StartingScanner):
             split_generator.with_slice(self.start_pos_of_this_subtask, 
self.end_pos_of_this_subtask)
 
         # Generate splits
-        splits = split_generator.create_splits(file_entries)
+        splits = split_generator.create_splits(entries)
 
         splits = self._apply_push_down_limit(splits)
         return Plan(splits)
 
+    def _create_data_evolution_split_generator(self):
+        row_ranges = None
+        score_getter = None
+        global_index_result = self._eval_global_index()
+        if global_index_result is not None:
+            row_ranges = global_index_result.results().to_range_list()
+            if isinstance(global_index_result, ScoredGlobalIndexResult):
+                score_getter = global_index_result.score_getter()
+
+        manifest_files = self.manifest_scanner()
+
+        # Filter manifest files by row ranges if available
+        if row_ranges is not None:
+            manifest_files = 
_filter_manifest_files_by_row_ranges(manifest_files, row_ranges)
+
+        entries = self.read_manifest_entries(manifest_files)
+
+        return entries, DataEvolutionSplitGenerator(
+            self.table,
+            self.target_split_size,
+            self.open_file_cost,
+            self._deletion_files_map(entries),
+            row_ranges,
+            score_getter
+        )
+
     def plan_files(self) -> List[ManifestEntry]:
-        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-        if not latest_snapshot:
+        manifest_files = self.manifest_scanner()
+        if len(manifest_files) == 0:
             return []
-        manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
         return self.read_manifest_entries(manifest_files)
 
     def _eval_global_index(self):
@@ -222,7 +282,7 @@ class FullStartingScanner(StartingScanner):
             max_workers=max_workers
         )
 
-    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: 
int) -> 'FullStartingScanner':
+    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: 
int) -> 'FileScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
             raise ValueError("idx_of_this_subtask must be less than 
number_of_para_subtasks")
         if self.start_pos_of_this_subtask is not None:
@@ -231,7 +291,7 @@ class FullStartingScanner(StartingScanner):
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def with_slice(self, start_pos: int, end_pos: int) -> 
'FullStartingScanner':
+    def with_slice(self, start_pos: int, end_pos: int) -> 'FileScanner':
         if start_pos >= end_pos:
             raise ValueError("start_pos must be less than end_pos")
         if self.idx_of_this_subtask is not None:
diff --git 
a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
deleted file mode 100644
index 56c186188a..0000000000
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ /dev/null
@@ -1,69 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from typing import List, Optional
-
-from pypaimon.common.predicate import Predicate
-from pypaimon.manifest.schema.manifest_entry import ManifestEntry
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-
-class IncrementalStartingScanner(FullStartingScanner):
-    def __init__(self, table, predicate: Optional[Predicate], limit: 
Optional[int],
-                 start: int, end: int):
-        super().__init__(table, predicate, limit)
-        self.startingSnapshotId = start
-        self.endingSnapshotId = end
-
-    def plan_files(self) -> List[ManifestEntry]:
-        snapshots_in_range = []
-        for snapshot_id in range(self.startingSnapshotId + 1, 
self.endingSnapshotId + 1):
-            snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
-            if snapshot.commit_kind == "APPEND":
-                snapshots_in_range.append(snapshot)
-
-        # Collect all file entries from all snapshots in range
-        file_entries = []
-
-        for snapshot in snapshots_in_range:
-            manifest_files = self.manifest_list_manager.read_delta(snapshot)
-            entries = self.read_manifest_entries(manifest_files)
-            file_entries.extend(entries)
-        return file_entries
-
-    @staticmethod
-    def between_timestamps(table, predicate: Optional[Predicate], limit: 
Optional[int],
-                           start_timestamp: int, end_timestamp: int) -> 
'IncrementalStartingScanner':
-        """
-        Create an IncrementalStartingScanner for snapshots between two 
timestamps.
-        """
-        snapshot_manager = SnapshotManager(table)
-        starting_snapshot = 
snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
-        earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
-
-        # If earliest_snapshot.time_millis > start_timestamp we should include 
the earliest_snapshot
-        if starting_snapshot is None or (earliest_snapshot and 
earliest_snapshot.time_millis > start_timestamp):
-            start_id = earliest_snapshot.id - 1 if earliest_snapshot else -1
-        else:
-            start_id = starting_snapshot.id
-
-        end_snapshot = 
snapshot_manager.earlier_or_equal_time_mills(end_timestamp)
-        latest_snapshot = snapshot_manager.get_latest_snapshot()
-        end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if 
latest_snapshot else -1)
-
-        return IncrementalStartingScanner(table, predicate, limit, start_id, 
end_id)
diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/starting_scanner.py
deleted file mode 100644
index 7e6cdfd81a..0000000000
--- a/paimon-python/pypaimon/read/scanner/starting_scanner.py
+++ /dev/null
@@ -1,28 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from abc import ABC, abstractmethod
-
-from pypaimon.read.plan import Plan
-
-
-class StartingScanner(ABC):
-    """Helper class for the first planning of TableScan."""
-
-    @abstractmethod
-    def scan(self) -> Plan:
-        """Plan the files to read."""
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index 8276163450..cca81998b2 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -22,12 +22,9 @@ from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 
 from pypaimon.read.plan import Plan
-from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
-from pypaimon.read.scanner.incremental_starting_scanner import \
-    IncrementalStartingScanner
-from pypaimon.read.scanner.starting_scanner import StartingScanner
+from pypaimon.read.scanner.file_scanner import FileScanner
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
 
 if TYPE_CHECKING:
     from pypaimon.globalindex.vector_search import VectorSearch
@@ -49,23 +46,25 @@ class TableScan:
         self.predicate = predicate
         self.limit = limit
         self.vector_search = vector_search
-        self.starting_scanner = self._create_starting_scanner()
+        self.file_scanner = self._create_file_scanner()
 
     def plan(self) -> Plan:
-        return self.starting_scanner.scan()
+        return self.file_scanner.scan()
 
-    def _create_starting_scanner(self) -> Optional[StartingScanner]:
+    def _create_file_scanner(self) -> FileScanner:
         options = self.table.options.options
+        snapshot_manager = SnapshotManager(self.table)
+        manifest_list_manager = ManifestListManager(self.table)
         if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
             ts = 
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
             if len(ts) != 2:
                 raise ValueError(
                     "The incremental-between-timestamp must specific 
start(exclusive) and end timestamp. But is: " +
                     options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP))
-            earliest_snapshot = 
SnapshotManager(self.table).try_get_earliest_snapshot()
-            latest_snapshot = SnapshotManager(self.table).get_latest_snapshot()
+            earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
+            latest_snapshot = snapshot_manager.get_latest_snapshot()
             if earliest_snapshot is None or latest_snapshot is None:
-                return EmptyStartingScanner()
+                return FileScanner(self.table, lambda: [])
             start_timestamp = int(ts[0])
             end_timestamp = int(ts[1])
             if start_timestamp >= end_timestamp:
@@ -73,20 +72,53 @@ class TableScan:
                     "Ending timestamp %s should be >= starting timestamp %s." 
% (end_timestamp, start_timestamp))
             if (start_timestamp == end_timestamp or start_timestamp > 
latest_snapshot.time_millis
                     or end_timestamp < earliest_snapshot.time_millis):
-                return EmptyStartingScanner()
-            return IncrementalStartingScanner.between_timestamps(self.table, 
self.predicate, self.limit,
-                                                                 
start_timestamp, end_timestamp)
-        return FullStartingScanner(
+                return FileScanner(self.table, lambda: [])
+
+            starting_snapshot = 
snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
+            earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
+
+            # If earliest_snapshot.time_millis > start_timestamp, we should include the earliest_snapshot
+            if starting_snapshot is None or (earliest_snapshot and 
earliest_snapshot.time_millis > start_timestamp):
+                start_id = earliest_snapshot.id - 1 if earliest_snapshot else 
-1
+            else:
+                start_id = starting_snapshot.id
+
+            end_snapshot = 
snapshot_manager.earlier_or_equal_time_mills(end_timestamp)
+            latest_snapshot = snapshot_manager.get_latest_snapshot()
+            end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id 
if latest_snapshot else -1)
+
+            def incremental_manifest():
+                snapshots_in_range = []
+                for snapshot_id in range(start_id + 1, end_id + 1):
+                    snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+                    if snapshot.commit_kind == "APPEND":
+                        snapshots_in_range.append(snapshot)
+
+                manifests = []
+
+                for snapshot in snapshots_in_range:
+                    manifest_files = manifest_list_manager.read_delta(snapshot)
+                    manifests.extend(manifest_files)
+                return manifests
+
+            return FileScanner(self.table, incremental_manifest, 
self.predicate, self.limit)
+
+        def all_manifests():
+            snapshot = snapshot_manager.get_latest_snapshot()
+            return manifest_list_manager.read_all(snapshot)
+
+        return FileScanner(
             self.table,
+            all_manifests,
             self.predicate,
             self.limit,
             vector_search=self.vector_search
         )
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 
'TableScan':
-        self.starting_scanner.with_shard(idx_of_this_subtask, 
number_of_para_subtasks)
+        self.file_scanner.with_shard(idx_of_this_subtask, 
number_of_para_subtasks)
         return self
 
     def with_slice(self, start_pos, end_pos) -> 'TableScan':
-        self.starting_scanner.with_slice(start_pos, end_pos)
+        self.file_scanner.with_slice(start_pos, end_pos)
         return self
diff --git a/paimon-python/pypaimon/tests/binary_row_test.py 
b/paimon-python/pypaimon/tests/binary_row_test.py
index 5bfabfb121..1f440a9969 100644
--- a/paimon-python/pypaimon/tests/binary_row_test.py
+++ b/paimon-python/pypaimon/tests/binary_row_test.py
@@ -26,7 +26,7 @@ import pyarrow as pa
 from pypaimon import CatalogFactory, Schema
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.read.scanner.file_scanner import FileScanner
 from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
 
 
@@ -112,10 +112,10 @@ class BinaryRowTest(unittest.TestCase):
 
     def test_is_not_null_append(self):
         table = self.catalog.get_table('default.test_append')
-        starting_scanner = FullStartingScanner(table, None, None)
-        latest_snapshot = 
starting_scanner.snapshot_manager.get_latest_snapshot()
-        manifest_files = 
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        file_scanner = FileScanner(table, lambda: [])
+        latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
         self._transform_manifest_entries(manifest_entries, [])
         l = ['abc', 'abbc', 'bc', 'd', None]
         for i, entry in enumerate(manifest_entries):
@@ -125,7 +125,7 @@ class BinaryRowTest(unittest.TestCase):
                 GenericRow([l[i]], [table.fields[1]]),
                 [1 if l[i] is None else 0],
             )
-        
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
+        file_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
 
         read_builder = table.new_read_builder()
         predicate_builder = read_builder.new_predicate_builder()
@@ -249,10 +249,10 @@ class BinaryRowTest(unittest.TestCase):
         table_write.close()
         table_commit.close()
 
-        starting_scanner = FullStartingScanner(table, None, None)
-        latest_snapshot = 
starting_scanner.snapshot_manager.get_latest_snapshot()
-        manifest_files = 
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        file_scanner = FileScanner(table, lambda: [])
+        latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
         self._transform_manifest_entries(manifest_entries, [])
         for i, entry in enumerate(manifest_entries):
             entry.file.value_stats_cols = ['f2', 'f6', 'f8']
@@ -261,7 +261,7 @@ class BinaryRowTest(unittest.TestCase):
                 GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], 
[table.fields[2], table.fields[6], table.fields[8]]),
                 [0, 0, 0],
             )
-        
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
+        file_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
         # Build multiple predicates and combine them
         read_builder = table.new_read_builder()
         predicate_builder = read_builder.new_predicate_builder()
@@ -288,10 +288,10 @@ class BinaryRowTest(unittest.TestCase):
                          }
         self.assertEqual(expected_data, actual.to_pydict())
 
-        starting_scanner = FullStartingScanner(table, None, None)
-        latest_snapshot = 
starting_scanner.snapshot_manager.get_latest_snapshot()
-        manifest_files = 
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        file_scanner = FileScanner(table, lambda: [])
+        latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
         self._transform_manifest_entries(manifest_entries, [])
         for i, entry in enumerate(manifest_entries):
             entry.file.value_stats_cols = ['f2', 'f6', 'f8']
@@ -300,7 +300,7 @@ class BinaryRowTest(unittest.TestCase):
                 GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], 
table.fields[6], table.fields[8]]),
                 [0, 0, 0],
             )
-        
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
+        file_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
         splits, actual = self._read_result(read_builder.with_filter(combined))
         self.assertFalse(actual)
 
@@ -319,10 +319,10 @@ class BinaryRowTest(unittest.TestCase):
                                                                                
 trimmed_pk_fields)
 
     def _overwrite_manifest_entry(self, table):
-        starting_scanner = FullStartingScanner(table, None, None)
-        latest_snapshot = 
starting_scanner.snapshot_manager.get_latest_snapshot()
-        manifest_files = 
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        file_scanner = FileScanner(table, lambda: [])
+        latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = 
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = 
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
         self._transform_manifest_entries(manifest_entries, [])
         for i, entry in enumerate(manifest_entries):
             entry.file.value_stats_cols = ['f2']
@@ -331,4 +331,4 @@ class BinaryRowTest(unittest.TestCase):
                 GenericRow([6 + i], [table.fields[2]]),
                 [0],
             )
-        
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
+        file_scanner.manifest_file_manager.write(manifest_files[0].file_name, 
manifest_entries)
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py 
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 7f27b15b31..cfdf33b755 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -181,10 +181,10 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
 
         # to test GenericRow ability
         latest_snapshot = SnapshotManager(table).get_latest_snapshot()
-        manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
+        manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name,
-            lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row),
+            lambda row: table_scan.file_scanner._filter_manifest_entry(row),
             drop_stats=False)
         # Python write does not produce value stats
         if stats_enabled:
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py 
b/paimon-python/pypaimon/tests/reader_base_test.py
index 3d9ed7f874..8a07c66239 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -227,9 +227,9 @@ class ReaderBasicTest(unittest.TestCase):
 
         # to test GenericRow ability
         latest_snapshot = SnapshotManager(table).get_latest_snapshot()
-        manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
-            manifest_files[0].file_name, lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row), False)
+        manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name, lambda row: 
table_scan.file_scanner._filter_manifest_entry(row), False)
 
         # Python write does not produce value stats
         if stats_enabled:
@@ -513,10 +513,10 @@ class ReaderBasicTest(unittest.TestCase):
         pk_read_builder = pk_table.new_read_builder()
         pk_table_scan = pk_read_builder.new_scan()
         latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
-        pk_manifest_files = 
pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        pk_manifest_entries = 
pk_table_scan.starting_scanner.manifest_file_manager.read(
+        pk_manifest_files = 
pk_table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        pk_manifest_entries = 
pk_table_scan.file_scanner.manifest_file_manager.read(
             pk_manifest_files[0].file_name,
-            lambda row: 
pk_table_scan.starting_scanner._filter_manifest_entry(row),
+            lambda row: pk_table_scan.file_scanner._filter_manifest_entry(row),
             False
         )
 
@@ -530,7 +530,7 @@ class ReaderBasicTest(unittest.TestCase):
                          f"table.fields should NOT contain system fields, but 
got: {pk_table_field_names}")
 
         if pk_file_meta.value_stats_cols is None:
-            pk_value_stats_fields = 
pk_table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+            pk_value_stats_fields = 
pk_table_scan.file_scanner.manifest_file_manager._get_value_stats_fields(
                 {'_VALUE_STATS_COLS': None},
                 pk_table.fields
             )
@@ -583,10 +583,10 @@ class ReaderBasicTest(unittest.TestCase):
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         latest_snapshot = SnapshotManager(table).get_latest_snapshot()
-        manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
+        manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name,
-            lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row),
+            lambda row: table_scan.file_scanner._filter_manifest_entry(row),
             False
         )
 
@@ -1090,10 +1090,10 @@ class ReaderBasicTest(unittest.TestCase):
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         latest_snapshot = SnapshotManager(table).get_latest_snapshot()
-        manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
+        manifest_files = 
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
             manifest_files[0].file_name,
-            lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row),
+            lambda row: table_scan.file_scanner._filter_manifest_entry(row),
             False
         )
 
@@ -1146,7 +1146,7 @@ class ReaderBasicTest(unittest.TestCase):
                 self.assertFalse(is_system_field,
                                  f"value_stats_cols should not contain system 
field: {field_name}")
             
-            value_stats_fields = 
table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+            value_stats_fields = 
table_scan.file_scanner.manifest_file_manager._get_value_stats_fields(
                 {'_VALUE_STATS_COLS': file_meta.value_stats_cols},
                 table.fields
             )
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py 
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index c22346afe7..811cbb24dd 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -358,10 +358,10 @@ class PkReaderTest(unittest.TestCase):
         latest_snapshot = snapshot_manager.get_latest_snapshot()
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
-        manifest_list_manager = 
table_scan.starting_scanner.manifest_list_manager
+        manifest_list_manager = table_scan.file_scanner.manifest_list_manager
         manifest_files = manifest_list_manager.read_all(latest_snapshot)
 
-        manifest_file_manager = 
table_scan.starting_scanner.manifest_file_manager
+        manifest_file_manager = table_scan.file_scanner.manifest_file_manager
         creation_times_found = []
         for manifest_file_meta in manifest_files:
             entries = manifest_file_manager.read(manifest_file_meta.file_name, 
drop_stats=False)
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py 
b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index a67a927a5e..dffd985bb6 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -259,14 +259,14 @@ class SchemaEvolutionReadTest(unittest.TestCase):
         schema_manager.commit(TableSchema.from_schema(schema_id=0, 
schema=schema))
         schema_manager.commit(TableSchema.from_schema(schema_id=1, 
schema=schema2))
         # scan filter for schema evolution
-        latest_snapshot = 
table1.new_read_builder().new_scan().starting_scanner.snapshot_manager.get_latest_snapshot()
+        latest_snapshot = 
table1.new_read_builder().new_scan().file_scanner.snapshot_manager.get_latest_snapshot()
         table2.table_path = table1.table_path
         new_read_buidler = table2.new_read_builder()
         predicate_builder = new_read_buidler.new_predicate_builder()
         predicate = predicate_builder.less_than('user_id', 3)
         new_scan = new_read_buidler.with_filter(predicate).new_scan()
-        manifest_files = 
new_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
-        entries = 
new_scan.starting_scanner.read_manifest_entries(manifest_files)
+        manifest_files = 
new_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+        entries = new_scan.file_scanner.read_manifest_entries(manifest_files)
         self.assertEqual(1, len(entries))  # verify scan filter success for 
schema evolution
 
     def test_schema_evolution_with_read_filter(self):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index 71c22224dc..6833ec3fd5 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -28,7 +28,7 @@ from pypaimon.manifest.manifest_list_manager import 
ManifestListManager
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.read.scanner.file_scanner import FileScanner
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
                                                SnapshotCommit)
@@ -371,7 +371,7 @@ class FileStoreCommit:
         """Generate commit entries for OVERWRITE mode based on latest 
snapshot."""
         entries = []
         current_entries = [] if latestSnapshot is None \
-            else (FullStartingScanner(self.table, partition_filter, None).
+            else (FileScanner(self.table, lambda: [], partition_filter).
                   
read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot)))
         for entry in current_entries:
             entry.kind = 1  # DELETE

Reply via email to