This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 1d3e5e5e9832befd8e571c5c432481bf2af612c6
Author: umi <[email protected]>
AuthorDate: Fri Oct 24 10:39:07 2025 +0800

    [Python] Blob read supports with_shard (#6465)
---
 .../pypaimon/read/scanner/full_starting_scanner.py | 62 +++++++++++++++++++++-
 paimon-python/pypaimon/read/split_read.py          |  6 ++-
 paimon-python/pypaimon/tests/blob_table_test.py    | 62 ++++++++++++++++++++++
 .../pypaimon/tests/data_evolution_test.py          | 52 ++++++++++++++++++
 4 files changed, 179 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 36dba3bdd1..b3c18a7bb6 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -157,6 +157,66 @@ class FullStartingScanner(StartingScanner):
 
         return filtered_partitioned_files, plan_start_row, plan_end_row
 
+    def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+        total_row = 0
+        first_row_id_set = set()
+        # Count total rows; files sharing the same first_row_id are counted only once
+        for key, file_entries in partitioned_files.items():
+            for entry in file_entries:
+                if entry.file.first_row_id is None:
+                    total_row += entry.file.row_count
+                elif entry.file.first_row_id not in first_row_id_set:
+                    first_row_id_set.add(entry.file.first_row_id)
+                    total_row += entry.file.row_count
+
+        # Calculate number of rows this shard should process
+        # Last shard handles all remaining rows (handles non-divisible cases)
+        if self.idx_of_this_subtask == self.number_of_para_subtasks - 1:
+            num_row = total_row - total_row // self.number_of_para_subtasks * self.idx_of_this_subtask
+        else:
+            num_row = total_row // self.number_of_para_subtasks
+        # Calculate start row and end row position for current shard in all data
+        start_row = self.idx_of_this_subtask * (total_row // self.number_of_para_subtasks)
+        end_row = start_row + num_row
+
+        plan_start_row = 0
+        plan_end_row = 0
+        entry_end_row = 0  # end row position of current file in all data
+        splits_start_row = 0
+        filtered_partitioned_files = defaultdict(list)
+        # Iterate through all file entries to find files that overlap with current shard range
+        for key, file_entries in partitioned_files.items():
+            filtered_entries = []
+            first_row_id_set = set()
+            for entry in file_entries:
+                if entry.file.first_row_id is not None:
+                    if entry.file.first_row_id in first_row_id_set:
+                        filtered_entries.append(entry)
+                        continue
+                    else:
+                        first_row_id_set.add(entry.file.first_row_id)
+                entry_begin_row = entry_end_row  # Starting row position of current file in all data
+                entry_end_row += entry.file.row_count  # Update to row position after current file
+
+                # If current file is completely after shard range, stop iteration
+                if entry_begin_row >= end_row:
+                    break
+                # If current file is completely before shard range, skip it
+                if entry_end_row <= start_row:
+                    continue
+                if entry_begin_row <= start_row < entry_end_row:
+                    splits_start_row = entry_begin_row
+                    plan_start_row = start_row - entry_begin_row
+                # If shard end position is within current file, record relative end position
+                if entry_begin_row < end_row <= entry_end_row:
+                    plan_end_row = end_row - splits_start_row
+                # Add files that overlap with shard range to result
+                filtered_entries.append(entry)
+            if filtered_entries:
+                filtered_partitioned_files[key] = filtered_entries
+
+        return filtered_partitioned_files, plan_start_row, plan_end_row
+
     def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
         file_end_row = 0  # end row position of current file in all data
         for split in splits:
@@ -356,7 +416,7 @@ class FullStartingScanner(StartingScanner):
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
         if self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+            partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
 
         def weight_func(file_list: List[DataFileMeta]) -> int:
             return max(sum(f.file_size for f in file_list), self.open_file_cost)
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5d59f942bd..a744fbc4f0 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -362,8 +362,10 @@ class DataEvolutionSplitRead(SplitRead):
             suppliers.append(
                 lambda files=need_merge_files: self._create_union_reader(files)
             )
-
-        return ConcatBatchReader(suppliers)
+        if self.split.split_start_row is not None:
+            return ShardBatchReader(suppliers, self.split.split_start_row, self.split.split_end_row)
+        else:
+            return ConcatBatchReader(suppliers)
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index d24b3f0d5a..fe987ea0a6 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1272,6 +1272,68 @@ class DataBlobWriterTest(unittest.TestCase):
         print(f" - Total data size: {total_blob_size:,} bytes ({total_blob_size / (1024*1024*1024):.2f} GB)")  # noqa: E501
         print(" - All blob content verified as correct")
 
+    def test_data_blob_writer_with_shard(self):
+        """Test sharded reads (with_shard) on a table with a blob column."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('type', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.with_shard_test', schema, False)
+        table = self.catalog.get_table('test_db.with_shard_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with different types of blob content
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'type': ['text', 'json', 'binary', 'image', 'pdf'],
+            'data': [
+                b'This is text content',
+                b'{"key": "value", "number": 42}',
+                b'\x00\x01\x02\x03\xff\xfe\xfd',
+                b'PNG_IMAGE_DATA_PLACEHOLDER',
+                b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+            ]
+        }, schema=pa_schema)
+
+        # Write mixed data
+        total_rows = 0
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        blob_writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(1, 2)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 3, "Should have 3 rows (shard 1 of 2 over 5 total rows)")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 90abd2f916..1d6402b327 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -84,6 +84,58 @@ class DataEvolutionTest(unittest.TestCase):
         ]))
         self.assertEqual(actual_data, expect_data)
 
+    def test_with_shard(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_with_shard', schema, False)
+        table = self.catalog.get_table('default.test_with_shard')
+
+        # write 1
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        expect_data = pa.Table.from_pydict({
+            'f0': [-1, 2],
+            'f1': [-1001, 1002]
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(expect_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write 2
+        table_write = write_builder.new_write().with_write_type(['f0'])
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'f0': [3, 4],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+        ]))
+        table_write.write_arrow(data2)
+        cmts = table_write.prepare_commit()
+        cmts[0].new_files[0].first_row_id = 0
+        table_commit.commit(cmts)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(0, 2)
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        actual_data = table_read.to_arrow(splits)
+        expect_data = pa.Table.from_pydict({
+            'f0': [3],
+            'f1': [-1001]
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ]))
+        self.assertEqual(actual_data, expect_data)
+
     def test_multiple_appends(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),
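Editor's note: for readers skimming the patch, here is a small standalone sketch of the row-range arithmetic that _data_evolution_filter_by_shard applies. The helper name shard_row_range is hypothetical; total_row, idx_of_this_subtask and number_of_para_subtasks correspond to the names used in the patch.

    # Illustrative only: mirrors the shard row-range computation in the patch.
    def shard_row_range(total_row: int, subtask_idx: int, num_subtasks: int):
        base = total_row // num_subtasks
        if subtask_idx == num_subtasks - 1:
            # The last shard picks up the remainder when rows do not divide evenly.
            num_row = total_row - base * subtask_idx
        else:
            num_row = base
        start_row = subtask_idx * base
        return start_row, start_row + num_row

    # 5 rows split across 2 subtasks: shard 0 covers rows [0, 2), shard 1 covers [2, 5),
    # which is why the with_shard(1, 2) blob test above expects 3 rows.
    assert shard_row_range(5, 0, 2) == (0, 2)
    assert shard_row_range(5, 1, 2) == (2, 5)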

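As a usage note, one possible way to drive this from the Python API is sketched below, reusing only the calls exercised by the tests above (new_read_builder, new_scan().with_shard, plan().splits(), to_arrow); the read_shard helper and the two-worker setup are illustrative, not part of the patch.

    # Each worker reads a disjoint shard of the same table.
    def read_shard(table, shard_idx: int, num_shards: int):
        read_builder = table.new_read_builder()
        scan = read_builder.new_scan().with_shard(shard_idx, num_shards)
        splits = scan.plan().splits()
        return read_builder.new_read().to_arrow(splits)

    # e.g. two parallel workers over the table created in data_evolution_test.py:
    # shard_0 = read_shard(table, 0, 2)
    # shard_1 = read_shard(table, 1, 2)

Concatenating the per-shard results should reproduce the full table, since the scanner assigns each row range to exactly one subtask.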