This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b9ccee6a79 [python] Support read blob row by offsets in with_shard feature (#6863)
b9ccee6a79 is described below

commit b9ccee6a79a495178a774fb192893bfce09567f9
Author: umi <[email protected]>
AuthorDate: Tue Dec 23 11:09:29 2025 +0800

    [python] Support read blob row by offsets in with_shard feature (#6863)
---
 .../pypaimon/read/reader/concat_batch_reader.py    |  99 +++++++++++-----
 .../read/reader/data_evolution_merge_reader.py     |  85 --------------
 .../pypaimon/read/reader/data_file_batch_reader.py |  10 +-
 .../pypaimon/read/reader/format_blob_reader.py     |  16 ++-
 .../pypaimon/read/reader/shard_batch_reader.py     |  61 ++++++++++
 .../pypaimon/read/scanner/full_starting_scanner.py |  75 ++++++++----
 paimon-python/pypaimon/read/split.py               |   7 +-
 paimon-python/pypaimon/read/split_read.py          |  77 ++++++++-----
 paimon-python/pypaimon/tests/blob_table_test.py    | 128 ++++++++++++++++++---
 .../pypaimon/tests/data_evolution_test.py          |   2 +-
 10 files changed, 366 insertions(+), 194 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index aefff13ebd..3ce9db6f5e 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -53,36 +53,6 @@ class ConcatBatchReader(RecordBatchReader):
         self.queue.clear()
 
 
-class ShardBatchReader(ConcatBatchReader):
-
-    def __init__(self, readers, split_start_row, split_end_row):
-        super().__init__(readers)
-        self.split_start_row = split_start_row
-        self.split_end_row = split_end_row
-        self.cur_end = 0
-
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
-        batch = super().read_arrow_batch()
-        if batch is None:
-            return None
-        if self.split_start_row is not None or self.split_end_row is not None:
-            cur_begin = self.cur_end  # begin idx of current batch based on 
the split
-            self.cur_end += batch.num_rows
-            # shard the first batch and the last batch
-            if self.split_start_row <= cur_begin < self.cur_end <= 
self.split_end_row:
-                return batch
-            elif cur_begin <= self.split_start_row < self.cur_end:
-                return batch.slice(self.split_start_row - cur_begin,
-                                   min(self.split_end_row, self.cur_end) - 
self.split_start_row)
-            elif cur_begin < self.split_end_row <= self.cur_end:
-                return batch.slice(0, self.split_end_row - cur_begin)
-            else:
-                # return empty RecordBatch if the batch size has not reached 
split_start_row
-                return pa.RecordBatch.from_arrays([], [])
-        else:
-            return batch
-
-
 class MergeAllBatchReader(RecordBatchReader):
     """
     A reader that accepts multiple reader suppliers and concatenates all their 
arrow batches
@@ -98,13 +68,18 @@ class MergeAllBatchReader(RecordBatchReader):
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         if self.reader:
-            return self.reader.read_next_batch()
+            try:
+                return self.reader.read_next_batch()
+            except StopIteration:
+                return None
 
         all_batches = []
 
         # Read all batches from all reader suppliers
         for supplier in self.reader_suppliers:
             reader = supplier()
+            if reader is None:
+                continue
             try:
                 while True:
                     batch = reader.read_arrow_batch()
@@ -149,3 +124,65 @@ class MergeAllBatchReader(RecordBatchReader):
     def close(self) -> None:
         self.merged_batch = None
         self.reader = None
+
+
+class DataEvolutionMergeReader(RecordBatchReader):
+    """
+    This is a union reader which contains multiple inner readers, Each reader 
is responsible for reading one file.
+
+    This reader, assembling multiple reader into one big and great reader, 
will merge the batches from all readers.
+
+    For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 
0, 1, 1, 1, 0}, it means:
+     - The first field comes from batch0, and it is at offset 0 in batch0.
+     - The second field comes from batch2, and it is at offset 0 in batch2.
+     - The third field comes from batch0, and it is at offset 1 in batch0.
+     - The fourth field comes from batch1, and it is at offset 1 in batch1.
+     - The fifth field comes from batch2, and it is at offset 1 in batch2.
+     - The sixth field comes from batch1, and it is at offset 0 in batch1.
+    """
+
+    def __init__(self, row_offsets: List[int], field_offsets: List[int], 
readers: List[Optional[RecordBatchReader]]):
+        if row_offsets is None:
+            raise ValueError("Row offsets must not be null")
+        if field_offsets is None:
+            raise ValueError("Field offsets must not be null")
+        if len(row_offsets) != len(field_offsets):
+            raise ValueError("Row offsets and field offsets must have the same 
length")
+        if not row_offsets:
+            raise ValueError("Row offsets must not be empty")
+        if not readers or len(readers) < 1:
+            raise ValueError("Readers should be more than 0")
+        self.row_offsets = row_offsets
+        self.field_offsets = field_offsets
+        self.readers = readers
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
+        for i, reader in enumerate(self.readers):
+            if reader is not None:
+                batch = reader.read_arrow_batch()
+                if batch is None:
+                    # all readers are aligned, as long as one returns null, 
the others will also have no data
+                    return None
+                batches[i] = batch
+        # Assemble record batches from batches based on row_offsets and 
field_offsets
+        columns = []
+        names = []
+        for i in range(len(self.row_offsets)):
+            batch_index = self.row_offsets[i]
+            field_index = self.field_offsets[i]
+            if batches[batch_index] is not None:
+                column = batches[batch_index].column(field_index)
+                columns.append(column)
+                names.append(batches[batch_index].schema.names[field_index])
+        if columns:
+            return pa.RecordBatch.from_arrays(columns, names)
+        return None
+
+    def close(self) -> None:
+        try:
+            for reader in self.readers:
+                if reader is not None:
+                    reader.close()
+        except Exception as e:
+            raise IOError("Failed to close inner readers") from e
diff --git a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
deleted file mode 100644
index 43bf926862..0000000000
--- a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
+++ /dev/null
@@ -1,85 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from typing import List, Optional
-
-import pyarrow as pa
-from pyarrow import RecordBatch
-
-from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
-
-
-class DataEvolutionMergeReader(RecordBatchReader):
-    """
-    This is a union reader which contains multiple inner readers, Each reader 
is responsible for reading one file.
-
-    This reader, assembling multiple reader into one big and great reader, 
will merge the batches from all readers.
-
-    For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 
0, 1, 1, 1, 0}, it means:
-     - The first field comes from batch0, and it is at offset 0 in batch0.
-     - The second field comes from batch2, and it is at offset 0 in batch2.
-     - The third field comes from batch0, and it is at offset 1 in batch0.
-     - The fourth field comes from batch1, and it is at offset 1 in batch1.
-     - The fifth field comes from batch2, and it is at offset 1 in batch2.
-     - The sixth field comes from batch1, and it is at offset 0 in batch1.
-    """
-
-    def __init__(self, row_offsets: List[int], field_offsets: List[int], 
readers: List[Optional[RecordBatchReader]]):
-        if row_offsets is None:
-            raise ValueError("Row offsets must not be null")
-        if field_offsets is None:
-            raise ValueError("Field offsets must not be null")
-        if len(row_offsets) != len(field_offsets):
-            raise ValueError("Row offsets and field offsets must have the same 
length")
-        if not row_offsets:
-            raise ValueError("Row offsets must not be empty")
-        if not readers or len(readers) < 1:
-            raise ValueError("Readers should be more than 0")
-        self.row_offsets = row_offsets
-        self.field_offsets = field_offsets
-        self.readers = readers
-
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
-        batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
-        for i, reader in enumerate(self.readers):
-            if reader is not None:
-                batch = reader.read_arrow_batch()
-                if batch is None:
-                    # all readers are aligned, as long as one returns null, 
the others will also have no data
-                    return None
-                batches[i] = batch
-        # Assemble record batches from batches based on row_offsets and 
field_offsets
-        columns = []
-        names = []
-        for i in range(len(self.row_offsets)):
-            batch_index = self.row_offsets[i]
-            field_index = self.field_offsets[i]
-            if batches[batch_index] is not None:
-                column = batches[batch_index].column(field_index)
-                columns.append(column)
-                names.append(batches[batch_index].schema.names[field_index])
-        if columns:
-            return pa.RecordBatch.from_arrays(columns, names)
-        return None
-
-    def close(self) -> None:
-        try:
-            for reader in self.readers:
-                if reader is not None:
-                    reader.close()
-        except Exception as e:
-            raise IOError("Failed to close inner readers") from e
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 526e501b97..014d4f9da3 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -22,6 +22,7 @@ import pyarrow as pa
 from pyarrow import RecordBatch
 
 from pypaimon.read.partition_info import PartitionInfo
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 from pypaimon.table.special_fields import SpecialFields
@@ -29,7 +30,7 @@ from pypaimon.table.special_fields import SpecialFields
 
 class DataFileBatchReader(RecordBatchReader):
     """
-    Reads record batch from data files.
+    Reads record batch from files of different formats
     """
 
     def __init__(self, format_reader: RecordBatchReader, index_mapping: 
List[int], partition_info: PartitionInfo,
@@ -48,8 +49,11 @@ class DataFileBatchReader(RecordBatchReader):
         self.max_sequence_number = max_sequence_number
         self.system_fields = system_fields
 
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
-        record_batch = self.format_reader.read_arrow_batch()
+    def read_arrow_batch(self, start_idx=None, end_idx=None) -> 
Optional[RecordBatch]:
+        if isinstance(self.format_reader, FormatBlobReader):
+            record_batch = self.format_reader.read_arrow_batch(start_idx, 
end_idx)
+        else:
+            record_batch = self.format_reader.read_arrow_batch()
         if record_batch is None:
             return None
 
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 5e0affe7d8..ecd740de4d 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -63,7 +63,11 @@ class FormatBlobReader(RecordBatchReader):
         self._blob_iterator = None
         self._current_batch = None
 
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
+    def read_arrow_batch(self, start_idx=None, end_idx=None) -> 
Optional[RecordBatch]:
+        """
+         start_idx: start index record of the blob file
+         end_idx: end index record of the blob file
+        """
         if self._blob_iterator is None:
             if self.returned:
                 return None
@@ -73,7 +77,13 @@ class FormatBlobReader(RecordBatchReader):
                 self.blob_offsets, self._fields[0]
             )
             self._blob_iterator = iter(batch_iterator)
-
+        read_size = self._batch_size
+        if start_idx is not None and end_idx is not None:
+            if self._blob_iterator.current_position >= end_idx:
+                return None
+            if self._blob_iterator.current_position < start_idx:
+                self._blob_iterator.current_position = start_idx
+            read_size = min(end_idx - self._blob_iterator.current_position, 
self._batch_size)
         # Collect records for this batch
         pydict_data = {name: [] for name in self._fields}
         records_in_batch = 0
@@ -93,7 +103,7 @@ class FormatBlobReader(RecordBatchReader):
                     pydict_data[field_name].append(blob_data)
 
                 records_in_batch += 1
-                if records_in_batch >= self._batch_size:
+                if records_in_batch >= read_size:
                     break
 
         except StopIteration:
diff --git a/paimon-python/pypaimon/read/reader/shard_batch_reader.py b/paimon-python/pypaimon/read/reader/shard_batch_reader.py
new file mode 100644
index 0000000000..0baf63e534
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/shard_batch_reader.py
@@ -0,0 +1,61 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from typing import Optional
+
+from pyarrow import RecordBatch
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+
+
+class ShardBatchReader(RecordBatchReader):
+    """
+    A reader that reads a subset of rows from a data file
+    """
+    def __init__(self, reader, start_row, end_row):
+        self.reader = reader
+        self.start_row = start_row
+        self.end_row = end_row
+        self.current_row = 0
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        # Check if reader is FormatBlobReader (blob type)
+        if isinstance(self.reader.format_reader, FormatBlobReader):
+            # For blob reader, pass begin_idx and end_idx parameters
+            return self.reader.read_arrow_batch(start_idx=self.start_row, end_idx=self.end_row)
+        else:
+            # For non-blob reader (DataFileBatchReader), use standard read_arrow_batch
+            batch = self.reader.read_arrow_batch()
+
+            if batch is None:
+                return None
+
+            # Apply row range filtering for non-blob readers
+            batch_begin = self.current_row
+            self.current_row += batch.num_rows
+
+            # Check if batch is within the desired range
+            if self.start_row <= batch_begin < self.current_row <= self.end_row:  # batch is within the desired range
+                return batch
+            elif batch_begin < self.start_row < self.current_row:  # batch starts before the desired range
+                return batch.slice(self.start_row - batch_begin, self.end_row - self.start_row)
+            elif batch_begin < self.end_row < self.current_row:  # batch ends after the desired range
+                return batch.slice(0, self.end_row - batch_begin)
+            else:  # batch is outside the desired range
+                return self.read_arrow_batch()
+
+    def close(self):
+        self.reader.close()
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index a0a16c1409..96a848ce01 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -125,6 +125,10 @@ class FullStartingScanner(StartingScanner):
         return self
 
     def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> 
(defaultdict, int, int):
+        """
+        Filter file entries by shard. Only keep the files within the range, 
which means
+        that only the starting and ending files need to be further divided 
subsequently
+        """
         total_row = 0
         # Sort by file creation time to ensure consistent sharding
         for key, file_entries in partitioned_files.items():
@@ -240,31 +244,54 @@ class FullStartingScanner(StartingScanner):
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
     def _compute_split_start_end_row(self, splits: List[Split], 
plan_start_row, plan_end_row):
-        file_end_row = 0  # end row position of current file in all data
+        """
+        Find files that needs to be divided for each split
+        :param splits: splits
+        :param plan_start_row: plan begin row in all splits data
+        :param plan_end_row: plan end row in all splits data
+        """
+        file_end_row = 0  # end row position of current file in all splits data
 
         for split in splits:
-            row_cnt = 0
-            files = split.files
-            split_start_row = file_end_row
-            # Iterate through all file entries to find files that overlap with 
current shard range
-            for file in files:
-                if self._is_blob_file(file.file_name):
-                    continue
-                row_cnt += file.row_count
-                file_begin_row = file_end_row  # Starting row position of 
current file in all data
-                file_end_row += file.row_count  # Update to row position after 
current file
-
-                # If shard start position is within current file, record 
actual start position and relative offset
-                if file_begin_row <= plan_start_row < file_end_row:
-                    split.split_start_row = plan_start_row - file_begin_row
-
-                # If shard end position is within current file, record 
relative end position
-                if file_begin_row < plan_end_row <= file_end_row:
-                    split.split_end_row = plan_end_row - split_start_row
-            if split.split_start_row is None:
-                split.split_start_row = 0
-            if split.split_end_row is None:
-                split.split_end_row = row_cnt
+            cur_split_end_row = file_end_row
+            # Compute split_file_idx_map for data files
+            file_end_row = self._compute_split_file_idx_map(plan_start_row, 
plan_end_row,
+                                                            split, 
cur_split_end_row, False)
+            # Compute split_file_idx_map for blob files
+            if self.data_evolution:
+                self._compute_split_file_idx_map(plan_start_row, plan_end_row,
+                                                 split, cur_split_end_row, 
True)
+
+    def _compute_split_file_idx_map(self, plan_start_row, plan_end_row, split: 
Split,
+                                    file_end_row: int, is_blob: bool = False):
+        """
+        Traverse all the files in current split, find the starting shard and 
ending shard files,
+        and add them to shard_file_idx_map;
+        - for data file, only two data files will be divided in all splits.
+        - for blob file, perhaps there will be some unnecessary files in 
addition to two files(start and end).
+          Add them to shard_file_idx_map as well, because they need to be 
removed later.
+        """
+        row_cnt = 0
+        for file in split.files:
+            if not is_blob and self._is_blob_file(file.file_name):
+                continue
+            if is_blob and not self._is_blob_file(file.file_name):
+                continue
+            row_cnt += file.row_count
+            file_begin_row = file_end_row  # Starting row position of current 
file in all data
+            file_end_row += file.row_count  # Update to row position after 
current file
+            if file_begin_row <= plan_start_row < plan_end_row <= file_end_row:
+                split.shard_file_idx_map[file.file_name] = (
+                    plan_start_row - file_begin_row, plan_end_row - 
file_begin_row)
+            # If shard start position is within current file, record actual 
start position and relative offset
+            elif file_begin_row < plan_start_row < file_end_row:
+                split.shard_file_idx_map[file.file_name] = (plan_start_row - 
file_begin_row, file.row_count)
+            # If shard end position is within current file, record relative 
end position
+            elif file_begin_row < plan_end_row < file_end_row:
+                split.shard_file_idx_map[file.file_name] = (0, plan_end_row - 
file_begin_row)
+            elif file_end_row <= plan_start_row or file_begin_row >= 
plan_end_row:
+                split.shard_file_idx_map[file.file_name] = (-1, -1)
+        return file_end_row
 
     def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) 
-> List[ManifestEntry]:
         filtered_entries = []
@@ -440,6 +467,7 @@ class FullStartingScanner(StartingScanner):
                                                                             
self.target_split_size)
             splits += self._build_split_from_pack(packed_files, file_entries, 
False, deletion_files_map)
         if self.idx_of_this_subtask is not None:
+            # When files are combined into splits, it is necessary to find 
files that needs to be divided for each split
             self._compute_split_start_end_row(splits, plan_start_row, 
plan_end_row)
         return splits
 
@@ -493,6 +521,7 @@ class FullStartingScanner(StartingScanner):
             partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
 
         if self.idx_of_this_subtask is not None:
+            # shard data range: [plan_start_row, plan_end_row)
             partitioned_files, plan_start_row, plan_end_row = 
self._data_evolution_filter_by_shard(partitioned_files)
 
         def weight_func(file_list: List[DataFileMeta]) -> int:
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index 55cb955ad6..272c145137 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -16,8 +16,8 @@
 # limitations under the License.
 
################################################################################
 
-from dataclasses import dataclass
-from typing import List, Optional
+from dataclasses import dataclass, field
+from typing import List, Optional, Dict, Tuple
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.table.row.generic_row import GenericRow
@@ -33,8 +33,7 @@ class Split:
     _file_paths: List[str]
     _row_count: int
     _file_size: int
-    split_start_row: int = None
-    split_end_row: int = None
+    shard_file_idx_map: Dict[str, Tuple[int, int]] = 
field(default_factory=dict)  # file_name -> (start_idx, end_idx)
     raw_convertible: bool = False
     data_deletion_files: Optional[List[DeletionFile]] = None
 
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 05bcaf994e..8a374f447d 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -30,11 +30,9 @@ from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.push_down_utils import trim_predicate_by_fields
 from pypaimon.read.reader.concat_batch_reader import (ConcatBatchReader,
-                                                      MergeAllBatchReader,
-                                                      ShardBatchReader)
+                                                      MergeAllBatchReader, 
DataEvolutionMergeReader)
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
-from pypaimon.read.reader.data_evolution_merge_reader import \
-    DataEvolutionMergeReader
+
 from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
 from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
 from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
@@ -50,6 +48,7 @@ from pypaimon.read.reader.iface.record_reader import RecordReader
 from pypaimon.read.reader.key_value_unwrap_reader import \
     KeyValueUnwrapRecordReader
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
+from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import DataField
@@ -331,9 +330,25 @@ class SplitRead(ABC):
 
 
 class RawFileSplitRead(SplitRead):
-    def raw_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> RecordReader:
-        file_batch_reader = self.file_reader_supplier(
-            file, False, self._get_final_read_data_fields(), 
self.row_tracking_enabled)
+    def raw_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> Optional[RecordReader]:
+        read_fields = self._get_final_read_data_fields()
+        # If the current file needs to be further divided for reading, use 
ShardBatchReader
+        if file.file_name in self.split.shard_file_idx_map:
+            (start_row, end_row) = 
self.split.shard_file_idx_map[file.file_name]
+            if (start_row, end_row) == (-1, -1):
+                return None
+            else:
+                file_batch_reader = ShardBatchReader(self.file_reader_supplier(
+                    file=file,
+                    for_merge_read=False,
+                    read_fields=read_fields,
+                    row_tracking_enabled=True), start_row, end_row)
+        else:
+            file_batch_reader = self.file_reader_supplier(
+                file=file,
+                for_merge_read=False,
+                read_fields=read_fields,
+                row_tracking_enabled=True)
         dv = dv_factory() if dv_factory else None
         if dv:
             return 
ApplyDeletionVectorReader(RowPositionReader(file_batch_reader), dv)
@@ -353,10 +368,8 @@ class RawFileSplitRead(SplitRead):
 
         if not data_readers:
             return EmptyFileRecordReader()
-        if self.split.split_start_row is not None:
-            concat_reader = ShardBatchReader(data_readers, 
self.split.split_start_row, self.split.split_end_row)
-        else:
-            concat_reader = ConcatBatchReader(data_readers)
+
+        concat_reader = ConcatBatchReader(data_readers)
         # if the table is appendonly table, we don't need extra filter, all 
predicates has pushed down
         if self.table.is_primary_key_table and self.predicate:
             return FilterRecordReader(concat_reader, self.predicate)
@@ -415,7 +428,7 @@ class DataEvolutionSplitRead(SplitRead):
         files = self.split.files
         suppliers = []
 
-        # Split files by row ID using the same logic as Java 
DataEvolutionSplitGenerator.split
+        # Split files by row ID
         split_by_row_id = self._split_by_row_id(files)
 
         for need_merge_files in split_by_row_id:
@@ -428,10 +441,8 @@ class DataEvolutionSplitRead(SplitRead):
                 suppliers.append(
                     lambda files=need_merge_files: 
self._create_union_reader(files)
                 )
-        if self.split.split_start_row is not None:
-            return ShardBatchReader(suppliers, self.split.split_start_row, 
self.split.split_end_row)
-        else:
-            return ConcatBatchReader(suppliers)
+
+        return ConcatBatchReader(suppliers)
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> 
List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
@@ -532,15 +543,15 @@ class DataEvolutionSplitRead(SplitRead):
                 self.read_fields = read_fields  # create reader based on 
read_fields
                 # Create reader for this bunch
                 if len(bunch.files()) == 1:
-                    file_record_readers[i] = self._create_file_reader(
+                    suppliers = [lambda r=self._create_file_reader(
                         bunch.files()[0], read_field_names
-                    )
+                    ): r]
+                    file_record_readers[i] = MergeAllBatchReader(suppliers)
                 else:
                     # Create concatenated reader for multiple files
                     suppliers = [
-                        lambda f=file: self._create_file_reader(
-                            f, read_field_names
-                        ) for file in bunch.files()
+                        partial(self._create_file_reader, file=file,
+                                read_fields=read_field_names) for file in 
bunch.files()
                     ]
                     file_record_readers[i] = MergeAllBatchReader(suppliers)
                 self.read_fields = table_fields
@@ -553,13 +564,25 @@ class DataEvolutionSplitRead(SplitRead):
 
         return DataEvolutionMergeReader(row_offsets, field_offsets, 
file_record_readers)
 
-    def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> 
RecordReader:
+    def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> 
Optional[RecordReader]:
         """Create a file reader for a single file."""
-        return self.file_reader_supplier(
-            file=file,
-            for_merge_read=False,
-            read_fields=read_fields,
-            row_tracking_enabled=True)
+        # If the current file needs to be further divided for reading, use 
ShardBatchReader
+        if file.file_name in self.split.shard_file_idx_map:
+            (begin_row, end_row) = 
self.split.shard_file_idx_map[file.file_name]
+            if (begin_row, end_row) == (-1, -1):
+                return None
+            else:
+                return ShardBatchReader(self.file_reader_supplier(
+                    file=file,
+                    for_merge_read=False,
+                    read_fields=read_fields,
+                    row_tracking_enabled=True), begin_row, end_row)
+        else:
+            return self.file_reader_supplier(
+                file=file,
+                for_merge_read=False,
+                read_fields=read_fields,
+                row_tracking_enabled=True)
 
     def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> 
List[FieldBunch]:
         """Split files into field bunches."""
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index ce1527761f..56439cee9b 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -17,12 +17,13 @@ limitations under the License.
 """
 import os
 import shutil
+import struct
 import tempfile
 import unittest
 
 import pyarrow as pa
 
-from pypaimon import CatalogFactory
+from pypaimon import CatalogFactory, Schema
 from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.write.commit_message import CommitMessage
 
@@ -2134,12 +2135,22 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(result.column('id').to_pylist(), list(range(1, 
num_blobs + 1)))
 
     def test_blob_write_read_large_data_with_rolling_with_shard(self):
-        from pypaimon import Schema
+        """
+        Test writing and reading large blob data with file rolling and 
sharding.
+
+        Test workflow:
+        - Creates a table with blob column and 10MB target file size
+        - Writes 4 batches of 40 records each (160 total records)
+        - Each record contains a 3MB blob
+        - Reads data using 3-way sharding (shard indices 0 - 2)
+        - Verifies blob data integrity and size
+        - Compares concatenated sharded results with full table scan
+        """
 
         # Create schema with blob column
         pa_schema = pa.schema([
             ('id', pa.int32()),
-            ('batch_id', pa.int32()),
+            ('record_id_of_batch', pa.int32()),
             ('metadata', pa.string()),
             ('large_blob', pa.large_binary()),
         ])
@@ -2164,16 +2175,16 @@ class DataBlobWriterTest(unittest.TestCase):
 
         actual_size = len(large_blob_data)
         print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 
* 1024):.2f} MB)")
+        # Write 4 batches of 40 records
         for i in range(4):
-            # Write 40 batches of data
             write_builder = table.new_batch_write_builder()
             writer = write_builder.new_write()
-            # Write all 40 batches first
-            for batch_id in range(40):
+            # Write 40 records
+            for record_id in range(40):
                 test_data = pa.Table.from_pydict({
-                    'id': [i * 40 + batch_id + 1],  # Unique ID for each row
-                    'batch_id': [batch_id],
-                    'metadata': [f'Large blob batch {batch_id + 1}'],
+                    'id': [i * 40 + record_id + 1],  # Unique ID for each row
+                    'record_id_of_batch': [record_id],
+                    'metadata': [f'Large blob batch {record_id + 1}'],
                     'large_blob': [large_blob_data]
                 }, schema=pa_schema)
                 writer.write_arrow(test_data)
@@ -2190,12 +2201,12 @@ class DataBlobWriterTest(unittest.TestCase):
         result = table_read.to_arrow(table_scan.plan().splits())
 
         # Verify the data
-        self.assertEqual(result.num_rows, 54, "Should have 94 rows")
+        self.assertEqual(result.num_rows, 54, "Should have 54 rows")
         self.assertEqual(result.num_columns, 4, "Should have 4 columns")
 
         # Verify blob data integrity
         blob_data = result.column('large_blob').to_pylist()
-        self.assertEqual(len(blob_data), 54, "Should have 94 blob records")
+        self.assertEqual(len(blob_data), 54, "Should have 54 blob records")
         # Verify each blob
         for i, blob in enumerate(blob_data):
             self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} 
should be {large_blob_size:,} bytes")
@@ -2211,13 +2222,93 @@ class DataBlobWriterTest(unittest.TestCase):
         actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
 
         # Verify the data
-        self.assertEqual(actual.num_rows, 160, "Should have 280 rows")
+        self.assertEqual(actual.num_rows, 160, "Should have 160 rows")
         self.assertEqual(actual.num_columns, 4, "Should have 4 columns")
         self.assertEqual(actual.column('id').to_pylist(), list(range(1, 161)), 
"ID column should match")
         self.assertEqual(actual, expected)
 
+    def test_blob_rolling_with_shard(self):
+        """
+        - Writes 30 records
+        - Each record contains a 3MB blob
+        """
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '10MB'
+            }
+        )
+        self.catalog.create_table('test_db.blob_rolling_with_shard1', schema, 
False)
+        table = self.catalog.get_table('test_db.blob_rolling_with_shard1')
+
+        # Create large blob data
+        large_blob_size = 3 * 1024 * 1024  #
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        actual_size = len(large_blob_data)
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 
* 1024):.2f} MB)")
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        # Write 30 records
+        for record_id in range(30):
+            test_data = pa.Table.from_pydict({
+                'id': [record_id],  # Unique ID for each row
+                'metadata': [f'Large blob batch {record_id + 1}'],
+                'large_blob': [struct.pack('<I', record_id) + large_blob_data]
+            }, schema=pa_schema)
+            writer.write_arrow(test_data)
+
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(1, 3)
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(result.num_rows, 10, "Should have 10 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+        # Verify blob data integrity
+        blob_data = result.column('large_blob').to_pylist()
+        self.assertEqual(len(blob_data), 10, "Should have 94 blob records")
+        # Verify each blob
+        for i, blob in enumerate(blob_data):
+            self.assertEqual(len(blob), len(large_blob_data) + 4, f"Blob {i + 
1} should be {large_blob_size:,} bytes")
+        splits = read_builder.new_scan().plan().splits()
+        expected = table_read.to_arrow(splits)
+        splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+        actual1 = table_read.to_arrow(splits1)
+        splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+        actual2 = table_read.to_arrow(splits2)
+        splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+        actual3 = table_read.to_arrow(splits3)
+        actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+
+        # Verify the data
+        self.assertEqual(actual.num_rows, 30, "Should have 30 rows")
+        self.assertEqual(actual.num_columns, 3, "Should have 3 columns")
+        self.assertEqual(actual, expected)
+
     def test_blob_large_data_volume_with_shard(self):
-        from pypaimon import Schema
 
         # Create schema with blob column
         pa_schema = pa.schema([
@@ -2284,7 +2375,6 @@ class DataBlobWriterTest(unittest.TestCase):
 
     def test_data_blob_writer_with_shard(self):
         """Test DataBlobWriter with mixed data types in blob column."""
-        from pypaimon import Schema
 
         # Create schema with blob column
         pa_schema = pa.schema([
@@ -2345,8 +2435,12 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
 
     def test_blob_write_read_large_data_volume_rolling_with_shard(self):
-        from pypaimon import Schema
-
+        """
+        - Writes 10000 records
+        - Each record contains a 5KB blob
+        - 'blob.target-file-size': '10MB'
+        - each blob file contains 2000 records
+        """
         # Create schema with blob column
         pa_schema = pa.schema([
             ('id', pa.int32()),
@@ -2375,7 +2469,7 @@ class DataBlobWriterTest(unittest.TestCase):
 
         actual_size = len(large_blob_data)
         print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 
* 1024):.2f} MB)")
-        # Write 40 batches of data
+        # Write 10000 records of data
         num_row = 10000
         expected = pa.Table.from_pydict({
             'id': [1] * num_row,
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 896d534f09..38034078e7 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -112,7 +112,7 @@ class DataEvolutionTest(unittest.TestCase):
         table_commit.commit(table_write.prepare_commit())
         table_write.close()
         table_commit.close()
-        # append:set first_row_id = 100 to modify the row with columns write
+        # append:write (2, "x") and ("y"), set first_row_id = 100
         write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
         write1 = write_builder.new_write().with_write_type(['f2'])
         commit = write_builder.new_commit()


Reply via email to