This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 1d3e5e5e9832befd8e571c5c432481bf2af612c6
Author: umi <[email protected]>
AuthorDate: Fri Oct 24 10:39:07 2025 +0800

    [Python] Blob read supports with_shard (#6465)
---
 .../pypaimon/read/scanner/full_starting_scanner.py | 62 +++++++++++++++++++++-
 paimon-python/pypaimon/read/split_read.py          |  6 ++-
 paimon-python/pypaimon/tests/blob_table_test.py    | 62 ++++++++++++++++++++++
 .../pypaimon/tests/data_evolution_test.py          | 52 ++++++++++++++++++
 4 files changed, 179 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 36dba3bdd1..b3c18a7bb6 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -157,6 +157,66 @@ class FullStartingScanner(StartingScanner):
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
+    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) 
-> (defaultdict, int, int):
+        total_row = 0
+        first_row_id_set = set()
+        # Sort by file creation time to ensure consistent sharding
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                if entry.file.first_row_id is None:
+                    total_row += entry.file.row_count
+                elif entry.file.first_row_id not in first_row_id_set:
+                    first_row_id_set.add(entry.file.first_row_id)
+                    total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process
+        # Last shard handles all remaining rows (handles non-divisible cases)
+        if self.idx_of_this_subtask == self.number_of_para_subtasks - 1:
+            num_row = total_row - total_row // self.number_of_para_subtasks * 
self.idx_of_this_subtask
+        else:
+            num_row = total_row // self.number_of_para_subtasks
+        # Calculate start row and end row position for current shard in all 
data
+        start_row = self.idx_of_this_subtask * (total_row // 
self.number_of_para_subtasks)
+        end_row = start_row + num_row
+
+        plan_start_row = 0
+        plan_end_row = 0
+        entry_end_row = 0  # end row position of current file in all data
+        splits_start_row = 0
+        filtered_partitioned_files = defaultdict(list)
+        # Iterate through all file entries to find files that overlap with 
current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            first_row_id_set = set()
+            for entry in file_entries:
+                if entry.file.first_row_id is not None:
+                    if entry.file.first_row_id in first_row_id_set:
+                        filtered_entries.append(entry)
+                        continue
+                    else:
+                        first_row_id_set.add(entry.file.first_row_id)
+                entry_begin_row = entry_end_row  # Starting row position of 
current file in all data
+                entry_end_row += entry.file.row_count  # Update to row 
position after current file
+
+                # If current file is completely after shard range, stop 
iteration
+                if entry_begin_row >= end_row:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_row <= start_row:
+                    continue
+                if entry_begin_row <= start_row < entry_end_row:
+                    splits_start_row = entry_begin_row
+                    plan_start_row = start_row - entry_begin_row
+                # If shard end position is within current file, record 
relative end position
+                if entry_begin_row < end_row <= entry_end_row:
+                    plan_end_row = end_row - splits_start_row
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, plan_start_row, plan_end_row
+
     def _compute_split_start_end_row(self, splits: List[Split], 
plan_start_row, plan_end_row):
         file_end_row = 0  # end row position of current file in all data
         for split in splits:
@@ -356,7 +416,7 @@ class FullStartingScanner(StartingScanner):
             partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
 
         if self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_row, plan_end_row = 
self._append_only_filter_by_shard(partitioned_files)
+            partitioned_files, plan_start_row, plan_end_row = 
self._data_evolution_filter_by_shard(partitioned_files)
 
         def weight_func(file_list: List[DataFileMeta]) -> int:
             return max(sum(f.file_size for f in file_list), 
self.open_file_cost)
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 5d59f942bd..a744fbc4f0 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -362,8 +362,10 @@ class DataEvolutionSplitRead(SplitRead):
                 suppliers.append(
                     lambda files=need_merge_files: 
self._create_union_reader(files)
                 )
-
-        return ConcatBatchReader(suppliers)
+        if self.split.split_start_row is not None:
+            return ShardBatchReader(suppliers, self.split.split_start_row, 
self.split.split_end_row)
+        else:
+            return ConcatBatchReader(suppliers)
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> 
List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index d24b3f0d5a..fe987ea0a6 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1272,6 +1272,68 @@ class DataBlobWriterTest(unittest.TestCase):
         print(f"   - Total data size: {total_blob_size:,} bytes 
({total_blob_size / (1024*1024*1024):.2f} GB)")  # noqa: E501
         print("   - All blob content verified as correct")
 
+    def test_data_blob_writer_with_shard(self):
+        """Test DataBlobWriter with mixed data types in blob column."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('type', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.with_shard_test', schema, False)
+        table = self.catalog.get_table('test_db.with_shard_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with different types of blob content
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'type': ['text', 'json', 'binary', 'image', 'pdf'],
+            'data': [
+                b'This is text content',
+                b'{"key": "value", "number": 42}',
+                b'\x00\x01\x02\x03\xff\xfe\xfd',
+                b'PNG_IMAGE_DATA_PLACEHOLDER',
+                b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+            ]
+        }, schema=pa_schema)
+
+        # Write mixed data
+        total_rows = 0
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        blob_writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(1, 2)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 3, "Should have 5 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 90abd2f916..1d6402b327 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -84,6 +84,58 @@ class DataEvolutionTest(unittest.TestCase):
         ]))
         self.assertEqual(actual_data, expect_data)
 
+    def test_with_shard(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                            options={'row-tracking.enabled': 
'true', 'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_with_shard', schema, False)
+        table = self.catalog.get_table('default.test_with_shard')
+
+        # write 1
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        expect_data = pa.Table.from_pydict({
+            'f0': [-1, 2],
+            'f1': [-1001, 1002]
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(expect_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write 2
+        table_write = write_builder.new_write().with_write_type(['f0'])
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'f0': [3, 4],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+        ]))
+        table_write.write_arrow(data2)
+        cmts = table_write.prepare_commit()
+        cmts[0].new_files[0].first_row_id = 0
+        table_commit.commit(cmts)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(0, 2)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        actual_data = table_read.to_arrow(splits)
+        expect_data = pa.Table.from_pydict({
+            'f0': [3],
+            'f1': [-1001]
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ]))
+        self.assertEqual(actual_data, expect_data)
+
     def test_multiple_appends(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),

Reply via email to