This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 048c0d4d83a1818abeb130e94f7819dff629b18f
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Nov 6 11:25:52 2025 +0800

    [python] add test case for reading blob by to_iterator (#6536)
---
 paimon-python/pypaimon/tests/blob_table_test.py | 80 +++++++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index fe987ea0a6..b9e2760e3e 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1334,6 +1334,86 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(result.num_rows, 3, "Should have 5 rows")
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
 
+    def test_blob_read_row_by_row_iterator(self):
+        """Test reading blob data row by row using to_iterator()."""
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import Blob
+        from pypaimon.table.row.internal_row import RowKind
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_iterator_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_iterator_test')
+
+        expected_data = {
+            1: {'name': 'Alice', 'blob': b'blob_1'},
+            2: {'name': 'Bob', 'blob': b'blob_2_data'},
+            3: {'name': 'Charlie', 'blob': b'blob_3_content'},
+            4: {'name': 'David', 'blob': b'blob_4_large_content'},
+            5: {'name': 'Eve', 'blob': b'blob_5_very_large_content_data'}
+        }
+
+        test_data = pa.Table.from_pydict({
+            'id': list(expected_data.keys()),
+            'name': [expected_data[i]['name'] for i in expected_data.keys()],
+            'blob_data': [expected_data[i]['blob'] for i in expected_data.keys()]
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Verify blob files were created
+        file_names = [f.file_name for f in commit_messages[0].new_files]
+        self.assertGreater(
+            len([f for f in file_names if f.endswith('.blob')]), 0,
+            "Should have at least one blob file")
+
+        # Read using to_iterator
+        iterator = table.new_read_builder().new_read().to_iterator(
+            table.new_read_builder().new_scan().plan().splits())
+
+        rows = []
+        value = next(iterator, None)
+        while value is not None:
+            rows.append(value)
+            value = next(iterator, None)
+
+        self.assertEqual(len(rows), 5, "Should have 5 rows")
+
+        for row in rows:
+            row_id = row.get_field(0)
+            self.assertIn(row_id, expected_data, f"ID {row_id} should be in expected data")
+
+            expected = expected_data[row_id]
+            self.assertEqual(row.get_field(1), expected['name'], f"Row {row_id}: name should match")
+
+            row_blob = row.get_field(2)
+            blob_bytes = row_blob.to_data() if isinstance(row_blob, Blob) else row_blob
+            self.assertIsInstance(blob_bytes, bytes, f"Row {row_id}: blob should be bytes")
+            self.assertEqual(blob_bytes, expected['blob'], f"Row {row_id}: blob data should match")
+            self.assertEqual(len(blob_bytes), len(expected['blob']), f"Row {row_id}: blob size should match")
+
+            self.assertIn(
+                row.get_row_kind(),
+                [RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE],
+                f"Row {row_id}: RowKind should be valid")
+
 
 if __name__ == '__main__':
     unittest.main()
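
Editor's note: the read path exercised by the new test reduces to the short sketch below. This is an illustration only, not part of the commit. It assumes a `catalog` handle is already available (the test obtains one in its fixture, which is outside this diff) and that the table 'test_db.blob_iterator_test' has been created and populated as in the test above; all calls mirror those in the added test.

    # Illustrative sketch (not part of the patch): row-by-row reads via
    # to_iterator(), assuming `catalog` is already configured elsewhere.
    table = catalog.get_table('test_db.blob_iterator_test')

    splits = table.new_read_builder().new_scan().plan().splits()
    iterator = table.new_read_builder().new_read().to_iterator(splits)

    row = next(iterator, None)
    while row is not None:
        row_id = row.get_field(0)   # 'id' column
        name = row.get_field(1)     # 'name' column
        blob = row.get_field(2)     # 'blob_data' column; may be a Blob wrapper
        data = blob.to_data() if hasattr(blob, 'to_data') else blob
        print(row_id, name, len(data))
        row = next(iterator, None)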
