This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6c1415be9b [python] Refactor to File Scanner and add
_filter_manifest_files_by_row_ranges (#7177)
6c1415be9b is described below
commit 6c1415be9b025156e6e815172374da92318bf200
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Feb 2 22:06:18 2026 +0800
[python] Refactor to File Scanner and add
_filter_manifest_files_by_row_ranges (#7177)
1. Simplify the StartingScanner hierarchy into a single FileScanner class.
2. Introduce `_filter_manifest_files_by_row_ranges` to accelerate queries
that use a global index.
---
paimon-python/pypaimon/globalindex/__init__.py | 8 +-
.../globalindex/faiss/faiss_vector_reader.py | 4 +-
.../pypaimon/globalindex/vector_search_result.py | 20 +--
.../pypaimon/manifest/manifest_file_manager.py | 16 +--
.../pypaimon/manifest/manifest_list_manager.py | 8 +-
.../pypaimon/manifest/schema/manifest_file_meta.py | 5 +
.../read/scanner/empty_starting_scanner.py | 25 ----
.../{full_starting_scanner.py => file_scanner.py} | 146 +++++++++++++++------
.../read/scanner/incremental_starting_scanner.py | 69 ----------
.../pypaimon/read/scanner/starting_scanner.py | 28 ----
paimon-python/pypaimon/read/table_scan.py | 66 +++++++---
paimon-python/pypaimon/tests/binary_row_test.py | 42 +++---
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 6 +-
paimon-python/pypaimon/tests/reader_base_test.py | 28 ++--
.../pypaimon/tests/reader_primary_key_test.py | 4 +-
.../pypaimon/tests/schema_evolution_read_test.py | 6 +-
paimon-python/pypaimon/write/file_store_commit.py | 4 +-
17 files changed, 232 insertions(+), 253 deletions(-)
diff --git a/paimon-python/pypaimon/globalindex/__init__.py
b/paimon-python/pypaimon/globalindex/__init__.py
index 529b90c478..ae8e6328d1 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/__init__.py
@@ -20,8 +20,8 @@ from pypaimon.globalindex.global_index_result import
GlobalIndexResult
from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
from pypaimon.globalindex.vector_search import VectorSearch
from pypaimon.globalindex.vector_search_result import (
- VectorSearchGlobalIndexResult,
- DictBasedVectorSearchResult,
+ ScoredGlobalIndexResult,
+ DictBasedScoredIndexResult,
ScoreGetter,
)
from pypaimon.globalindex.global_index_meta import GlobalIndexMeta,
GlobalIndexIOMeta
@@ -38,8 +38,8 @@ __all__ = [
'GlobalIndexReader',
'FieldRef',
'VectorSearch',
- 'VectorSearchGlobalIndexResult',
- 'DictBasedVectorSearchResult',
+ 'ScoredGlobalIndexResult',
+ 'DictBasedScoredIndexResult',
'ScoreGetter',
'GlobalIndexMeta',
'GlobalIndexIOMeta',
diff --git a/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
b/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
index c2460d9b9e..3201894d27 100644
--- a/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
+++ b/paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py
@@ -30,7 +30,7 @@ import numpy as np
from pypaimon.globalindex.global_index_reader import GlobalIndexReader
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
-from pypaimon.globalindex.vector_search_result import
DictBasedVectorSearchResult
+from pypaimon.globalindex.vector_search_result import
DictBasedScoredIndexResult
from pypaimon.globalindex.roaring_bitmap import RoaringBitmap64
from pypaimon.globalindex.faiss.faiss_options import (
FaissVectorIndexOptions,
@@ -170,7 +170,7 @@ class FaissVectorGlobalIndexReader(GlobalIndexReader):
if not results:
return None
- return DictBasedVectorSearchResult(results)
+ return DictBasedScoredIndexResult(results)
def _configure_search_params(self, index: FaissIndex) -> None:
"""Configure search parameters based on index type."""
diff --git a/paimon-python/pypaimon/globalindex/vector_search_result.py
b/paimon-python/pypaimon/globalindex/vector_search_result.py
index 62ccb85502..e4fdcd6cc8 100644
--- a/paimon-python/pypaimon/globalindex/vector_search_result.py
+++ b/paimon-python/pypaimon/globalindex/vector_search_result.py
@@ -29,7 +29,7 @@ from pypaimon.globalindex.roaring_bitmap import
RoaringBitmap64
ScoreGetter = Callable[[int], Optional[float]]
-class VectorSearchGlobalIndexResult(GlobalIndexResult):
+class ScoredGlobalIndexResult(GlobalIndexResult):
"""
Vector search global index result for vector index.
@@ -41,7 +41,7 @@ class VectorSearchGlobalIndexResult(GlobalIndexResult):
"""Returns a function to get the score for a given row ID."""
pass
- def offset(self, offset: int) -> 'VectorSearchGlobalIndexResult':
+ def offset(self, offset: int) -> 'ScoredGlobalIndexResult':
"""Returns a new result with row IDs offset by the given amount."""
if offset == 0:
return self
@@ -53,14 +53,14 @@ class VectorSearchGlobalIndexResult(GlobalIndexResult):
for row_id in bitmap:
offset_bitmap.add(row_id + offset)
- return SimpleVectorSearchGlobalIndexResult(
+ return SimpleScoredGlobalIndexResult(
offset_bitmap,
lambda row_id: this_score_getter(row_id - offset)
)
def or_(self, other: GlobalIndexResult) -> GlobalIndexResult:
"""Returns the union of this result and the other result."""
- if not isinstance(other, VectorSearchGlobalIndexResult):
+ if not isinstance(other, ScoredGlobalIndexResult):
return super().or_(other)
this_row_ids = self.results()
@@ -76,18 +76,18 @@ class VectorSearchGlobalIndexResult(GlobalIndexResult):
return this_score_getter(row_id)
return other_score_getter(row_id)
- return SimpleVectorSearchGlobalIndexResult(result_or,
combined_score_getter)
+ return SimpleScoredGlobalIndexResult(result_or, combined_score_getter)
@staticmethod
def create(
supplier: Callable[[], RoaringBitmap64],
score_getter: ScoreGetter
- ) -> 'VectorSearchGlobalIndexResult':
+ ) -> 'ScoredGlobalIndexResult':
"""Creates a new VectorSearchGlobalIndexResult from supplier."""
- return LazyVectorSearchGlobalIndexResult(supplier, score_getter)
+ return LazyScoredGlobalIndexResult(supplier, score_getter)
-class SimpleVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
+class SimpleScoredGlobalIndexResult(ScoredGlobalIndexResult):
"""Simple implementation of VectorSearchGlobalIndexResult."""
def __init__(self, bitmap: RoaringBitmap64, score_getter_fn: ScoreGetter):
@@ -101,7 +101,7 @@ class
SimpleVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
return self._score_getter_fn
-class LazyVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
+class LazyScoredGlobalIndexResult(ScoredGlobalIndexResult):
"""Lazy implementation of VectorSearchGlobalIndexResult."""
def __init__(self, supplier: Callable[[], RoaringBitmap64],
score_getter_fn: ScoreGetter):
@@ -118,7 +118,7 @@ class
LazyVectorSearchGlobalIndexResult(VectorSearchGlobalIndexResult):
return self._score_getter_fn
-class DictBasedVectorSearchResult(VectorSearchGlobalIndexResult):
+class DictBasedScoredIndexResult(ScoredGlobalIndexResult):
"""Vector search result backed by a dictionary of row_id -> score."""
def __init__(self, id_to_scores: Dict[int, float]):
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index f6ae41e3d3..0ed5091825 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -53,15 +53,15 @@ class ManifestFileManager:
def _process_single_manifest(manifest_file: ManifestFileMeta) ->
List[ManifestEntry]:
return self.read(manifest_file.file_name, manifest_entry_filter,
drop_stats)
- def _entry_identifier(entry: ManifestEntry) -> tuple:
+ def _entry_identifier(e: ManifestEntry) -> tuple:
return (
- tuple(entry.partition.values),
- entry.bucket,
- entry.file.level,
- entry.file.file_name,
- tuple(entry.file.extra_files) if entry.file.extra_files else
(),
- entry.file.embedded_index,
- entry.file.external_path,
+ tuple(e.partition.values),
+ e.bucket,
+ e.file.level,
+ e.file.file_name,
+ tuple(e.file.extra_files) if e.file.extra_files else (),
+ e.file.embedded_index,
+ e.file.external_path,
)
deleted_entry_keys = set()
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index a1897fbee7..0bf9931630 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -17,7 +17,7 @@
################################################################################
from io import BytesIO
-from typing import List
+from typing import List, Optional
import fastavro
@@ -40,7 +40,9 @@ class ManifestListManager:
self.manifest_path = f"{manifest_path}/manifest"
self.file_io = self.table.file_io
- def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+ def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
+ if snapshot is None:
+ return []
manifest_files = []
base_manifests = self.read(snapshot.base_manifest_list)
manifest_files.extend(base_manifests)
@@ -79,6 +81,8 @@ class ManifestListManager:
num_deleted_files=record['_NUM_DELETED_FILES'],
partition_stats=partition_stats,
schema_id=record['_SCHEMA_ID'],
+ min_row_id=record['_MIN_ROW_ID'],
+ max_row_id=record['_MAX_ROW_ID'],
)
manifest_files.append(manifest_file_meta)
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index a830be6cc2..ed42dd7ecd 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -18,6 +18,7 @@
from dataclasses import dataclass
+from typing import Optional
from pypaimon.manifest.schema.simple_stats import (PARTITION_STATS_SCHEMA,
SimpleStats)
@@ -31,6 +32,8 @@ class ManifestFileMeta:
partition_stats: SimpleStats
schema_id: int
+ min_row_id: Optional[int] = None
+ max_row_id: Optional[int] = None
MANIFEST_FILE_META_SCHEMA = {
"type": "record",
@@ -43,5 +46,7 @@ MANIFEST_FILE_META_SCHEMA = {
{"name": "_NUM_DELETED_FILES", "type": "long"},
{"name": "_PARTITION_STATS", "type": PARTITION_STATS_SCHEMA},
{"name": "_SCHEMA_ID", "type": "long"},
+ {"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": None},
+ {"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": None},
]
}
diff --git a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
deleted file mode 100644
index 9c870aa588..0000000000
--- a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
+++ /dev/null
@@ -1,25 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from pypaimon.read.plan import Plan
-from pypaimon.read.scanner.starting_scanner import StartingScanner
-
-
-class EmptyStartingScanner(StartingScanner):
-
- def scan(self) -> Plan:
- return Plan([])
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
similarity index 76%
rename from paimon-python/pypaimon/read/scanner/full_starting_scanner.py
rename to paimon-python/pypaimon/read/scanner/file_scanner.py
index f415ab61ef..0f53ef0cd3 100755
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -16,10 +16,10 @@ See the License for the specific language governing
permissions and
limitations under the License.
"""
import os
-from typing import List, Optional, Dict, Set
+from typing import List, Optional, Dict, Set, Callable
from pypaimon.common.predicate import Predicate
-from pypaimon.globalindex import VectorSearchGlobalIndexResult
+from pypaimon.globalindex import ScoredGlobalIndexResult
from pypaimon.table.source.deletion_file import DeletionFile
from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
@@ -31,24 +31,73 @@ from pypaimon.read.push_down_utils import
(trim_and_transform_predicate)
from pypaimon.read.scanner.append_table_split_generator import
AppendTableSplitGenerator
from pypaimon.read.scanner.data_evolution_split_generator import
DataEvolutionSplitGenerator
from pypaimon.read.scanner.primary_key_table_split_generator import
PrimaryKeyTableSplitGenerator
-from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.read.split import DataSplit
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
-class FullStartingScanner(StartingScanner):
+def _filter_manifest_files_by_row_ranges(
+ manifest_files: List[ManifestFileMeta],
+ row_ranges: List) -> List[ManifestFileMeta]:
+ """
+ Filter manifest files by row ranges.
+
+ Only keep manifest files that have min_row_id and max_row_id and overlap
with the given row ranges.
+
+ Args:
+ manifest_files: List of manifest file metadata
+ row_ranges: List of row ranges to filter by
+
+ Returns:
+ Filtered list of manifest files
+ """
+ from pypaimon.globalindex.range import Range
+
+ filtered_files = []
+ for manifest in manifest_files:
+ min_row_id = manifest.min_row_id
+ max_row_id = manifest.max_row_id
+
+ # If min_row_id or max_row_id is None, we cannot filter, keep the file
+ if min_row_id is None or max_row_id is None:
+ filtered_files.append(manifest)
+ continue
+
+ # Check if manifest row range overlaps with any of the expected row
ranges
+ manifest_row_range = Range(min_row_id, max_row_id)
+ should_keep = False
+
+ for expected_range in row_ranges:
+ # Check if ranges intersect
+ intersect = Range.intersect(
+ manifest_row_range.from_,
+ manifest_row_range.to,
+ expected_range.from_,
+ expected_range.to)
+ if intersect:
+ should_keep = True
+ break
+
+ if should_keep:
+ filtered_files.append(manifest)
+
+ return filtered_files
+
+
+class FileScanner:
def __init__(
self,
table,
- predicate: Optional[Predicate],
- limit: Optional[int],
+ manifest_scanner: Callable[[], List[ManifestFileMeta]],
+ predicate: Optional[Predicate] = None,
+ limit: Optional[int] = None,
vector_search: Optional['VectorSearch'] = None
):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
+ self.manifest_scanner = manifest_scanner
self.predicate = predicate
self.limit = limit
self.vector_search = vector_search
@@ -84,53 +133,39 @@ class FullStartingScanner(StartingScanner):
self.table.table_schema.id
)
- def scan(self) -> Plan:
- file_entries = self.plan_files()
- if not file_entries:
- return Plan([])
- # Get deletion files map if deletion vectors are enabled.
- # {partition-bucket -> {filename -> DeletionFile}}
- deletion_files_map: dict[tuple, dict[str, DeletionFile]] = {}
- if self.deletion_vectors_enabled:
- latest_snapshot = self.snapshot_manager.get_latest_snapshot()
- # Extract unique partition-bucket pairs from file entries
- buckets = set()
- for entry in file_entries:
- buckets.add((tuple(entry.partition.values), entry.bucket))
- deletion_files_map = self._scan_dv_index(latest_snapshot, buckets)
+ def _deletion_files_map(self, entries: List[ManifestEntry]) -> Dict[tuple,
Dict[str, DeletionFile]]:
+ if not self.deletion_vectors_enabled:
+ return {}
+ # Extract unique partition-bucket pairs from file entries
+ bucket_files = set()
+ for e in entries:
+ bucket_files.add((tuple(e.partition.values), e.bucket))
+ return
self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
+ def scan(self) -> Plan:
# Create appropriate split generator based on table type
if self.table.is_primary_key_table:
+ entries = self.plan_files()
split_generator = PrimaryKeyTableSplitGenerator(
self.table,
self.target_split_size,
self.open_file_cost,
- deletion_files_map
+ self._deletion_files_map(entries)
)
elif self.data_evolution:
- global_index_result = self._eval_global_index()
- row_ranges = None
- score_getter = None
- if global_index_result is not None:
- row_ranges = global_index_result.results().to_range_list()
- if isinstance(global_index_result,
VectorSearchGlobalIndexResult):
- score_getter = global_index_result.score_getter()
- split_generator = DataEvolutionSplitGenerator(
- self.table,
- self.target_split_size,
- self.open_file_cost,
- deletion_files_map,
- row_ranges,
- score_getter
- )
+ entries, split_generator =
self._create_data_evolution_split_generator()
else:
+ entries = self.plan_files()
split_generator = AppendTableSplitGenerator(
self.table,
self.target_split_size,
self.open_file_cost,
- deletion_files_map
+ self._deletion_files_map(entries)
)
+ if not entries:
+ return Plan([])
+
# Configure sharding if needed
if self.idx_of_this_subtask is not None:
split_generator.with_shard(self.idx_of_this_subtask,
self.number_of_para_subtasks)
@@ -138,16 +173,41 @@ class FullStartingScanner(StartingScanner):
split_generator.with_slice(self.start_pos_of_this_subtask,
self.end_pos_of_this_subtask)
# Generate splits
- splits = split_generator.create_splits(file_entries)
+ splits = split_generator.create_splits(entries)
splits = self._apply_push_down_limit(splits)
return Plan(splits)
+ def _create_data_evolution_split_generator(self):
+ row_ranges = None
+ score_getter = None
+ global_index_result = self._eval_global_index()
+ if global_index_result is not None:
+ row_ranges = global_index_result.results().to_range_list()
+ if isinstance(global_index_result, ScoredGlobalIndexResult):
+ score_getter = global_index_result.score_getter()
+
+ manifest_files = self.manifest_scanner()
+
+ # Filter manifest files by row ranges if available
+ if row_ranges is not None:
+ manifest_files =
_filter_manifest_files_by_row_ranges(manifest_files, row_ranges)
+
+ entries = self.read_manifest_entries(manifest_files)
+
+ return entries, DataEvolutionSplitGenerator(
+ self.table,
+ self.target_split_size,
+ self.open_file_cost,
+ self._deletion_files_map(entries),
+ row_ranges,
+ score_getter
+ )
+
def plan_files(self) -> List[ManifestEntry]:
- latest_snapshot = self.snapshot_manager.get_latest_snapshot()
- if not latest_snapshot:
+ manifest_files = self.manifest_scanner()
+ if len(manifest_files) == 0:
return []
- manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
return self.read_manifest_entries(manifest_files)
def _eval_global_index(self):
@@ -222,7 +282,7 @@ class FullStartingScanner(StartingScanner):
max_workers=max_workers
)
- def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks:
int) -> 'FullStartingScanner':
+ def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks:
int) -> 'FileScanner':
if idx_of_this_subtask >= number_of_para_subtasks:
raise ValueError("idx_of_this_subtask must be less than
number_of_para_subtasks")
if self.start_pos_of_this_subtask is not None:
@@ -231,7 +291,7 @@ class FullStartingScanner(StartingScanner):
self.number_of_para_subtasks = number_of_para_subtasks
return self
- def with_slice(self, start_pos: int, end_pos: int) ->
'FullStartingScanner':
+ def with_slice(self, start_pos: int, end_pos: int) -> 'FileScanner':
if start_pos >= end_pos:
raise ValueError("start_pos must be less than end_pos")
if self.idx_of_this_subtask is not None:
diff --git
a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
deleted file mode 100644
index 56c186188a..0000000000
--- a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
+++ /dev/null
@@ -1,69 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from typing import List, Optional
-
-from pypaimon.common.predicate import Predicate
-from pypaimon.manifest.schema.manifest_entry import ManifestEntry
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
-from pypaimon.snapshot.snapshot_manager import SnapshotManager
-
-
-class IncrementalStartingScanner(FullStartingScanner):
- def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int],
- start: int, end: int):
- super().__init__(table, predicate, limit)
- self.startingSnapshotId = start
- self.endingSnapshotId = end
-
- def plan_files(self) -> List[ManifestEntry]:
- snapshots_in_range = []
- for snapshot_id in range(self.startingSnapshotId + 1,
self.endingSnapshotId + 1):
- snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
- if snapshot.commit_kind == "APPEND":
- snapshots_in_range.append(snapshot)
-
- # Collect all file entries from all snapshots in range
- file_entries = []
-
- for snapshot in snapshots_in_range:
- manifest_files = self.manifest_list_manager.read_delta(snapshot)
- entries = self.read_manifest_entries(manifest_files)
- file_entries.extend(entries)
- return file_entries
-
- @staticmethod
- def between_timestamps(table, predicate: Optional[Predicate], limit:
Optional[int],
- start_timestamp: int, end_timestamp: int) ->
'IncrementalStartingScanner':
- """
- Create an IncrementalStartingScanner for snapshots between two
timestamps.
- """
- snapshot_manager = SnapshotManager(table)
- starting_snapshot =
snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
- earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
-
- # If earliest_snapshot.time_millis > start_timestamp we should include
the earliest_snapshot
- if starting_snapshot is None or (earliest_snapshot and
earliest_snapshot.time_millis > start_timestamp):
- start_id = earliest_snapshot.id - 1 if earliest_snapshot else -1
- else:
- start_id = starting_snapshot.id
-
- end_snapshot =
snapshot_manager.earlier_or_equal_time_mills(end_timestamp)
- latest_snapshot = snapshot_manager.get_latest_snapshot()
- end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if
latest_snapshot else -1)
-
- return IncrementalStartingScanner(table, predicate, limit, start_id,
end_id)
diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py
b/paimon-python/pypaimon/read/scanner/starting_scanner.py
deleted file mode 100644
index 7e6cdfd81a..0000000000
--- a/paimon-python/pypaimon/read/scanner/starting_scanner.py
+++ /dev/null
@@ -1,28 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from abc import ABC, abstractmethod
-
-from pypaimon.read.plan import Plan
-
-
-class StartingScanner(ABC):
- """Helper class for the first planning of TableScan."""
-
- @abstractmethod
- def scan(self) -> Plan:
- """Plan the files to read."""
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index 8276163450..cca81998b2 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -22,12 +22,9 @@ from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.read.plan import Plan
-from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
-from pypaimon.read.scanner.incremental_starting_scanner import \
- IncrementalStartingScanner
-from pypaimon.read.scanner.starting_scanner import StartingScanner
+from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
if TYPE_CHECKING:
from pypaimon.globalindex.vector_search import VectorSearch
@@ -49,23 +46,25 @@ class TableScan:
self.predicate = predicate
self.limit = limit
self.vector_search = vector_search
- self.starting_scanner = self._create_starting_scanner()
+ self.file_scanner = self._create_file_scanner()
def plan(self) -> Plan:
- return self.starting_scanner.scan()
+ return self.file_scanner.scan()
- def _create_starting_scanner(self) -> Optional[StartingScanner]:
+ def _create_file_scanner(self) -> FileScanner:
options = self.table.options.options
+ snapshot_manager = SnapshotManager(self.table)
+ manifest_list_manager = ManifestListManager(self.table)
if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):
ts =
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
if len(ts) != 2:
raise ValueError(
"The incremental-between-timestamp must specific
start(exclusive) and end timestamp. But is: " +
options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP))
- earliest_snapshot =
SnapshotManager(self.table).try_get_earliest_snapshot()
- latest_snapshot = SnapshotManager(self.table).get_latest_snapshot()
+ earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
+ latest_snapshot = snapshot_manager.get_latest_snapshot()
if earliest_snapshot is None or latest_snapshot is None:
- return EmptyStartingScanner()
+ return FileScanner(self.table, lambda: [])
start_timestamp = int(ts[0])
end_timestamp = int(ts[1])
if start_timestamp >= end_timestamp:
@@ -73,20 +72,53 @@ class TableScan:
"Ending timestamp %s should be >= starting timestamp %s."
% (end_timestamp, start_timestamp))
if (start_timestamp == end_timestamp or start_timestamp >
latest_snapshot.time_millis
or end_timestamp < earliest_snapshot.time_millis):
- return EmptyStartingScanner()
- return IncrementalStartingScanner.between_timestamps(self.table,
self.predicate, self.limit,
-
start_timestamp, end_timestamp)
- return FullStartingScanner(
+ return FileScanner(self.table, lambda: [])
+
+ starting_snapshot =
snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
+ earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
+
+ # If earliest_snapshot.time_millis > start_timestamp we should
include the earliest_snapshot
+ if starting_snapshot is None or (earliest_snapshot and
earliest_snapshot.time_millis > start_timestamp):
+ start_id = earliest_snapshot.id - 1 if earliest_snapshot else
-1
+ else:
+ start_id = starting_snapshot.id
+
+ end_snapshot =
snapshot_manager.earlier_or_equal_time_mills(end_timestamp)
+ latest_snapshot = snapshot_manager.get_latest_snapshot()
+ end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id
if latest_snapshot else -1)
+
+ def incremental_manifest():
+ snapshots_in_range = []
+ for snapshot_id in range(start_id + 1, end_id + 1):
+ snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+ if snapshot.commit_kind == "APPEND":
+ snapshots_in_range.append(snapshot)
+
+ manifests = []
+
+ for snapshot in snapshots_in_range:
+ manifest_files = manifest_list_manager.read_delta(snapshot)
+ manifests.extend(manifest_files)
+ return manifests
+
+ return FileScanner(self.table, incremental_manifest,
self.predicate, self.limit)
+
+ def all_manifests():
+ snapshot = snapshot_manager.get_latest_snapshot()
+ return manifest_list_manager.read_all(snapshot)
+
+ return FileScanner(
self.table,
+ all_manifests,
self.predicate,
self.limit,
vector_search=self.vector_search
)
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) ->
'TableScan':
- self.starting_scanner.with_shard(idx_of_this_subtask,
number_of_para_subtasks)
+ self.file_scanner.with_shard(idx_of_this_subtask,
number_of_para_subtasks)
return self
def with_slice(self, start_pos, end_pos) -> 'TableScan':
- self.starting_scanner.with_slice(start_pos, end_pos)
+ self.file_scanner.with_slice(start_pos, end_pos)
return self
diff --git a/paimon-python/pypaimon/tests/binary_row_test.py
b/paimon-python/pypaimon/tests/binary_row_test.py
index 5bfabfb121..1f440a9969 100644
--- a/paimon-python/pypaimon/tests/binary_row_test.py
+++ b/paimon-python/pypaimon/tests/binary_row_test.py
@@ -26,7 +26,7 @@ import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
@@ -112,10 +112,10 @@ class BinaryRowTest(unittest.TestCase):
def test_is_not_null_append(self):
table = self.catalog.get_table('default.test_append')
- starting_scanner = FullStartingScanner(table, None, None)
- latest_snapshot =
starting_scanner.snapshot_manager.get_latest_snapshot()
- manifest_files =
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ file_scanner = FileScanner(table, lambda: [])
+ latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
self._transform_manifest_entries(manifest_entries, [])
l = ['abc', 'abbc', 'bc', 'd', None]
for i, entry in enumerate(manifest_entries):
@@ -125,7 +125,7 @@ class BinaryRowTest(unittest.TestCase):
GenericRow([l[i]], [table.fields[1]]),
[1 if l[i] is None else 0],
)
-
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
+ file_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
@@ -249,10 +249,10 @@ class BinaryRowTest(unittest.TestCase):
table_write.close()
table_commit.close()
- starting_scanner = FullStartingScanner(table, None, None)
- latest_snapshot =
starting_scanner.snapshot_manager.get_latest_snapshot()
- manifest_files =
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ file_scanner = FileScanner(table, lambda: [])
+ latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
self._transform_manifest_entries(manifest_entries, [])
for i, entry in enumerate(manifest_entries):
entry.file.value_stats_cols = ['f2', 'f6', 'f8']
@@ -261,7 +261,7 @@ class BinaryRowTest(unittest.TestCase):
GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i],
[table.fields[2], table.fields[6], table.fields[8]]),
[0, 0, 0],
)
-
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
+ file_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
# Build multiple predicates and combine them
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
@@ -288,10 +288,10 @@ class BinaryRowTest(unittest.TestCase):
}
self.assertEqual(expected_data, actual.to_pydict())
- starting_scanner = FullStartingScanner(table, None, None)
- latest_snapshot =
starting_scanner.snapshot_manager.get_latest_snapshot()
- manifest_files =
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ file_scanner = FileScanner(table, lambda: [])
+ latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
self._transform_manifest_entries(manifest_entries, [])
for i, entry in enumerate(manifest_entries):
entry.file.value_stats_cols = ['f2', 'f6', 'f8']
@@ -300,7 +300,7 @@ class BinaryRowTest(unittest.TestCase):
GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2],
table.fields[6], table.fields[8]]),
[0, 0, 0],
)
-
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
+ file_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
splits, actual = self._read_result(read_builder.with_filter(combined))
self.assertFalse(actual)
@@ -319,10 +319,10 @@ class BinaryRowTest(unittest.TestCase):
trimmed_pk_fields)
def _overwrite_manifest_entry(self, table):
- starting_scanner = FullStartingScanner(table, None, None)
- latest_snapshot =
starting_scanner.snapshot_manager.get_latest_snapshot()
- manifest_files =
starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ file_scanner = FileScanner(table, lambda: [])
+ latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
self._transform_manifest_entries(manifest_entries, [])
for i, entry in enumerate(manifest_entries):
entry.file.value_stats_cols = ['f2']
@@ -331,4 +331,4 @@ class BinaryRowTest(unittest.TestCase):
GenericRow([6 + i], [table.fields[2]]),
[0],
)
-
starting_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
+ file_scanner.manifest_file_manager.write(manifest_files[0].file_name,
manifest_entries)
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 7f27b15b31..cfdf33b755 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -181,10 +181,10 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
# to test GenericRow ability
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
- manifest_files =
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
table_scan.starting_scanner.manifest_file_manager.read(
+ manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
- lambda row:
table_scan.starting_scanner._filter_manifest_entry(row),
+ lambda row: table_scan.file_scanner._filter_manifest_entry(row),
drop_stats=False)
# Python write does not produce value stats
if stats_enabled:
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py
b/paimon-python/pypaimon/tests/reader_base_test.py
index 3d9ed7f874..8a07c66239 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -227,9 +227,9 @@ class ReaderBasicTest(unittest.TestCase):
# to test GenericRow ability
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
- manifest_files =
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
table_scan.starting_scanner.manifest_file_manager.read(
- manifest_files[0].file_name, lambda row:
table_scan.starting_scanner._filter_manifest_entry(row), False)
+ manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
+ manifest_files[0].file_name, lambda row:
table_scan.file_scanner._filter_manifest_entry(row), False)
# Python write does not produce value stats
if stats_enabled:
@@ -513,10 +513,10 @@ class ReaderBasicTest(unittest.TestCase):
pk_read_builder = pk_table.new_read_builder()
pk_table_scan = pk_read_builder.new_scan()
latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
- pk_manifest_files =
pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- pk_manifest_entries =
pk_table_scan.starting_scanner.manifest_file_manager.read(
+ pk_manifest_files =
pk_table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ pk_manifest_entries =
pk_table_scan.file_scanner.manifest_file_manager.read(
pk_manifest_files[0].file_name,
- lambda row:
pk_table_scan.starting_scanner._filter_manifest_entry(row),
+ lambda row: pk_table_scan.file_scanner._filter_manifest_entry(row),
False
)
@@ -530,7 +530,7 @@ class ReaderBasicTest(unittest.TestCase):
f"table.fields should NOT contain system fields, but
got: {pk_table_field_names}")
if pk_file_meta.value_stats_cols is None:
- pk_value_stats_fields =
pk_table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+ pk_value_stats_fields =
pk_table_scan.file_scanner.manifest_file_manager._get_value_stats_fields(
{'_VALUE_STATS_COLS': None},
pk_table.fields
)
@@ -583,10 +583,10 @@ class ReaderBasicTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
- manifest_files =
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
table_scan.starting_scanner.manifest_file_manager.read(
+ manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
- lambda row:
table_scan.starting_scanner._filter_manifest_entry(row),
+ lambda row: table_scan.file_scanner._filter_manifest_entry(row),
False
)
@@ -1090,10 +1090,10 @@ class ReaderBasicTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
- manifest_files =
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
table_scan.starting_scanner.manifest_file_manager.read(
+ manifest_files =
table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = table_scan.file_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
- lambda row:
table_scan.starting_scanner._filter_manifest_entry(row),
+ lambda row: table_scan.file_scanner._filter_manifest_entry(row),
False
)
@@ -1146,7 +1146,7 @@ class ReaderBasicTest(unittest.TestCase):
self.assertFalse(is_system_field,
f"value_stats_cols should not contain system
field: {field_name}")
- value_stats_fields =
table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+ value_stats_fields =
table_scan.file_scanner.manifest_file_manager._get_value_stats_fields(
{'_VALUE_STATS_COLS': file_meta.value_stats_cols},
table.fields
)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index c22346afe7..811cbb24dd 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -358,10 +358,10 @@ class PkReaderTest(unittest.TestCase):
latest_snapshot = snapshot_manager.get_latest_snapshot()
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
- manifest_list_manager =
table_scan.starting_scanner.manifest_list_manager
+ manifest_list_manager = table_scan.file_scanner.manifest_list_manager
manifest_files = manifest_list_manager.read_all(latest_snapshot)
- manifest_file_manager =
table_scan.starting_scanner.manifest_file_manager
+ manifest_file_manager = table_scan.file_scanner.manifest_file_manager
creation_times_found = []
for manifest_file_meta in manifest_files:
entries = manifest_file_manager.read(manifest_file_meta.file_name,
drop_stats=False)
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index a67a927a5e..dffd985bb6 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -259,14 +259,14 @@ class SchemaEvolutionReadTest(unittest.TestCase):
schema_manager.commit(TableSchema.from_schema(schema_id=0,
schema=schema))
schema_manager.commit(TableSchema.from_schema(schema_id=1,
schema=schema2))
# scan filter for schema evolution
- latest_snapshot =
table1.new_read_builder().new_scan().starting_scanner.snapshot_manager.get_latest_snapshot()
+ latest_snapshot =
table1.new_read_builder().new_scan().file_scanner.snapshot_manager.get_latest_snapshot()
table2.table_path = table1.table_path
new_read_buidler = table2.new_read_builder()
predicate_builder = new_read_buidler.new_predicate_builder()
predicate = predicate_builder.less_than('user_id', 3)
new_scan = new_read_buidler.with_filter(predicate).new_scan()
- manifest_files =
new_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
- entries =
new_scan.starting_scanner.read_manifest_entries(manifest_files)
+ manifest_files =
new_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot)
+ entries = new_scan.file_scanner.read_manifest_entries(manifest_files)
self.assertEqual(1, len(entries)) # verify scan filter success for
schema evolution
def test_schema_evolution_with_read_filter(self):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 71c22224dc..6833ec3fd5 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -28,7 +28,7 @@ from pypaimon.manifest.manifest_list_manager import
ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
SnapshotCommit)
@@ -371,7 +371,7 @@ class FileStoreCommit:
"""Generate commit entries for OVERWRITE mode based on latest
snapshot."""
entries = []
current_entries = [] if latestSnapshot is None \
- else (FullStartingScanner(self.table, partition_filter, None).
+ else (FileScanner(self.table, lambda: [], partition_filter).
read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot)))
for entry in current_entries:
entry.kind = 1 # DELETE