This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 64293b7475 [python] correct with_slice index for split with multi files (#7153)
64293b7475 is described below
commit 64293b7475e586f14550abf97ba9f15e9707c1fa
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Jan 30 10:12:29 2026 +0800
[python] correct with_slice index for split with multi files (#7153)
---
.../read/scanner/data_evolution_split_generator.py | 101 ++++++++++-----------
.../pypaimon/tests/data_evolution_test.py | 62 +++++++++++++
2 files changed, 108 insertions(+), 55 deletions(-)
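For context, the bug this patch fixes: the old code computed a row range from only the first non-blob file in the split, applied that one range to every data file, and advanced the next-position counter by that single file's row count, so splits containing several data files were sliced incorrectly. Below is a minimal, standalone sketch of the corrected per-file bookkeeping (compute_file_range is an illustrative stand-in for the private _compute_file_range helper, which this diff does not show; none of these names are the actual pypaimon API). Each data file occupies [file_begin_pos, file_begin_pos + row_count) in plan row order; intersecting that interval with the requested slice yields file-local indices, None when the file is fully inside the slice, or (-1, -1) when it is fully outside:

    def compute_file_range(plan_start, plan_end, file_begin, row_count):
        # Intersect the file's plan-level interval with the requested slice.
        file_end = file_begin + row_count
        start = max(plan_start, file_begin)
        end = min(plan_end, file_end)
        if start >= end:
            return (-1, -1)    # file entirely outside the slice
        if start == file_begin and end == file_end:
            return None        # file entirely inside the slice, no slicing needed
        return (start - file_begin, end - file_begin)  # file-local indices

    # Three 2-row data files and slice [1, 4), mirroring the new test below:
    pos, ranges = 0, []
    for rows in (2, 2, 2):
        ranges.append(compute_file_range(1, 4, pos, rows))
        pos += rows
    print(ranges)  # [(1, 2), None, (-1, -1)]: row 1 of file 0, all of file 1, none of file 2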
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 2a4628b3d4..d053db5f56 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -317,70 +317,61 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
"""
shard_file_idx_map = {}
- # Find the first non-blob file to determine the row range for this split
- data_file = None
+ # First pass: data files only. Compute range and apply directly to avoid second-pass lookup.
+ current_pos = file_end_pos
+ data_file_infos = []
for file in split.files:
- if not self._is_blob_file(file.file_name):
- data_file = file
- break
-
- if data_file is None:
+ if self._is_blob_file(file.file_name):
+ continue
+ file_begin_pos = current_pos
+ current_pos += file.row_count
+ data_file_range = self._compute_file_range(
+ plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
+ )
+ data_file_infos.append((file, data_file_range))
+ if data_file_range is not None:
+ shard_file_idx_map[file.file_name] = data_file_range
+
+ if not data_file_infos:
# No data file, skip this split
shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
return shard_file_idx_map
- # Calculate the row range based on the data file position
- file_begin_pos = file_end_pos
- file_end_pos += data_file.row_count
- data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0
+ next_pos = current_pos
- # Determine the row range for the data file in this split using shared helper
- data_file_range = self._compute_file_range(
- plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count
- )
-
- # Apply ranges to each file in the split
+ # Second pass: only blob files (data files already in shard_file_idx_map from first pass)
for file in split.files:
- if self._is_blob_file(file.file_name):
- # For blob files, calculate range based on their first_row_id
- if data_file_range is None:
- # Data file is completely within shard, blob files should also be
- continue
- elif data_file_range == (-1, -1):
- # Data file is completely outside shard, blob files should be skipped
- shard_file_idx_map[file.file_name] = (-1, -1)
- else:
- # Calculate blob file's position relative to data file's first_row_id
- blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
- # Blob's position relative to data file start
- blob_rel_start = blob_first_row_id - data_file_first_row_id
- blob_rel_end = blob_rel_start + file.row_count
-
- # Shard range relative to data file start
- shard_start = data_file_range[0]
- shard_end = data_file_range[1]
-
- # Intersect blob's range with shard range
- intersect_start = max(blob_rel_start, shard_start)
- intersect_end = min(blob_rel_end, shard_end)
-
- if intersect_start >= intersect_end:
- # Blob file is completely outside shard range
- shard_file_idx_map[file.file_name] = (-1, -1)
- elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
- # Blob file is completely within shard range, no slicing needed
- pass
- else:
- # Convert to file-local indices
- local_start = intersect_start - blob_rel_start
- local_end = intersect_end - blob_rel_start
- shard_file_idx_map[file.file_name] = (local_start, local_end)
+ if not self._is_blob_file(file.file_name):
+ continue
+ blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
+ data_file_range = None
+ data_file_first_row_id = None
+ for df, fr in data_file_infos:
+ df_first = df.first_row_id if df.first_row_id is not None else 0
+ if df_first <= blob_first_row_id < df_first + df.row_count:
+ data_file_range = fr
+ data_file_first_row_id = df_first
+ break
+ if data_file_range is None:
+ continue
+ if data_file_range == (-1, -1):
+ shard_file_idx_map[file.file_name] = (-1, -1)
+ continue
+ blob_rel_start = blob_first_row_id - data_file_first_row_id
+ blob_rel_end = blob_rel_start + file.row_count
+ shard_start, shard_end = data_file_range
+ intersect_start = max(blob_rel_start, shard_start)
+ intersect_end = min(blob_rel_end, shard_end)
+ if intersect_start >= intersect_end:
+ shard_file_idx_map[file.file_name] = (-1, -1)
+ elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+ pass
else:
- # Data file
- if data_file_range is not None:
- shard_file_idx_map[file.file_name] = data_file_range
+ local_start = intersect_start - blob_rel_start
+ local_end = intersect_end - blob_rel_start
+ shard_file_idx_map[file.file_name] = (local_start, local_end)
- shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+ shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
return shard_file_idx_map
def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
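The second pass above can be read in isolation as the following sketch (blob_slice and its parameters are illustrative names, not the pypaimon API): a blob file is matched to the data file whose first_row_id interval contains the blob's first_row_id, and that data file's shard range is then intersected with the blob's rows and rebased to blob-local indices:

    def blob_slice(blob_first_row_id, blob_rows, data_first_row_id, data_range):
        if data_range is None:
            return None        # owning data file fully inside the shard: keep whole blob
        if data_range == (-1, -1):
            return (-1, -1)    # owning data file fully outside: skip blob
        shard_start, shard_end = data_range
        rel_start = blob_first_row_id - data_first_row_id  # blob offset within the data file
        rel_end = rel_start + blob_rows
        start = max(rel_start, shard_start)
        end = min(rel_end, shard_end)
        if start >= end:
            return (-1, -1)    # blob entirely outside the shard range
        if (start, end) == (rel_start, rel_end):
            return None        # blob entirely inside, no slicing needed
        return (start - rel_start, end - rel_start)  # blob-local indices

    # A 5-row blob starting at row 5 of a data file whose shard range is (3, 7):
    print(blob_slice(5, 5, 0, (3, 7)))  # (0, 2): keep blob-local rows 0 and 1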
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 38034078e7..1ffb7dbcc4 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -85,6 +85,68 @@ class DataEvolutionTest(unittest.TestCase):
]))
self.assertEqual(actual_data, expect_data)
+ def test_with_slice(self):
+ pa_schema = pa.schema([
+ ("id", pa.int64()),
+ ("b", pa.int32()),
+ ("c", pa.int32()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ "row-tracking.enabled": "true",
+ "data-evolution.enabled": "true",
+ "source.split.target-size": "512m",
+ },
+ )
+ table_name = "default.test_with_slice_data_evolution"
+ self.catalog.create_table(table_name, schema, ignore_if_exists=True)
+ table = self.catalog.get_table(table_name)
+
+ for batch in [
+ {"id": [1, 2], "b": [10, 20], "c": [100, 200]},
+ {"id": [1001, 2001], "b": [1011, 2011], "c": [1001, 2001]},
+ {"id": [-1, -2], "b": [-10, -20], "c": [-100, -200]},
+ ]:
+ wb = table.new_batch_write_builder()
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ full_splits = rb.new_scan().plan().splits()
+ full_result = rb.new_read().to_pandas(full_splits)
+ self.assertEqual(
+ len(full_result),
+ 6,
+ "Full scan should return 6 rows",
+ )
+ self.assertEqual(
+ sorted(full_result["id"].tolist()),
+ [-2, -1, 1, 2, 1001, 2001],
+ "Full set ids mismatch",
+ )
+
+ # with_slice(1, 4) -> row indices [1, 2, 3] -> 3 rows with id in (2, 1001, 2001)
+ scan = rb.new_scan().with_slice(1, 4)
+ splits = scan.plan().splits()
+ result = rb.new_read().to_pandas(splits)
+ self.assertEqual(
+ len(result),
+ 3,
+ "with_slice(1, 4) should return 3 rows (indices 1,2,3). "
+ "Bug: DataEvolutionSplitGenerator returns 2 when split has
multiple data files.",
+ )
+ ids = result["id"].tolist()
+ self.assertEqual(
+ sorted(ids),
+ [2, 1001, 2001],
+ "with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s"
% ids,
+ )
+
def test_multiple_appends(self):
simple_pa_schema = pa.schema([
('f0', pa.int32()),