This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 64293b7475 [python] correct with_slice index for split with multi files (#7153)
64293b7475 is described below

commit 64293b7475e586f14550abf97ba9f15e9707c1fa
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Jan 30 10:12:29 2026 +0800

    [python] correct with_slice index for split with multi files (#7153)
---
 .../read/scanner/data_evolution_split_generator.py | 101 ++++++++++-----------
 .../pypaimon/tests/data_evolution_test.py          |  62 +++++++++++++
 2 files changed, 108 insertions(+), 55 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 2a4628b3d4..d053db5f56 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -317,70 +317,61 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
         """
         shard_file_idx_map = {}
         
-        # Find the first non-blob file to determine the row range for this split
-        data_file = None
+        # First pass: data files only. Compute range and apply directly to avoid second-pass lookup.
+        current_pos = file_end_pos
+        data_file_infos = []
         for file in split.files:
-            if not self._is_blob_file(file.file_name):
-                data_file = file
-                break
-        
-        if data_file is None:
+            if self._is_blob_file(file.file_name):
+                continue
+            file_begin_pos = current_pos
+            current_pos += file.row_count
+            data_file_range = self._compute_file_range(
+                plan_start_pos, plan_end_pos, file_begin_pos, file.row_count
+            )
+            data_file_infos.append((file, data_file_range))
+            if data_file_range is not None:
+                shard_file_idx_map[file.file_name] = data_file_range
+
+        if not data_file_infos:
             # No data file, skip this split
             shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
             return shard_file_idx_map
 
-        # Calculate the row range based on the data file position
-        file_begin_pos = file_end_pos
-        file_end_pos += data_file.row_count
-        data_file_first_row_id = data_file.first_row_id if data_file.first_row_id is not None else 0
+        next_pos = current_pos
 
-        # Determine the row range for the data file in this split using shared helper
-        data_file_range = self._compute_file_range(
-            plan_start_pos, plan_end_pos, file_begin_pos, data_file.row_count
-        )
-        
-        # Apply ranges to each file in the split
+        # Second pass: only blob files (data files already in shard_file_idx_map from first pass)
         for file in split.files:
-            if self._is_blob_file(file.file_name):
-                # For blob files, calculate range based on their first_row_id
-                if data_file_range is None:
-                    # Data file is completely within shard, blob files should also be
-                    continue
-                elif data_file_range == (-1, -1):
-                    # Data file is completely outside shard, blob files should be skipped
-                    shard_file_idx_map[file.file_name] = (-1, -1)
-                else:
-                    # Calculate blob file's position relative to data file's first_row_id
-                    blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
-                    # Blob's position relative to data file start
-                    blob_rel_start = blob_first_row_id - data_file_first_row_id
-                    blob_rel_end = blob_rel_start + file.row_count
-                    
-                    # Shard range relative to data file start
-                    shard_start = data_file_range[0]
-                    shard_end = data_file_range[1]
-                    
-                    # Intersect blob's range with shard range
-                    intersect_start = max(blob_rel_start, shard_start)
-                    intersect_end = min(blob_rel_end, shard_end)
-                    
-                    if intersect_start >= intersect_end:
-                        # Blob file is completely outside shard range
-                        shard_file_idx_map[file.file_name] = (-1, -1)
-                    elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
-                        # Blob file is completely within shard range, no slicing needed
-                        pass
-                    else:
-                        # Convert to file-local indices
-                        local_start = intersect_start - blob_rel_start
-                        local_end = intersect_end - blob_rel_start
-                        shard_file_idx_map[file.file_name] = (local_start, local_end)
+            if not self._is_blob_file(file.file_name):
+                continue
+            blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
+            data_file_range = None
+            data_file_first_row_id = None
+            for df, fr in data_file_infos:
+                df_first = df.first_row_id if df.first_row_id is not None else 0
+                if df_first <= blob_first_row_id < df_first + df.row_count:
+                    data_file_range = fr
+                    data_file_first_row_id = df_first
+                    break
+            if data_file_range is None:
+                continue
+            if data_file_range == (-1, -1):
+                shard_file_idx_map[file.file_name] = (-1, -1)
+                continue
+            blob_rel_start = blob_first_row_id - data_file_first_row_id
+            blob_rel_end = blob_rel_start + file.row_count
+            shard_start, shard_end = data_file_range
+            intersect_start = max(blob_rel_start, shard_start)
+            intersect_end = min(blob_rel_end, shard_end)
+            if intersect_start >= intersect_end:
+                shard_file_idx_map[file.file_name] = (-1, -1)
+            elif intersect_start == blob_rel_start and intersect_end == blob_rel_end:
+                pass
             else:
-                # Data file
-                if data_file_range is not None:
-                    shard_file_idx_map[file.file_name] = data_file_range
+                local_start = intersect_start - blob_rel_start
+                local_end = intersect_end - blob_rel_start
+                shard_file_idx_map[file.file_name] = (local_start, local_end)
 
-        shard_file_idx_map[self.NEXT_POS_KEY] = file_end_pos
+        shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
         return shard_file_idx_map
 
     def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 38034078e7..1ffb7dbcc4 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -85,6 +85,68 @@ class DataEvolutionTest(unittest.TestCase):
         ]))
         self.assertEqual(actual_data, expect_data)
 
+    def test_with_slice(self):
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("b", pa.int32()),
+            ("c", pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                "row-tracking.enabled": "true",
+                "data-evolution.enabled": "true",
+                "source.split.target-size": "512m",
+            },
+        )
+        table_name = "default.test_with_slice_data_evolution"
+        self.catalog.create_table(table_name, schema, ignore_if_exists=True)
+        table = self.catalog.get_table(table_name)
+
+        for batch in [
+            {"id": [1, 2], "b": [10, 20], "c": [100, 200]},
+            {"id": [1001, 2001], "b": [1011, 2011], "c": [1001, 2001]},
+            {"id": [-1, -2], "b": [-10, -20], "c": [-100, -200]},
+        ]:
+            wb = table.new_batch_write_builder()
+            tw = wb.new_write()
+            tc = wb.new_commit()
+            tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+            tc.commit(tw.prepare_commit())
+            tw.close()
+            tc.close()
+
+        rb = table.new_read_builder()
+        full_splits = rb.new_scan().plan().splits()
+        full_result = rb.new_read().to_pandas(full_splits)
+        self.assertEqual(
+            len(full_result),
+            6,
+            "Full scan should return 6 rows",
+        )
+        self.assertEqual(
+            sorted(full_result["id"].tolist()),
+            [-2, -1, 1, 2, 1001, 2001],
+            "Full set ids mismatch",
+        )
+
+        # with_slice(1, 4) -> row indices [1, 2, 3] -> 3 rows with id in (2, 1001, 2001)
+        scan = rb.new_scan().with_slice(1, 4)
+        splits = scan.plan().splits()
+        result = rb.new_read().to_pandas(splits)
+        self.assertEqual(
+            len(result),
+            3,
+            "with_slice(1, 4) should return 3 rows (indices 1,2,3). "
+            "Bug: DataEvolutionSplitGenerator returns 2 when split has multiple data files.",
+        )
+        ids = result["id"].tolist()
+        self.assertEqual(
+            sorted(ids),
+            [2, 1001, 2001],
+            "with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s" % ids,
+        )
+
     def test_multiple_appends(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),

Reply via email to