This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2799c014b7 [python] Fix reading large volumes of blob data (#6701)
2799c014b7 is described below
commit 2799c014b70e2116193bf50cc0b14ce03707bbfc
Author: umi <[email protected]>
AuthorDate: Tue Dec 2 11:11:04 2025 +0800
[python] Fix reading large volumes of blob data (#6701)
---
.../pypaimon/read/reader/concat_batch_reader.py | 18 +-
.../pypaimon/read/reader/format_blob_reader.py | 7 +-
paimon-python/pypaimon/tests/blob_table_test.py | 243 +++++++++++++++++++++
3 files changed, 258 insertions(+), 10 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index fee159b209..4b86265d77 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -20,6 +20,7 @@ import collections
from typing import Callable, List, Optional
import pyarrow as pa
+import pyarrow.dataset as ds
from pyarrow import RecordBatch
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
@@ -89,14 +90,15 @@ class MergeAllBatchReader(RecordBatchReader):
into a single batch for processing.
"""
- def __init__(self, reader_suppliers: List[Callable]):
+ def __init__(self, reader_suppliers: List[Callable], batch_size: int = 4096):
self.reader_suppliers = reader_suppliers
self.merged_batch: Optional[RecordBatch] = None
- self.batch_created = False
+ self.reader = None
+ self._batch_size = batch_size
def read_arrow_batch(self) -> Optional[RecordBatch]:
- if self.batch_created:
- return None
+ if self.reader:
+ return self.reader.read_next_batch()
all_batches = []
@@ -140,10 +142,10 @@ class MergeAllBatchReader(RecordBatchReader):
)
else:
self.merged_batch = None
-
- self.batch_created = True
- return self.merged_batch
+ dataset = ds.InMemoryDataset(self.merged_batch)
+ self.reader = dataset.scanner(batch_size=self._batch_size).to_reader()
+ return self.reader.read_next_batch()
def close(self) -> None:
self.merged_batch = None
- self.batch_created = False
+ self.reader = None
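The change above means the merged batch is now emitted through a pyarrow Dataset scanner in batch_size-row slices instead of as one giant RecordBatch. A minimal sketch of that re-batching pattern in isolation (the sample column and row count below are invented; the 4096 default mirrors the patch):

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Stand-in for the merged batch that used to be returned in one piece.
    merged = pa.RecordBatch.from_pydict({'id': list(range(100_000))})

    # Wrap it in an InMemoryDataset and scan it back out in bounded slices,
    # so no single RecordBatch handed to callers exceeds batch_size rows.
    reader = ds.InMemoryDataset(merged).scanner(batch_size=4096).to_reader()
    for batch in reader:
        assert batch.num_rows <= 4096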
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index b993dc9221..5e0affe7d8 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -34,12 +34,13 @@ from pypaimon.table.row.row_kind import RowKind
class FormatBlobReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
- full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
+ full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
+ batch_size: int = 4096):
self._file_io = file_io
self._file_path = file_path
self._push_down_predicate = push_down_predicate
self._blob_as_descriptor = blob_as_descriptor
-
+ self._batch_size = batch_size
# Get file size
self._file_size = file_io.get_file_size(file_path)
@@ -92,6 +93,8 @@ class FormatBlobReader(RecordBatchReader):
pydict_data[field_name].append(blob_data)
records_in_batch += 1
+ if records_in_batch >= self._batch_size:
+ break
except StopIteration:
# Stop immediately when StopIteration occurs
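In format_blob_reader.py the same cap is applied while rows are accumulated: the fill loop now breaks once batch_size records have been collected rather than draining the whole file into a single batch. A standalone sketch of that bounded accumulation, assuming a plain iterator of blob payloads (the helper name and types here are hypothetical):

    from typing import Iterator, List, Optional

    def collect_batch(blobs: Iterator[bytes], batch_size: int = 4096) -> Optional[List[bytes]]:
        # Drain at most batch_size blobs; return None once the source is exhausted.
        rows: List[bytes] = []
        try:
            while len(rows) < batch_size:
                rows.append(next(blobs))
        except StopIteration:
            # Stop immediately when the underlying iterator is exhausted.
            pass
        return rows or None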
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 4dd96dbf26..760fb1b12d 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1163,6 +1163,116 @@ class DataBlobWriterTest(unittest.TestCase):
print(
f"✅ Large blob end-to-end test passed: wrote and read back
{len(blob_data)} large blob records correctly") # noqa: E501
+ def test_blob_write_read_large_data_volume(self):
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('batch_id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_large_data_volume', schema, False)
+ table = self.catalog.get_table('test_db.blob_large_data_volume')
+
+ large_blob_size = 5 * 1024
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ num_row = 20000
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ expected = pa.Table.from_pydict({
+ 'id': [1] * num_row,
+ 'batch_id': [11] * num_row,
+ 'metadata': [f'Large blob batch {11}'] * num_row,
+ 'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+ }, schema=pa_schema)
+ writer.write_arrow(expected)
+
+ # Commit all data at once
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data
+ self.assertEqual(result.num_rows, num_row)
+ self.assertEqual(result.num_columns, 4)
+
+ self.assertEqual(expected, result)
+
+ def test_blob_write_read_large_data_volume_rolling(self):
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('batch_id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '21MB'
+ }
+ )
+ self.catalog.create_table('test_db.large_data_volume_rolling', schema, False)
+ table = self.catalog.get_table('test_db.large_data_volume_rolling')
+
+ # Create large blob data
+ large_blob_size = 5 * 1024
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ actual_size = len(large_blob_data)
+ print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
+ num_row = 20000
+ expected = pa.Table.from_pydict({
+ 'id': [1] * num_row,
+ 'batch_id': [11] * num_row,
+ 'metadata': [f'Large blob batch {11}'] * num_row,
+ 'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+ }, schema=pa_schema)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(expected)
+
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(expected, result)
+
def test_blob_write_read_mixed_sizes_end_to_end(self):
"""Test end-to-end blob functionality with mixed blob sizes."""
from pypaimon import Schema
@@ -2106,6 +2216,72 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(actual.column('id').to_pylist(), list(range(1, 161)),
"ID column should match")
self.assertEqual(actual, expected)
+ def test_blob_large_data_volume_with_shard(self):
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('batch_id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_large_data_volume_with_shard', schema, False)
+ table = self.catalog.get_table('test_db.blob_large_data_volume_with_shard')
+
+ large_blob_size = 5 * 1024
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ num_row = 20000
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ expected = pa.Table.from_pydict({
+ 'id': [1] * num_row,
+ 'batch_id': [11] * num_row,
+ 'metadata': [f'Large blob batch {11}'] * num_row,
+ 'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+ }, schema=pa_schema)
+ writer.write_arrow(expected)
+
+ # Commit all data at once
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan().with_shard(2, 3)
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data
+ self.assertEqual(6666, result.num_rows)
+ self.assertEqual(4, result.num_columns)
+
+ self.assertEqual(expected.slice(13334, 6666), result)
+ splits = read_builder.new_scan().plan().splits()
+ expected = table_read.to_arrow(splits)
+ splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+ actual1 = table_read.to_arrow(splits1)
+ splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+ actual2 = table_read.to_arrow(splits2)
+ splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+ actual3 = table_read.to_arrow(splits3)
+ actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+ self.assertEqual(actual, expected)
+
def test_data_blob_writer_with_shard(self):
"""Test DataBlobWriter with mixed data types in blob column."""
from pypaimon import Schema
@@ -2168,6 +2344,73 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(result.num_rows, 2, "Should have 2 rows")
self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+ def test_blob_write_read_large_data_volume_rolling_with_shard(self):
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('batch_id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '10MB'
+ }
+ )
+ self.catalog.create_table('test_db.test_blob_write_read_large_data_volume_rolling_with_shard', schema, False)
+ table = self.catalog.get_table('test_db.test_blob_write_read_large_data_volume_rolling_with_shard')
+
+ # Create large blob data
+ large_blob_size = 5 * 1024
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ actual_size = len(large_blob_data)
+ print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
+ # Write 10000 rows in a single batch
+ num_row = 10000
+ expected = pa.Table.from_pydict({
+ 'id': [1] * num_row,
+ 'batch_id': [11] * num_row,
+ 'metadata': [f'Large blob batch {11}'] * num_row,
+ 'large_blob': [i.to_bytes(2, byteorder='little') + large_blob_data for i in range(num_row)]
+ }, schema=pa_schema)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(expected)
+
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(expected, result)
+
+ splits = read_builder.new_scan().plan().splits()
+ expected = table_read.to_arrow(splits)
+ splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+ actual1 = table_read.to_arrow(splits1)
+ splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+ actual2 = table_read.to_arrow(splits2)
+ splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+ actual3 = table_read.to_arrow(splits3)
+ actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+
+ self.assertEqual(actual, expected)
+
if __name__ == '__main__':
unittest.main()