This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2401be70e [python] add test case for reading blob by to_iterator (#6536)
e2401be70e is described below
commit e2401be70ebe2176c6e3481601d32aabb1384392
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Nov 6 11:25:52 2025 +0800
[python] add test case for reading blob by to_iterator (#6536)
---
paimon-python/pypaimon/tests/blob_table_test.py | 80 +++++++++++++++++++++++++
1 file changed, 80 insertions(+)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 67813ca5c3..1b76499efb 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1438,6 +1438,86 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(result.num_rows, 3, "Should have 3 rows")
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+    def test_blob_read_row_by_row_iterator(self):
+        """Test reading blob data row by row using to_iterator()."""
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import Blob
+        from pypaimon.table.row.internal_row import RowKind
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_iterator_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_iterator_test')
+
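+        # Five rows with blob payloads of increasing size, keyed by id for verification after the read.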
+        expected_data = {
+            1: {'name': 'Alice', 'blob': b'blob_1'},
+            2: {'name': 'Bob', 'blob': b'blob_2_data'},
+            3: {'name': 'Charlie', 'blob': b'blob_3_content'},
+            4: {'name': 'David', 'blob': b'blob_4_large_content'},
+            5: {'name': 'Eve', 'blob': b'blob_5_very_large_content_data'}
+        }
+
+        test_data = pa.Table.from_pydict({
+            'id': list(expected_data.keys()),
+            'name': [expected_data[i]['name'] for i in expected_data.keys()],
+            'blob_data': [expected_data[i]['blob'] for i in expected_data.keys()]
+        }, schema=pa_schema)
+
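+        # Write the rows through the batch write path, then commit the prepared messages.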
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Verify blob files were created
+        file_names = [f.file_name for f in commit_messages[0].new_files]
+        self.assertGreater(
+            len([f for f in file_names if f.endswith('.blob')]), 0,
+            "Should have at least one blob file")
+
+        # Read using to_iterator
+        iterator = table.new_read_builder().new_read().to_iterator(
+            table.new_read_builder().new_scan().plan().splits())
+
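+        # Drain the iterator manually; next(iterator, None) returns None once all splits are consumed.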
+        rows = []
+        value = next(iterator, None)
+        while value is not None:
+            rows.append(value)
+            value = next(iterator, None)
+
+        self.assertEqual(len(rows), 5, "Should have 5 rows")
+
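+        # Cross-check each returned row against expected_data keyed by id.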
+        for row in rows:
+            row_id = row.get_field(0)
+            self.assertIn(row_id, expected_data, f"ID {row_id} should be in expected data")
+
+            expected = expected_data[row_id]
+            self.assertEqual(row.get_field(1), expected['name'], f"Row {row_id}: name should match")
+
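+            # Blob values may come back as Blob objects or raw bytes; normalize to bytes before comparing.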
+            row_blob = row.get_field(2)
+            blob_bytes = row_blob.to_data() if isinstance(row_blob, Blob) else row_blob
+            self.assertIsInstance(blob_bytes, bytes, f"Row {row_id}: blob should be bytes")
+            self.assertEqual(blob_bytes, expected['blob'], f"Row {row_id}: blob data should match")
+            self.assertEqual(len(blob_bytes), len(expected['blob']), f"Row {row_id}: blob size should match")
+
+            self.assertIn(
+                row.get_row_kind(),
+                [RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE],
+                f"Row {row_id}: RowKind should be valid")
+
 if __name__ == '__main__':
     unittest.main()