This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2799c014b7 [python] Fix read large volume of blob data (#6701)
2799c014b7 is described below

commit 2799c014b70e2116193bf50cc0b14ce03707bbfc
Author: umi <[email protected]>
AuthorDate: Tue Dec 2 11:11:04 2025 +0800

    [python] Fix read large volume of blob data (#6701)
---
 .../pypaimon/read/reader/concat_batch_reader.py    |  18 +-
 .../pypaimon/read/reader/format_blob_reader.py     |   7 +-
 paimon-python/pypaimon/tests/blob_table_test.py    | 243 +++++++++++++++++++++
 3 files changed, 258 insertions(+), 10 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index fee159b209..4b86265d77 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -20,6 +20,7 @@ import collections
 from typing import Callable, List, Optional
 
 import pyarrow as pa
+import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
@@ -89,14 +90,15 @@ class MergeAllBatchReader(RecordBatchReader):
     into a single batch for processing.
     """
 
-    def __init__(self, reader_suppliers: List[Callable]):
+    def __init__(self, reader_suppliers: List[Callable], batch_size: int = 4096):
         self.reader_suppliers = reader_suppliers
         self.merged_batch: Optional[RecordBatch] = None
-        self.batch_created = False
+        self.reader = None
+        self._batch_size = batch_size
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
-        if self.batch_created:
-            return None
+        if self.reader:
+            return self.reader.read_next_batch()
 
         all_batches = []
 
@@ -140,10 +142,10 @@ class MergeAllBatchReader(RecordBatchReader):
                     )
         else:
             self.merged_batch = None
-
-        self.batch_created = True
-        return self.merged_batch
+        dataset = ds.InMemoryDataset(self.merged_batch)
+        self.reader = dataset.scanner(batch_size=self._batch_size).to_reader()
+        return self.reader.read_next_batch()
 
     def close(self) -> None:
         self.merged_batch = None
-        self.batch_created = False
+        self.reader = None
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index b993dc9221..5e0affe7d8 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -34,12 +34,13 @@ from pypaimon.table.row.row_kind import RowKind
 class FormatBlobReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
+                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
+                 batch_size: int = 4096):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
         self._blob_as_descriptor = blob_as_descriptor
-
+        self._batch_size = batch_size
         # Get file size
         self._file_size = file_io.get_file_size(file_path)
 
@@ -92,6 +93,8 @@ class FormatBlobReader(RecordBatchReader):
                     pydict_data[field_name].append(blob_data)
 
                 records_in_batch += 1
+                if records_in_batch >= self._batch_size:
+                    break
 
         except StopIteration:
             # Stop immediately when StopIteration occurs
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 4dd96dbf26..760fb1b12d 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1163,6 +1163,116 @@ class DataBlobWriterTest(unittest.TestCase):
         print(
             f"✅ Large blob end-to-end test passed: wrote and read back 
{len(blob_data)} large blob records correctly")  # noqa: E501
 
+    def test_blob_write_read_large_data_volume(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_large_data_volume', schema, False)
+        table = self.catalog.get_table('test_db.blob_large_data_volume')
+
+        large_blob_size = 5 * 1024
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        num_row = 20000
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        writer.write_arrow(expected)
+
+        # Commit all data at once
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(result.num_rows, num_row)
+        self.assertEqual(result.num_columns, 4)
+
+        self.assertEqual(expected, result)
+
+    def test_blob_write_read_large_data_volume_rolling(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '21MB'
+            }
+        )
+        self.catalog.create_table('test_db.large_data_volume_rolling', schema, False)
+        table = self.catalog.get_table('test_db.large_data_volume_rolling')
+
+        # Create large blob data
+        large_blob_size = 5 * 1024  #
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        actual_size = len(large_blob_data)
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")
+        num_row = 20000
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(expected)
+
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(expected, result)
+
     def test_blob_write_read_mixed_sizes_end_to_end(self):
         """Test end-to-end blob functionality with mixed blob sizes."""
         from pypaimon import Schema
@@ -2106,6 +2216,72 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(actual.column('id').to_pylist(), list(range(1, 161)), "ID column should match")
         self.assertEqual(actual, expected)
 
+    def test_blob_large_data_volume_with_shard(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_large_data_volume_with_shard', schema, False)
+        table = self.catalog.get_table('test_db.blob_large_data_volume_with_shard')
+
+        large_blob_size = 5 * 1024
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        num_row = 20000
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        writer.write_arrow(expected)
+
+        # Commit all data at once
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan().with_shard(2, 3)
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(6666, result.num_rows)
+        self.assertEqual(4, result.num_columns)
+
+        self.assertEqual(expected.slice(13334, 6666), result)
+        splits = read_builder.new_scan().plan().splits()
+        expected = table_read.to_arrow(splits)
+        splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+        actual1 = table_read.to_arrow(splits1)
+        splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+        actual2 = table_read.to_arrow(splits2)
+        splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+        actual3 = table_read.to_arrow(splits3)
+        actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+        self.assertEqual(actual, expected)
+
     def test_data_blob_writer_with_shard(self):
         """Test DataBlobWriter with mixed data types in blob column."""
         from pypaimon import Schema
@@ -2168,6 +2344,73 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(result.num_rows, 2, "Should have 2 rows")
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
 
+    def test_blob_write_read_large_data_volume_rolling_with_shard(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '10MB'
+            }
+        )
+        self.catalog.create_table('test_db.test_blob_write_read_large_data_volume_rolling_with_shard', schema, False)
+        table = self.catalog.get_table('test_db.test_blob_write_read_large_data_volume_rolling_with_shard')
+
+        # Create large blob data
+        large_blob_size = 5 * 1024  #
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        actual_size = len(large_blob_data)
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 * 1024):.2f} MB)")
+        # Write 40 batches of data
+        num_row = 10000
+        expected = pa.Table.from_pydict({
+            'id': [1] * num_row,
+            'batch_id': [11] * num_row,
+            'metadata': [f'Large blob batch {11}'] * num_row,
+            'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+        }, schema=pa_schema)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(expected)
+
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(expected, result)
+
+        splits = read_builder.new_scan().plan().splits()
+        expected = table_read.to_arrow(splits)
+        splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+        actual1 = table_read.to_arrow(splits1)
+        splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+        actual2 = table_read.to_arrow(splits2)
+        splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+        actual3 = table_read.to_arrow(splits3)
+        actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+
+        self.assertEqual(actual, expected)
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to