This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 032fee4b4c [Python] Introduce incremental-between read by timestamp (#6391)
032fee4b4c is described below
commit 032fee4b4c196e5d773f78dd1384c5356493ba06
Author: umi <[email protected]>
AuthorDate: Fri Oct 17 10:03:44 2025 +0800
[Python] Introduce incremental-between read by timestamp (#6391)
---
docs/content/program-api/python-api.md | 192 ++++++++-
paimon-python/pypaimon/__init__.py | 2 +-
paimon-python/pypaimon/common/core_options.py | 1 +
paimon-python/pypaimon/filesystem/pvfs.py | 2 +-
.../pypaimon/manifest/manifest_file_manager.py | 3 +-
.../pypaimon/manifest/manifest_list_manager.py | 3 +
paimon-python/pypaimon/read/scanner/__init__.py | 17 +
.../read/scanner/empty_starting_scanner.py | 25 ++
.../full_starting_scanner.py} | 151 +------
.../read/scanner/incremental_starting_scanner.py | 77 ++++
.../pypaimon/read/scanner/starting_scanner.py | 28 ++
paimon-python/pypaimon/read/table_read.py | 2 +-
paimon-python/pypaimon/read/table_scan.py | 463 ++-------------------
.../pypaimon/snapshot/snapshot_manager.py | 55 +++
paimon-python/pypaimon/table/file_store_table.py | 17 +
.../pypaimon/tests/filesystem_catalog_test.py | 3 +-
paimon-python/pypaimon/tests/predicates_test.py | 3 +-
.../pypaimon/tests/py36/ao_predicate_test.py | 8 +-
.../pypaimon/tests/py36/ao_simple_test.py | 6 +-
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 53 ++-
.../pypaimon/tests/reader_append_only_test.py | 71 +++-
paimon-python/pypaimon/tests/reader_base_test.py | 27 +-
.../pypaimon/tests/reader_primary_key_test.py | 79 +++-
.../pypaimon/tests/rest/rest_base_test.py | 3 +-
paimon-python/pypaimon/write/file_store_commit.py | 4 +-
25 files changed, 670 insertions(+), 625 deletions(-)
diff --git a/docs/content/program-api/python-api.md
b/docs/content/program-api/python-api.md
index 6f7d8bb697..579fce109e 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -286,7 +286,8 @@ for batch in table_read.to_arrow_batch_reader(splits):
```
#### Python Iterator
-You can read the data row by row into a native Python iterator.
+
+You can read the data row by row into a native Python iterator.
This is convenient for custom row-based processing logic.
```python
@@ -365,23 +366,177 @@ print(ray_dataset.to_pandas())
# ...
```
+### Incremental Read Between Timestamps
+
+This API allows reading data committed between two snapshot timestamps. The steps are as follows.
+
+- Set the option `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must be a string `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
+- Use `SnapshotManager` to obtain snapshot timestamps, or determine them yourself.
+- Read the data as above.
+
+Example:
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+# Prepare catalog and obtain a table
+catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
+table = catalog.get_table('default.your_table_name')
+
+# Assume the table has at least two snapshots (1 and 2)
+snapshot_manager = SnapshotManager(table)
+t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+
+# Read records committed in (t1, t2]: t1 exclusive, t2 inclusive
+table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"})
+
+read_builder = table_inc.new_read_builder()
+table_scan = read_builder.new_scan()
+table_read = read_builder.new_read()
+splits = table_scan.plan().splits()
+
+# To Arrow
+arrow_table = table_read.to_arrow(splits)
+
+# Or to pandas
+pandas_df = table_read.to_pandas(splits)
+```
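+
+As a minimal follow-up sketch (assuming the table already has at least two snapshots), you can also derive the range from the two most recent snapshots to read only the rows committed by the latest one:
+
+```python
+# Derive the range (previous, latest] from the two most recent snapshots
+latest = snapshot_manager.get_latest_snapshot()
+previous = snapshot_manager.get_snapshot_by_id(latest.id - 1)
+
+table_latest = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{previous.time_millis},{latest.time_millis}"})
+read_builder = table_latest.new_read_builder()
+latest_rows = read_builder.new_read().to_pandas(read_builder.new_scan().plan().splits())
+```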
+
+### Shard Read
+
+Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for distributed processing and parallel computation.
+
+You can specify the shard index and the total number of shards to read a specific portion of the data:
+
+```python
+# Prepare read builder
+table = catalog.get_table('database_name.table_name')
+read_builder = table.new_read_builder()
+table_read = read_builder.new_read()
+
+# Read the second shard (index 1) out of 3 total shards
+splits = read_builder.new_scan().with_shard(1, 3).plan().splits()
+
+# Read all shards and concatenate results
+splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+
+# Combine results from all shards
+
+all_splits = splits1 + splits2 + splits3
+pa_table = table_read.to_arrow(all_splits)
+```
+
+Example with shard read:
+
+```python
+import pyarrow as pa
+from pypaimon import CatalogFactory, Schema
+
+# Create catalog
+catalog_options = {'warehouse': 'file:///path/to/warehouse'}
+catalog = CatalogFactory.create(catalog_options)
+catalog.create_database("default", False)
+# Define schema
+pa_schema = pa.schema([
+ ('user_id', pa.int64()),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ ('dt', pa.string()),
+])
+
+# Create table and write data
+schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+catalog.create_table('default.test_table', schema, False)
+table = catalog.get_table('default.test_table')
+
+# Write data in two batches
+write_builder = table.new_batch_write_builder()
+
+# First write
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+data1 = {
+    'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
+    'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014],
+    'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'],
+    'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
+}
+pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+table_write.write_arrow(pa_table)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# Second write
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+data2 = {
+ 'user_id': [5, 6, 7, 8, 18],
+ 'item_id': [1005, 1006, 1007, 1008, 1018],
+ 'behavior': ['e', 'f', 'g', 'h', 'z'],
+ 'dt': ['p2', 'p1', 'p2', 'p2', 'p1'],
+}
+pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+table_write.write_arrow(pa_table)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# Read specific shard
+read_builder = table.new_read_builder()
+table_read = read_builder.new_read()
+
+# Read the shard at index 2 (the third of 3 shards)
+splits = read_builder.new_scan().with_shard(2, 3).plan().splits()
+shard_data = table_read.to_arrow(splits)
+
+# Verify shard distribution by reading all shards
+splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+
+# Combining all shards should equal a full table read
+all_shards_data = pa.concat_tables([
+ table_read.to_arrow(splits1),
+ table_read.to_arrow(splits2),
+ table_read.to_arrow(splits3),
+])
+full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits())
+```
+
+Key points about shard read:
+
+- **Shard Index**: Zero-based index of the shard to read (0 to total_shards-1)
+- **Total Shards**: Total number of shards to divide the data into
+- **Data Distribution**: Data is distributed evenly across shards, with remainder rows going to the last shard
+- **Parallel Processing**: Each shard can be processed independently for better performance (see the sketch below)
+- **Consistency**: Combining all shards should produce the complete table data
+
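+The sketch below illustrates the parallel-processing point above; it assumes the `read_builder` from the earlier example and that all shards fit in memory (in practice each shard is more likely to be read by a separate worker or process):
+
+```python
+from concurrent.futures import ThreadPoolExecutor
+
+import pyarrow as pa
+
+total_shards = 3
+
+def read_shard(shard_index: int) -> pa.Table:
+    # Each task plans and reads its own shard independently
+    shard_splits = read_builder.new_scan().with_shard(shard_index, total_shards).plan().splits()
+    return read_builder.new_read().to_arrow(shard_splits)
+
+with ThreadPoolExecutor(max_workers=total_shards) as executor:
+    shard_tables = list(executor.map(read_shard, range(total_shards)))
+
+# Concatenating all shards yields the same rows as a full table read
+combined = pa.concat_tables(shard_tables)
+```
+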
## Data Types
-| Python Native Type | PyArrow Type | Paimon Type |
-| :--- | :--- | :--- |
-| `int` | `pyarrow.int8()` | `TINYINT` |
-| `int` | `pyarrow.int16()` | `SMALLINT` |
-| `int` | `pyarrow.int32()` | `INT` |
-| `int` | `pyarrow.int64()` | `BIGINT` |
-| `float` | `pyarrow.float32()` | `FLOAT` |
-| `float` | `pyarrow.float64()` | `DOUBLE` |
-| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
-| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
-| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
-| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
-| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
-| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` |
-| `datetime.date` | `pyarrow.date32()` | `DATE` |
-| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` |
+
+| Python Native Type  | PyArrow Type                                     | Paimon Type                       |
+|:--------------------|:-------------------------------------------------|:----------------------------------|
+| `int`               | `pyarrow.int8()`                                 | `TINYINT`                         |
+| `int`               | `pyarrow.int16()`                                | `SMALLINT`                        |
+| `int`               | `pyarrow.int32()`                                | `INT`                             |
+| `int`               | `pyarrow.int64()`                                | `BIGINT`                          |
+| `float`             | `pyarrow.float32()`                              | `FLOAT`                           |
+| `float`             | `pyarrow.float64()`                              | `DOUBLE`                          |
+| `bool`              | `pyarrow.bool_()`                                | `BOOLEAN`                         |
+| `str`               | `pyarrow.string()`                               | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
+| `bytes`             | `pyarrow.binary()`                               | `BYTES`, `VARBINARY(n)`           |
+| `bytes`             | `pyarrow.binary(length)`                         | `BINARY(length)`                  |
+| `decimal.Decimal`   | `pyarrow.decimal128(precision, scale)`           | `DECIMAL(precision, scale)`       |
+| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)`               | `TIMESTAMP(p)`                    |
+| `datetime.date`     | `pyarrow.date32()`                               | `DATE`                            |
+| `datetime.time`     | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)`                         |
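+
+As a minimal illustration of these mappings (reusing the `Schema.from_pyarrow_schema` helper shown earlier; the column names are examples only), a schema covering several of the types above could be declared as:
+
+```python
+import pyarrow as pa
+
+from pypaimon import Schema
+
+# Each PyArrow type maps to the Paimon type noted in the comment
+pa_schema = pa.schema([
+    ('id', pa.int64()),                  # BIGINT
+    ('name', pa.string()),               # STRING
+    ('price', pa.decimal128(10, 2)),     # DECIMAL(10, 2)
+    ('created_at', pa.timestamp('ms')),  # TIMESTAMP(p)
+    ('birthday', pa.date32()),           # DATE
+    ('dt', pa.string()),                 # STRING, used as the partition key
+])
+schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+```
+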
## Predicate
@@ -402,5 +557,4 @@ print(ray_dataset.to_pandas())
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
-| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
-
+| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
\ No newline at end of file
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/__init__.py
index 060529a185..5313e8e18a 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
__all__ = [
diff --git a/paimon-python/pypaimon/common/core_options.py
b/paimon-python/pypaimon/common/core_options.py
index 754bb17c05..82b788438e 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -45,5 +45,6 @@ class CoreOptions(str, Enum):
FILE_BLOCK_SIZE = "file.block-size"
# Scan options
SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+ INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
# Commit options
COMMIT_USER_PREFIX = "commit.user-prefix"
diff --git a/paimon-python/pypaimon/filesystem/pvfs.py
b/paimon-python/pypaimon/filesystem/pvfs.py
index d7dabbbcaa..15f9f9f8dc 100644
--- a/paimon-python/pypaimon/filesystem/pvfs.py
+++ b/paimon-python/pypaimon/filesystem/pvfs.py
@@ -29,7 +29,7 @@ from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from readerwriterlock import rwlock
-from pypaimon.api.api_response import GetTableTokenResponse, GetTableResponse
+from pypaimon.api.api_response import GetTableResponse, GetTableTokenResponse
from pypaimon.api.client import AlreadyExistsException, NoSuchResourceException
from pypaimon.api.rest_api import RESTApi
from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index e3c9601cf4..9893b8659a 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,10 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import fastavro
from io import BytesIO
from typing import List
+import fastavro
+
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 2fc1eea011..0fc58652f6 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -47,6 +47,9 @@ class ManifestListManager:
manifest_files.extend(delta_manifests)
return manifest_files
+ def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+ return self.read(snapshot.delta_manifest_list)
+
def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
manifest_files = []
diff --git a/paimon-python/pypaimon/read/scanner/__init__.py
b/paimon-python/pypaimon/read/scanner/__init__.py
new file mode 100644
index 0000000000..53ed4d36c2
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/__init__.py
@@ -0,0 +1,17 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
new file mode 100644
index 0000000000..9c870aa588
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
@@ -0,0 +1,25 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from pypaimon.read.plan import Plan
+from pypaimon.read.scanner.starting_scanner import StartingScanner
+
+
+class EmptyStartingScanner(StartingScanner):
+
+ def scan(self) -> Plan:
+ return Plan([])
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
similarity index 72%
copy from paimon-python/pypaimon/read/table_scan.py
copy to paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index d76725ca97..4275b3f3e2 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -1,21 +1,20 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
from collections import defaultdict
from typing import Callable, List, Optional
@@ -29,17 +28,15 @@ from pypaimon.read.interval_partition import
IntervalPartition, SortedRun
from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
extract_predicate_to_list)
+from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.read.split import Split
from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
-class TableScan:
- """Implementation of TableScan for native Python reading."""
-
- def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int],
- read_type: List[DataField]):
+class FullStartingScanner(StartingScanner):
+ def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int], read_type: List[DataField]):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
@@ -68,16 +65,13 @@ class TableScan:
self.only_read_real_buckets = True if int(
self.table.options.get('bucket', -1)) ==
BucketMode.POSTPONE_BUCKET.value else False
- self.data_evolution = self.table.options.get('data-evolution.enabled',
'false').lower() == 'true'
- def plan(self) -> Plan:
+ def scan(self) -> Plan:
file_entries = self.plan_files()
if not file_entries:
return Plan([])
if self.table.is_primary_key_table:
splits = self._create_primary_key_splits(file_entries)
- elif self.data_evolution:
- splits = self._create_data_evolution_splits(file_entries)
else:
splits = self._create_append_only_splits(file_entries)
@@ -256,48 +250,6 @@ class TableScan:
"row_count": file_entry.file.row_count,
})
- def _create_data_evolution_splits(self, file_entries: List[ManifestEntry])
-> List['Split']:
- """
- Create data evolution splits for append-only tables with schema
evolution.
- This method groups files by firstRowId and creates splits that can
handle
- column merging across different schema versions.
- """
- partitioned_files = defaultdict(list)
- for entry in file_entries:
- partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
-
- if self.idx_of_this_subtask is not None:
- partitioned_files, plan_start_row, plan_end_row =
self._append_only_filter_by_shard(partitioned_files)
-
- def weight_func(file_list: List[DataFileMeta]) -> int:
- return max(sum(f.file_size for f in file_list),
self.open_file_cost)
-
- splits = []
- for key, file_entries in partitioned_files.items():
- if not file_entries:
- continue
-
- data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
- # Split files by firstRowId for data evolution
- split_by_row_id = self._split_by_row_id(data_files)
-
- # Pack the split groups for optimal split sizes
- packed_files: List[List[List[DataFileMeta]]] =
self._pack_for_ordered(split_by_row_id, weight_func,
-
self.target_split_size)
-
- # Flatten the packed files and build splits
- flatten_packed_files: List[List[DataFileMeta]] = [
- [file for sub_pack in pack for file in sub_pack]
- for pack in packed_files
- ]
-
- splits += self._build_split_from_pack(flatten_packed_files,
file_entries, False)
-
- if self.idx_of_this_subtask is not None:
- self._compute_split_start_end_row(splits, plan_start_row,
plan_end_row)
- return splits
-
def _create_append_only_splits(self, file_entries: List[ManifestEntry]) ->
List['Split']:
partitioned_files = defaultdict(list)
for entry in file_entries:
@@ -405,64 +357,3 @@ class TableScan:
packed.append(bin_items)
return packed
-
- @staticmethod
- def _is_blob_file(file_name: str) -> bool:
- """Check if a file is a blob file based on its extension."""
- return file_name.endswith('.blob')
-
- def _split_by_row_id(self, files: List[DataFileMeta]) ->
List[List[DataFileMeta]]:
- """
- Split files by firstRowId for data evolution.
- This method groups files that have the same firstRowId, which is
essential
- for handling schema evolution where files with different schemas need
to be
- read together to merge columns.
- """
- split_by_row_id = []
-
- # Sort files by firstRowId and then by maxSequenceNumber
- # Files with null firstRowId are treated as having Long.MIN_VALUE
- def sort_key(file: DataFileMeta) -> tuple:
- first_row_id = file.first_row_id if file.first_row_id is not None
else float('-inf')
- is_blob = 1 if self._is_blob_file(file.file_name) else 0
- # For files with same firstRowId, sort by maxSequenceNumber in
descending order
- # (larger sequence number means more recent data)
- max_seq = file.max_sequence_number
- return (first_row_id, is_blob, -max_seq)
-
- sorted_files = sorted(files, key=sort_key)
-
- # Split files by firstRowId
- last_row_id = -1
- check_row_id_start = 0
- current_split = []
-
- for file in sorted_files:
- first_row_id = file.first_row_id
- if first_row_id is None:
- # Files without firstRowId are treated as individual splits
- split_by_row_id.append([file])
- continue
-
- if not self._is_blob_file(file.file_name) and first_row_id !=
last_row_id:
- if current_split:
- split_by_row_id.append(current_split)
-
- # Validate that files don't overlap
- if first_row_id < check_row_id_start:
- file_names = [f.file_name for f in sorted_files]
- raise ValueError(
- f"There are overlapping files in the split:
{file_names}, "
- f"the wrong file is: {file.file_name}"
- )
-
- current_split = []
- last_row_id = first_row_id
- check_row_id_start = first_row_id + file.row_count
-
- current_split.append(file)
-
- if current_split:
- split_by_row_id.append(current_split)
-
- return split_by_row_id
diff --git
a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
new file mode 100644
index 0000000000..f1a9cd03bc
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -0,0 +1,77 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from typing import List, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.schema.data_types import DataField
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+
+class IncrementalStartingScanner(FullStartingScanner):
+ def __init__(self, table, predicate: Optional[Predicate], limit:
Optional[int],
+ read_type: List[DataField], start: int, end: int):
+ super().__init__(table, predicate, limit, read_type)
+ self.startingSnapshotId = start
+ self.endingSnapshotId = end
+
+ def plan_files(self) -> List[ManifestEntry]:
+ snapshots_in_range = []
+        for snapshot_id in range(self.startingSnapshotId + 1, self.endingSnapshotId + 1):
+ snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+ if snapshot.commit_kind == "APPEND":
+ snapshots_in_range.append(snapshot)
+
+ # Collect all file entries from all snapshots in range
+ file_entries = []
+
+ for snapshot in snapshots_in_range:
+ # Get manifest files for this snapshot
+ manifest_files = self.manifest_list_manager.read_delta(snapshot)
+
+ # Read all entries from manifest files
+ for manifest_file in manifest_files:
+                entries = self.manifest_file_manager.read(manifest_file.file_name)
+ file_entries.extend(entries)
+ if self.predicate:
+ file_entries = self._filter_by_predicate(file_entries)
+ return file_entries
+
+    @staticmethod
+    def between_timestamps(table, predicate: Optional[Predicate], limit: Optional[int],
+                           read_type: List[DataField], start_timestamp: int,
+                           end_timestamp: int) -> 'IncrementalStartingScanner':
+        """
+        Create an IncrementalStartingScanner for snapshots between two timestamps.
+        """
+        snapshot_manager = SnapshotManager(table)
+        starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
+        earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
+
+        # If earliest_snapshot.time_millis > start_timestamp we should include the earliest_snapshot
+        if starting_snapshot is None or (earliest_snapshot and earliest_snapshot.time_millis > start_timestamp):
+            start_id = earliest_snapshot.id - 1 if earliest_snapshot else -1
+        else:
+            start_id = starting_snapshot.id
+
+        end_snapshot = snapshot_manager.earlier_or_equal_time_mills(end_timestamp)
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if latest_snapshot else -1)
+
+        return IncrementalStartingScanner(table, predicate, limit, read_type, start_id, end_id)
diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py
b/paimon-python/pypaimon/read/scanner/starting_scanner.py
new file mode 100644
index 0000000000..7e6cdfd81a
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py
@@ -0,0 +1,28 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from abc import ABC, abstractmethod
+
+from pypaimon.read.plan import Plan
+
+
+class StartingScanner(ABC):
+ """Helper class for the first planning of TableScan."""
+
+ @abstractmethod
+ def scan(self) -> Plan:
+ """Plan the files to read."""
diff --git a/paimon-python/pypaimon/read/table_read.py
b/paimon-python/pypaimon/read/table_read.py
index b33fb2c6ad..cf32d04d72 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Iterator, List, Optional, Any
+from typing import Any, Iterator, List, Optional
import pandas
import pyarrow
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index d76725ca97..93a7babb30 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -16,23 +16,19 @@
# limitations under the License.
################################################################################
-from collections import defaultdict
-from typing import Callable, List, Optional
+from typing import List, Optional
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
-from pypaimon.common.predicate_builder import PredicateBuilder
-from pypaimon.manifest.manifest_file_manager import ManifestFileManager
-from pypaimon.manifest.manifest_list_manager import ManifestListManager
-from pypaimon.manifest.schema.data_file_meta import DataFileMeta
-from pypaimon.manifest.schema.manifest_entry import ManifestEntry
-from pypaimon.read.interval_partition import IntervalPartition, SortedRun
+
from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
- extract_predicate_to_list)
-from pypaimon.read.split import Split
+from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.read.scanner.incremental_starting_scanner import \
+ IncrementalStartingScanner
+from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.bucket_mode import BucketMode
class TableScan:
@@ -46,423 +42,36 @@ class TableScan:
self.predicate = predicate
self.limit = limit
self.read_type = read_type
-
- self.snapshot_manager = SnapshotManager(table)
- self.manifest_list_manager = ManifestListManager(table)
- self.manifest_file_manager = ManifestFileManager(table)
-
- pk_conditions = []
- trimmed_pk = [field.name for field in
self.table.table_schema.get_trimmed_primary_key_fields()]
- extract_predicate_to_list(pk_conditions, self.predicate, trimmed_pk)
- self.primary_key_predicate =
PredicateBuilder(self.table.fields).and_predicates(pk_conditions)
-
- partition_conditions = defaultdict(list)
- extract_predicate_to_dict(partition_conditions, self.predicate,
self.table.partition_keys)
- self.partition_key_predicate = partition_conditions
-
- self.target_split_size = 128 * 1024 * 1024
- self.open_file_cost = 4 * 1024 * 1024
-
- self.idx_of_this_subtask = None
- self.number_of_para_subtasks = None
-
- self.only_read_real_buckets = True if int(
- self.table.options.get('bucket', -1)) ==
BucketMode.POSTPONE_BUCKET.value else False
- self.data_evolution = self.table.options.get('data-evolution.enabled',
'false').lower() == 'true'
+ self.starting_scanner = self._create_starting_scanner()
def plan(self) -> Plan:
- file_entries = self.plan_files()
- if not file_entries:
- return Plan([])
- if self.table.is_primary_key_table:
- splits = self._create_primary_key_splits(file_entries)
- elif self.data_evolution:
- splits = self._create_data_evolution_splits(file_entries)
- else:
- splits = self._create_append_only_splits(file_entries)
-
- splits = self._apply_push_down_limit(splits)
- return Plan(splits)
-
- def plan_files(self) -> List[ManifestEntry]:
- latest_snapshot = self.snapshot_manager.get_latest_snapshot()
- if not latest_snapshot:
- return []
- manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
-
- deleted_entries = set()
- added_entries = []
- # TODO: filter manifest files by predicate
- for manifest_file in manifest_files:
- manifest_entries =
self.manifest_file_manager.read(manifest_file.file_name,
- lambda row:
self._bucket_filter(row))
- for entry in manifest_entries:
- if entry.kind == 0:
- added_entries.append(entry)
- else:
- deleted_entries.add((tuple(entry.partition.values),
entry.bucket, entry.file.file_name))
-
- file_entries = [
- entry for entry in added_entries
- if (tuple(entry.partition.values), entry.bucket,
entry.file.file_name) not in deleted_entries
- ]
- if self.predicate:
- file_entries = self._filter_by_predicate(file_entries)
- return file_entries
+ return self.starting_scanner.scan()
+
+    def _create_starting_scanner(self) -> Optional[StartingScanner]:
+        options = self.table.options
+        if CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP in options:
+            ts = options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP].split(",")
+            if len(ts) != 2:
+                raise ValueError(
+                    "The incremental-between-timestamp must specify start (exclusive) and end (inclusive) timestamps, but is: "
+                    + options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP])
+            earliest_snapshot = SnapshotManager(self.table).try_get_earliest_snapshot()
+            latest_snapshot = SnapshotManager(self.table).get_latest_snapshot()
+            if earliest_snapshot is None or latest_snapshot is None:
+                return EmptyStartingScanner()
+            start_timestamp = int(ts[0])
+            end_timestamp = int(ts[1])
+            if start_timestamp >= end_timestamp:
+                raise ValueError(
+                    "Ending timestamp %s should be larger than starting timestamp %s."
+                    % (end_timestamp, start_timestamp))
+            if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis
+                    or end_timestamp < earliest_snapshot.time_millis):
+                return EmptyStartingScanner()
+            return IncrementalStartingScanner.between_timestamps(
+                self.table, self.predicate, self.limit, self.read_type,
+                start_timestamp, end_timestamp)
+        return FullStartingScanner(self.table, self.predicate, self.limit, self.read_type)
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) ->
'TableScan':
- if idx_of_this_subtask >= number_of_para_subtasks:
- raise Exception("idx_of_this_subtask must be less than
number_of_para_subtasks")
- self.idx_of_this_subtask = idx_of_this_subtask
- self.number_of_para_subtasks = number_of_para_subtasks
+ self.starting_scanner.with_shard(idx_of_this_subtask,
number_of_para_subtasks)
return self
-
- def _append_only_filter_by_shard(self, partitioned_files: defaultdict) ->
(defaultdict, int, int):
- total_row = 0
- # Sort by file creation time to ensure consistent sharding
- for key, file_entries in partitioned_files.items():
- for entry in file_entries:
- total_row += entry.file.row_count
-
- # Calculate number of rows this shard should process
- # Last shard handles all remaining rows (handles non-divisible cases)
- if self.idx_of_this_subtask == self.number_of_para_subtasks - 1:
- num_row = total_row - total_row // self.number_of_para_subtasks *
self.idx_of_this_subtask
- else:
- num_row = total_row // self.number_of_para_subtasks
- # Calculate start row and end row position for current shard in all
data
- start_row = self.idx_of_this_subtask * (total_row //
self.number_of_para_subtasks)
- end_row = start_row + num_row
-
- plan_start_row = 0
- plan_end_row = 0
- entry_end_row = 0 # end row position of current file in all data
- splits_start_row = 0
- filtered_partitioned_files = defaultdict(list)
- # Iterate through all file entries to find files that overlap with
current shard range
- for key, file_entries in partitioned_files.items():
- filtered_entries = []
- for entry in file_entries:
- entry_begin_row = entry_end_row # Starting row position of
current file in all data
- entry_end_row += entry.file.row_count # Update to row
position after current file
-
- # If current file is completely after shard range, stop
iteration
- if entry_begin_row >= end_row:
- break
- # If current file is completely before shard range, skip it
- if entry_end_row <= start_row:
- continue
- if entry_begin_row <= start_row < entry_end_row:
- splits_start_row = entry_begin_row
- plan_start_row = start_row - entry_begin_row
- # If shard end position is within current file, record
relative end position
- if entry_begin_row < end_row <= entry_end_row:
- plan_end_row = end_row - splits_start_row
- # Add files that overlap with shard range to result
- filtered_entries.append(entry)
- if filtered_entries:
- filtered_partitioned_files[key] = filtered_entries
-
- return filtered_partitioned_files, plan_start_row, plan_end_row
-
- def _compute_split_start_end_row(self, splits: List[Split],
plan_start_row, plan_end_row):
- file_end_row = 0 # end row position of current file in all data
- for split in splits:
- files = split.files
- split_start_row = file_end_row
- # Iterate through all file entries to find files that overlap with
current shard range
- for file in files:
- file_begin_row = file_end_row # Starting row position of
current file in all data
- file_end_row += file.row_count # Update to row position after
current file
-
- # If shard start position is within current file, record
actual start position and relative offset
- if file_begin_row <= plan_start_row < file_end_row:
- split.split_start_row = plan_start_row - file_begin_row
-
- # If shard end position is within current file, record
relative end position
- if file_begin_row < plan_end_row <= file_end_row:
- split.split_end_row = plan_end_row - split_start_row
- if split.split_start_row is None:
- split.split_start_row = 0
- if split.split_end_row is None:
- split.split_end_row = split.row_count
-
- def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry])
-> List[ManifestEntry]:
- filtered_entries = []
- for entry in file_entries:
- if entry.bucket % self.number_of_para_subtasks ==
self.idx_of_this_subtask:
- filtered_entries.append(entry)
- return filtered_entries
-
- def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
- bucket = entry.bucket
- if self.only_read_real_buckets and bucket < 0:
- return False
- return True
-
- def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
- if self.limit is None:
- return splits
- scanned_row_count = 0
- limited_splits = []
-
- for split in splits:
- if split.raw_convertible:
- limited_splits.append(split)
- scanned_row_count += split.row_count
- if scanned_row_count >= self.limit:
- return limited_splits
-
- return limited_splits
-
- def _filter_by_predicate(self, file_entries: List[ManifestEntry]) ->
List[ManifestEntry]:
- if not self.predicate:
- return file_entries
-
- filtered_files = []
- for file_entry in file_entries:
- if self.partition_key_predicate and not
self._filter_by_partition(file_entry):
- continue
- if not self._filter_by_stats(file_entry):
- continue
- filtered_files.append(file_entry)
-
- return filtered_files
-
- def _filter_by_partition(self, file_entry: ManifestEntry) -> bool:
- partition_dict = file_entry.partition.to_dict()
- for field_name, conditions in self.partition_key_predicate.items():
- partition_value = partition_dict[field_name]
- for predicate in conditions:
- if not predicate.test_by_value(partition_value):
- return False
- return True
-
- def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
- if file_entry.kind != 0:
- return False
- if self.table.is_primary_key_table:
- predicate = self.primary_key_predicate
- stats = file_entry.file.key_stats
- else:
- predicate = self.predicate
- stats = file_entry.file.value_stats
- return predicate.test_by_stats({
- "min_values": stats.min_values.to_dict(),
- "max_values": stats.max_values.to_dict(),
- "null_counts": {
- stats.min_values.fields[i].name: stats.null_counts[i] for i in
range(len(stats.min_values.fields))
- },
- "row_count": file_entry.file.row_count,
- })
-
- def _create_data_evolution_splits(self, file_entries: List[ManifestEntry])
-> List['Split']:
- """
- Create data evolution splits for append-only tables with schema
evolution.
- This method groups files by firstRowId and creates splits that can
handle
- column merging across different schema versions.
- """
- partitioned_files = defaultdict(list)
- for entry in file_entries:
- partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
-
- if self.idx_of_this_subtask is not None:
- partitioned_files, plan_start_row, plan_end_row =
self._append_only_filter_by_shard(partitioned_files)
-
- def weight_func(file_list: List[DataFileMeta]) -> int:
- return max(sum(f.file_size for f in file_list),
self.open_file_cost)
-
- splits = []
- for key, file_entries in partitioned_files.items():
- if not file_entries:
- continue
-
- data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
- # Split files by firstRowId for data evolution
- split_by_row_id = self._split_by_row_id(data_files)
-
- # Pack the split groups for optimal split sizes
- packed_files: List[List[List[DataFileMeta]]] =
self._pack_for_ordered(split_by_row_id, weight_func,
-
self.target_split_size)
-
- # Flatten the packed files and build splits
- flatten_packed_files: List[List[DataFileMeta]] = [
- [file for sub_pack in pack for file in sub_pack]
- for pack in packed_files
- ]
-
- splits += self._build_split_from_pack(flatten_packed_files,
file_entries, False)
-
- if self.idx_of_this_subtask is not None:
- self._compute_split_start_end_row(splits, plan_start_row,
plan_end_row)
- return splits
-
- def _create_append_only_splits(self, file_entries: List[ManifestEntry]) ->
List['Split']:
- partitioned_files = defaultdict(list)
- for entry in file_entries:
- partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
-
- if self.idx_of_this_subtask is not None:
- partitioned_files, plan_start_row, plan_end_row =
self._append_only_filter_by_shard(partitioned_files)
-
- def weight_func(f: DataFileMeta) -> int:
- return max(f.file_size, self.open_file_cost)
-
- splits = []
- for key, file_entries in partitioned_files.items():
- if not file_entries:
- return []
-
- data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
- packed_files: List[List[DataFileMeta]] =
self._pack_for_ordered(data_files, weight_func,
-
self.target_split_size)
- splits += self._build_split_from_pack(packed_files, file_entries,
False)
- if self.idx_of_this_subtask is not None:
- self._compute_split_start_end_row(splits, plan_start_row,
plan_end_row)
- return splits
-
- def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) ->
List['Split']:
- if self.idx_of_this_subtask is not None:
- file_entries = self._primary_key_filter_by_shard(file_entries)
- partitioned_files = defaultdict(list)
- for entry in file_entries:
- partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
-
- def weight_func(fl: List[DataFileMeta]) -> int:
- return max(sum(f.file_size for f in fl), self.open_file_cost)
-
- splits = []
- for key, file_entries in partitioned_files.items():
- if not file_entries:
- return []
-
- data_files: List[DataFileMeta] = [e.file for e in file_entries]
- partition_sort_runs: List[List[SortedRun]] =
IntervalPartition(data_files).partition()
- sections: List[List[DataFileMeta]] = [
- [file for s in sl for file in s.files]
- for sl in partition_sort_runs
- ]
-
- packed_files: List[List[List[DataFileMeta]]] =
self._pack_for_ordered(sections, weight_func,
-
self.target_split_size)
- flatten_packed_files: List[List[DataFileMeta]] = [
- [file for sub_pack in pack for file in sub_pack]
- for pack in packed_files
- ]
- splits += self._build_split_from_pack(flatten_packed_files,
file_entries, True)
- return splits
-
- def _build_split_from_pack(self, packed_files, file_entries,
for_primary_key_split: bool) -> List['Split']:
- splits = []
- for file_group in packed_files:
- raw_convertible = True
- if for_primary_key_split:
- raw_convertible = len(file_group) == 1
-
- file_paths = []
- total_file_size = 0
- total_record_count = 0
-
- for data_file in file_group:
- data_file.set_file_path(self.table.table_path,
file_entries[0].partition,
- file_entries[0].bucket)
- file_paths.append(data_file.file_path)
- total_file_size += data_file.file_size
- total_record_count += data_file.row_count
-
- if file_paths:
- split = Split(
- files=file_group,
- partition=file_entries[0].partition,
- bucket=file_entries[0].bucket,
- _file_paths=file_paths,
- _row_count=total_record_count,
- _file_size=total_file_size,
- raw_convertible=raw_convertible
- )
- splits.append(split)
- return splits
-
- @staticmethod
- def _pack_for_ordered(items: List, weight_func: Callable, target_weight:
int) -> List[List]:
- packed = []
- bin_items = []
- bin_weight = 0
-
- for item in items:
- weight = weight_func(item)
- if bin_weight + weight > target_weight and len(bin_items) > 0:
- packed.append(list(bin_items))
- bin_items.clear()
- bin_weight = 0
-
- bin_weight += weight
- bin_items.append(item)
-
- if len(bin_items) > 0:
- packed.append(bin_items)
-
- return packed
-
- @staticmethod
- def _is_blob_file(file_name: str) -> bool:
- """Check if a file is a blob file based on its extension."""
- return file_name.endswith('.blob')
-
- def _split_by_row_id(self, files: List[DataFileMeta]) ->
List[List[DataFileMeta]]:
- """
- Split files by firstRowId for data evolution.
- This method groups files that have the same firstRowId, which is
essential
- for handling schema evolution where files with different schemas need
to be
- read together to merge columns.
- """
- split_by_row_id = []
-
- # Sort files by firstRowId and then by maxSequenceNumber
- # Files with null firstRowId are treated as having Long.MIN_VALUE
- def sort_key(file: DataFileMeta) -> tuple:
- first_row_id = file.first_row_id if file.first_row_id is not None
else float('-inf')
- is_blob = 1 if self._is_blob_file(file.file_name) else 0
- # For files with same firstRowId, sort by maxSequenceNumber in
descending order
- # (larger sequence number means more recent data)
- max_seq = file.max_sequence_number
- return (first_row_id, is_blob, -max_seq)
-
- sorted_files = sorted(files, key=sort_key)
-
- # Split files by firstRowId
- last_row_id = -1
- check_row_id_start = 0
- current_split = []
-
- for file in sorted_files:
- first_row_id = file.first_row_id
- if first_row_id is None:
- # Files without firstRowId are treated as individual splits
- split_by_row_id.append([file])
- continue
-
- if not self._is_blob_file(file.file_name) and first_row_id !=
last_row_id:
- if current_split:
- split_by_row_id.append(current_split)
-
- # Validate that files don't overlap
- if first_row_id < check_row_id_start:
- file_names = [f.file_name for f in sorted_files]
- raise ValueError(
- f"There are overlapping files in the split:
{file_names}, "
- f"the wrong file is: {file.file_name}"
- )
-
- current_split = []
- last_row_id = first_row_id
- check_row_id_start = first_row_id + file.row_count
-
- current_split.append(file)
-
- if current_split:
- split_by_row_id.append(current_split)
-
- return split_by_row_id
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 2ded357802..87b42c6f35 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -59,3 +59,58 @@ class SnapshotManager:
Path to the snapshot file
"""
return self.snapshot_dir / f"snapshot-{snapshot_id}"
+
+ def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
+ if self.file_io.exists(self.snapshot_dir / "EARLIEST"):
+            earliest_content = self.file_io.read_file_utf8(self.snapshot_dir / "EARLIEST")
+ earliest_snapshot_id = int(earliest_content.strip())
+ return self.get_snapshot_by_id(earliest_snapshot_id)
+ else:
+ return self.get_snapshot_by_id(1)
+
+    def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
+ """
+ Find the latest snapshot with time_millis <= the given timestamp.
+
+ Args:
+ timestamp: The timestamp to compare against
+
+ Returns:
+            The latest snapshot with time_millis <= timestamp, or None if no such snapshot exists
+ """
+ earliest = 1
+ latest = self.get_latest_snapshot().id
+ final_snapshot = None
+
+ while earliest <= latest:
+ mid = earliest + (latest - earliest) // 2
+ snapshot = self.get_snapshot_by_id(mid)
+ commit_time = snapshot.time_millis
+
+ if commit_time > timestamp:
+ latest = mid - 1
+ elif commit_time < timestamp:
+ earliest = mid + 1
+ final_snapshot = snapshot
+ else:
+ final_snapshot = snapshot
+ break
+
+ return final_snapshot
+
+ def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
+ """
+ Get a snapshot by its ID.
+
+ Args:
+ snapshot_id: The snapshot ID
+
+ Returns:
+ The snapshot with the specified ID, or None if not found
+ """
+ snapshot_file = self.get_snapshot_path(snapshot_id)
+ if not self.file_io.exists(snapshot_file):
+ return None
+
+ snapshot_content = self.file_io.read_file_utf8(snapshot_file)
+ return JSON.from_json(snapshot_content, Snapshot)
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index e0d7c7e603..57ccbf83bb 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -104,3 +104,20 @@ class FileStoreTable(Table):
return DynamicBucketRowKeyExtractor(self.table_schema)
else:
raise ValueError(f"Unsupported bucket mode: {bucket_mode}")
+
+    def copy(self, options: dict) -> 'FileStoreTable':
+        if CoreOptions.BUCKET in options and options.get(CoreOptions.BUCKET) != self.options.get(CoreOptions.BUCKET):
+            raise ValueError("Cannot change bucket number")
+        new_options = self.options.copy()
+        for k, v in options.items():
+            if v is None:
+                new_options.pop(k)
+            else:
+                new_options[k] = v
+        new_table_schema = self.table_schema.copy(new_options=new_options)
+        return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema,
+                              self.catalog_environment)
+
+ def add_options(self, options: dict):
+ for key, value in options.items():
+ self.options[key] = value
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index d6b9433cba..530e33aa78 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -19,13 +19,12 @@ import shutil
import tempfile
import unittest
+from pypaimon import CatalogFactory, Schema
from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
DatabaseNotExistException,
TableAlreadyExistException,
TableNotExistException)
-from pypaimon import CatalogFactory
from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon import Schema
from pypaimon.table.file_store_table import FileStoreTable
diff --git a/paimon-python/pypaimon/tests/predicates_test.py
b/paimon-python/pypaimon/tests/predicates_test.py
index 7976094290..9ab1cfbb37 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -23,8 +23,7 @@ import unittest
import pandas as pd
import pyarrow as pa
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
def _check_filtered_result(read_builder, expected_df):
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
index 92a3e9601a..b06a69baa8 100644
--- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -22,12 +22,12 @@ import unittest
import pandas as pd
import pyarrow as pa
-from pypaimon import CatalogFactory
-from pypaimon import Schema
-from pypaimon.tests.predicates_test import _random_format,
_check_filtered_result
+from pypaimon import CatalogFactory, Schema
+from pypaimon.tests.predicates_test import (_check_filtered_result,
+ _random_format)
-class PredicatePy36Test(unittest.TestCase):
+class AOPredicatePy36Test(unittest.TestCase):
@classmethod
def setUpClass(cls):
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index e2a61df301..efb3189e06 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -20,8 +20,10 @@ from unittest.mock import patch
import pyarrow as pa
from pypaimon import Schema
-from pypaimon.catalog.catalog_exception import TableNotExistException,
TableAlreadyExistException, \
- DatabaseNotExistException, DatabaseAlreadyExistException
+from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
+ DatabaseNotExistException,
+ TableAlreadyExistException,
+ TableNotExistException)
from pypaimon.common.config import OssOptions
from pypaimon.common.file_io import FileIO
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index e6374132af..fe6902a5f5 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -16,34 +16,36 @@ See the License for the specific language governing
permissions and
limitations under the License.
"""
import logging
-from datetime import datetime, date
+import time
+from datetime import date, datetime
from decimal import Decimal
from unittest.mock import Mock
-import pandas as pd
import numpy as np
+import pandas as pd
import pyarrow as pa
+from pypaimon import CatalogFactory, Schema
from pypaimon.api.options import Options
from pypaimon.catalog.catalog_context import CatalogContext
-from pypaimon import CatalogFactory
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.identifier import Identifier
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.schema.data_types import DataField, AtomicType
-from pypaimon import Schema
-from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer,
GenericRowDeserializer
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
+ GenericRowSerializer)
from pypaimon.table.row.row_kind import RowKind
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
-
from pypaimon.write.file_store_commit import FileStoreCommit
-class RESTReadWritePy36Test(RESTBaseTest):
+class RESTAOReadWritePy36Test(RESTBaseTest):
def test_overwrite(self):
simple_pa_schema = pa.schema([
@@ -175,10 +177,10 @@ class RESTReadWritePy36Test(RESTBaseTest):
self.assertEqual(actual_data, expect_data)
# to test GenericRow ability
- latest_snapshot = table_scan.snapshot_manager.get_latest_snapshot()
- manifest_files =
table_scan.manifest_list_manager.read_all(latest_snapshot)
- manifest_entries =
table_scan.manifest_file_manager.read(manifest_files[0].file_name,
- lambda row:
table_scan._bucket_filter(row))
+ latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ manifest_files =
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries =
table_scan.starting_scanner.manifest_file_manager.read(
+ manifest_files[0].file_name, lambda row:
table_scan.starting_scanner._bucket_filter(row))
min_value_stats =
manifest_entries[0].file.value_stats.min_values.values
max_value_stats =
manifest_entries[0].file.value_stats.max_values.values
expected_min_values = [col[0].as_py() for col in expect_data]
@@ -737,6 +739,33 @@ class RESTReadWritePy36Test(RESTBaseTest):
test_name="specific_case"
)
+    def test_incremental_timestamp(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_incremental_parquet', schema, False)
+        table = self.rest_catalog.get_table('default.test_incremental_parquet')
+        timestamp = int(time.time() * 1000)
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+        t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+        # test 1
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(len(actual), 0)
+        # test 2
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        self.assertEqual(self.expected, actual)
+        # test 3
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        expected = self.expected.slice(4, 4)
+        self.assertEqual(expected, actual)
+
def _test_value_stats_cols_case(self, manifest_manager, table,
value_stats_cols, expected_fields_count, test_name):
"""Helper method to test a specific _VALUE_STATS_COLS case."""
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index db0cbcccd1..0367ab409c 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -18,14 +18,16 @@
import os
import tempfile
+import time
import unittest
-import pyarrow as pa
import numpy as np
import pandas as pd
+import pyarrow as pa
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
class AoReaderTest(unittest.TestCase):
@@ -277,6 +279,69 @@ class AoReaderTest(unittest.TestCase):
# might be split of "dt=1" or split of "dt=2"
self.assertEqual(actual.num_rows, 4)
+ def test_incremental_timestamp(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_incremental_parquet', schema, False)
+        table = self.catalog.get_table('default.test_incremental_parquet')
+        timestamp = int(time.time() * 1000)
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+        t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+        # test 1
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(len(actual), 0)
+        # test 2
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(self.expected, actual)
+        # test 3
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)})
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ expected = self.expected.slice(4, 4)
+ self.assertEqual(expected, actual)
+
+ def test_incremental_read_multi_snapshots(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_incremental_100', schema, False)
+ table = self.catalog.get_table('default.test_incremental_100')
+
+ write_builder = table.new_batch_write_builder()
+ for i in range(1, 101):
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ pa_table = pa.Table.from_pydict({
+ 'user_id': [i],
+ 'item_id': [1000 + i],
+ 'behavior': [f'snap{i}'],
+ 'dt': ['p1' if i % 2 == 1 else 'p2'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ snapshot_manager = SnapshotManager(table)
+ t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
+ t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
+
+        table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"})
+ read_builder = table_inc.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+
+ expected = pa.Table.from_pydict({
+ 'user_id': list(range(11, 21)),
+ 'item_id': [1000 + i for i in range(11, 21)],
+ 'behavior': [f'snap{i}' for i in range(11, 21)],
+ 'dt': ['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)],
+ }, schema=self.pa_schema).sort_by('user_id')
+ self.assertEqual(expected, actual)
+
def _write_test_table(self, table):
write_builder = table.new_batch_write_builder()
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 6e9dc1ffc6..6bb2cdd675 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -16,29 +16,28 @@
# limitations under the License.
################################################################################
-import os
import glob
+import os
import shutil
import tempfile
import unittest
-from datetime import datetime, date, time
+from datetime import date, datetime, time
from decimal import Decimal
from unittest.mock import Mock
import pandas as pd
import pyarrow as pa
-from pypaimon.table.row.generic_row import GenericRow
-
-from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
- MapType, PyarrowFieldParser)
-from pypaimon.schema.table_schema import TableSchema
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
-from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
+ MapType, PyarrowFieldParser)
+from pypaimon.schema.table_schema import TableSchema
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import GenericRow
from pypaimon.write.file_store_commit import FileStoreCommit
@@ -215,10 +214,10 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(actual_data, expect_data)
# to test GenericRow ability
-        latest_snapshot = table_scan.snapshot_manager.get_latest_snapshot()
-        manifest_files = table_scan.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = table_scan.manifest_file_manager.read(manifest_files[0].file_name,
-                                                                 lambda row: table_scan._bucket_filter(row))
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row))
        min_value_stats = manifest_entries[0].file.value_stats.min_values.values
        max_value_stats = manifest_entries[0].file.value_stats.max_values.values
expected_min_values = [col[0].as_py() for col in expect_data]
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index b992595fc9..bcbc94bd68 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -19,12 +19,14 @@
import os
import shutil
import tempfile
+import time
import unittest
import pyarrow as pa
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
class PkReaderTest(unittest.TestCase):
@@ -184,6 +186,79 @@ class PkReaderTest(unittest.TestCase):
expected = self.expected.select(['dt', 'user_id', 'behavior'])
self.assertEqual(actual, expected)
+ def test_incremental_timestamp(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '2'})
+        self.catalog.create_table('default.test_incremental_parquet', schema, False)
+        table = self.catalog.get_table('default.test_incremental_parquet')
+        timestamp = int(time.time() * 1000)
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+        t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+        # test 1
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(len(actual), 0)
+        # test 2
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(self.expected, actual)
+        # test 3
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)})
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ expected = pa.Table.from_pydict({
+ "user_id": [2, 5, 7, 8],
+ "item_id": [1002, 1005, 1007, 1008],
+ "behavior": ["b-new", "e", "g", "h"],
+ "dt": ["p1", "p2", "p1", "p2"]
+ }, schema=self.pa_schema)
+ self.assertEqual(expected, actual)
+
+ def test_incremental_read_multi_snapshots(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '2'})
+        self.catalog.create_table('default.test_incremental_read_multi_snapshots', schema, False)
+        table = self.catalog.get_table('default.test_incremental_read_multi_snapshots')
+ write_builder = table.new_batch_write_builder()
+ for i in range(1, 101):
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ pa_table = pa.Table.from_pydict({
+ 'user_id': [i],
+ 'item_id': [1000 + i],
+ 'behavior': [f'snap{i}'],
+ 'dt': ['p1' if i % 2 == 1 else 'p2'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ snapshot_manager = SnapshotManager(table)
+ t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
+ t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
+
+        table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"})
+ read_builder = table_inc.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+
+ expected = pa.Table.from_pydict({
+ 'user_id': list(range(11, 21)),
+ 'item_id': [1000 + i for i in range(11, 21)],
+ 'behavior': [f'snap{i}' for i in range(11, 21)],
+ 'dt': ['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)],
+ }, schema=self.pa_schema).sort_by('user_id')
+ self.assertEqual(expected, actual)
+
def _write_test_table(self, table):
write_builder = table.new_batch_write_builder()
diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py
index 3a83ccb285..e45fac5cde 100644
--- a/paimon-python/pypaimon/tests/rest/rest_base_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py
@@ -25,17 +25,16 @@ import uuid
import pyarrow as pa
+from pypaimon import CatalogFactory, Schema
from pypaimon.api.api_response import ConfigResponse
from pypaimon.api.auth import BearTokenAuthProvider
from pypaimon.api.options import Options
from pypaimon.catalog.catalog_context import CatalogContext
-from pypaimon import CatalogFactory
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.identifier import Identifier
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
MapType)
-from pypaimon import Schema
from pypaimon.schema.table_schema import TableSchema
from pypaimon.tests.rest.rest_server import RESTCatalogServer
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 10d2796b76..4e5b4d723e 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -27,7 +27,7 @@ from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.read.table_scan import TableScan
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
SnapshotCommit)
@@ -101,7 +101,7 @@ class FileStoreCommit:
f"in {msg.partition} does not belong to
this partition")
commit_entries = []
- current_entries = TableScan(self.table, partition_filter, None,
[]).plan_files()
+ current_entries = FullStartingScanner(self.table, partition_filter,
None, []).plan_files()
for entry in current_entries:
entry.kind = 1
commit_entries.append(entry)
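
To make the last hunk easier to follow, here is a hedged sketch (not from the patch) of the call pattern `FileStoreCommit` switches to: the overwrite path now lists the currently visible file entries through `FullStartingScanner.plan_files()` instead of `TableScan.plan_files()`, with the remaining constructor arguments passed through unchanged. The helper name below is invented for illustration; `table` and `partition_filter` are assumed to come from the surrounding commit context.

```python
# Illustrative only: mirrors the replacement call in the hunk above.
from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner


def list_current_entries(table, partition_filter):
    """Return the manifest entries currently visible for the given partition filter.

    The argument order matches the hunk: table, partition filter, then the two
    trailing arguments (None and []) passed through unchanged from the old
    TableScan call.
    """
    return FullStartingScanner(table, partition_filter, None, []).plan_files()
```
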