This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 9aec7dd9fa [python] fix data evolution merge batch size mismatch issue (#7205)
9aec7dd9fa is described below
commit 9aec7dd9fa035c4ba5d5c378013eb5d7489f4d66
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 4 16:28:37 2026 +0800
[python] fix data evolution merge batch size mismatch issue (#7205)
---
.../pypaimon/read/reader/concat_batch_reader.py | 59 ++++++++++++++----
.../pypaimon/tests/data_evolution_test.py | 70 ++++++++++++++++++++++
2 files changed, 117 insertions(+), 12 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 7bf884a88e..4318f883eb 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -25,6 +25,8 @@ from pyarrow import RecordBatch
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+_MIN_BATCH_SIZE_TO_REFILL = 1024
+
class ConcatBatchReader(RecordBatchReader):
@@ -162,30 +164,63 @@ class DataEvolutionMergeReader(RecordBatchReader):
         self.field_offsets = field_offsets
         self.readers = readers
         self.schema = schema
+        self._buffers: List[Optional[RecordBatch]] = [None] * len(readers)
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
         for i, reader in enumerate(self.readers):
             if reader is not None:
-                batch = reader.read_arrow_batch()
-                if batch is None:
-                    # all readers are aligned, as long as one returns null, the others will also have no data
-                    return None
-                batches[i] = batch
-        # Assemble record batches from batches based on row_offsets and field_offsets
+                if self._buffers[i] is not None:
+                    remainder = self._buffers[i]
+                    self._buffers[i] = None
+                    if remainder.num_rows >= _MIN_BATCH_SIZE_TO_REFILL:
+                        batches[i] = remainder
+                    else:
+                        new_batch = reader.read_arrow_batch()
+                        if new_batch is not None and new_batch.num_rows > 0:
+                            combined_arrays = [
+                                pa.concat_arrays([remainder.column(j), new_batch.column(j)])
+                                for j in range(remainder.num_columns)
+                            ]
+                            batches[i] = pa.RecordBatch.from_arrays(
+                                combined_arrays, schema=remainder.schema
+                            )
+                        else:
+                            batches[i] = remainder
+                else:
+                    batch = reader.read_arrow_batch()
+                    if batch is None:
+                        batches[i] = None
+                    else:
+                        batches[i] = batch
+            else:
+                batches[i] = None
+
+        if not any(b is not None for b in batches):
+            return None
+
+        min_rows = min(b.num_rows for b in batches if b is not None)
+        if min_rows == 0:
+            return None
+
         columns = []
         for i in range(len(self.row_offsets)):
             batch_index = self.row_offsets[i]
             field_index = self.field_offsets[i]
-            if batches[batch_index] is not None:
-                column = batches[batch_index].column(field_index)
-                columns.append(column)
-        if columns:
-            return pa.RecordBatch.from_arrays(columns, schema=self.schema)
-        return None
+            if batch_index >= 0 and batches[batch_index] is not None:
+                columns.append(batches[batch_index].column(field_index).slice(0, min_rows))
+            else:
+                columns.append(pa.nulls(min_rows, type=self.schema.field(i).type))
+
+        for i in range(len(self.readers)):
+            if batches[i] is not None and batches[i].num_rows > min_rows:
+                self._buffers[i] = batches[i].slice(min_rows, batches[i].num_rows - min_rows)
+
+        return pa.RecordBatch.from_arrays(columns, schema=self.schema)
 
     def close(self) -> None:
         try:
+            self._buffers = [None] * len(self.readers)
             for reader in self.readers:
                 if reader is not None:
                     reader.close()
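For context on the hunk above: the merge reader no longer assumes every sub-reader returns batches of the same length; it slices all batches to the common minimum row count and buffers each remainder for the next call. Below is a minimal standalone pyarrow sketch of that alignment idea only; the align_two helper and the sample batch sizes are illustrative and not part of pypaimon.

    import pyarrow as pa

    def align_two(batch_a, batch_b, buffers):
        # Slice both batches to the common minimum row count; stash the
        # remainder of the longer batch so the next read can reuse it.
        min_rows = min(batch_a.num_rows, batch_b.num_rows)
        for idx, batch in enumerate((batch_a, batch_b)):
            if batch.num_rows > min_rows:
                buffers[idx] = batch.slice(min_rows, batch.num_rows - min_rows)
        return batch_a.slice(0, min_rows), batch_b.slice(0, min_rows)

    # Two column groups covering the same 5 rows, delivered in mismatched batches.
    left = pa.RecordBatch.from_arrays(
        [pa.array(list(range(5)), type=pa.int32())], names=['f0'])
    right = pa.RecordBatch.from_arrays(
        [pa.array([f'b{i}' for i in range(3)])], names=['f2'])

    buffers = [None, None]
    a, b = align_two(left, right, buffers)
    merged = pa.RecordBatch.from_arrays([a.column(0), b.column(0)], names=['f0', 'f2'])
    print(merged.num_rows)      # 3 aligned rows emitted now
    print(buffers[0].num_rows)  # 2 leftover 'f0' rows buffered for the next call

In the reader itself the buffered remainder is either returned directly (once it reaches _MIN_BATCH_SIZE_TO_REFILL rows) or concatenated with the next upstream batch, which keeps the column groups row-aligned even when their readers emit different batch sizes.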
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 92558e8742..f92034f367 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -94,6 +94,76 @@ class DataEvolutionTest(unittest.TestCase):
         self.assertEqual(0, manifest.min_row_id)
         self.assertEqual(1, manifest.max_row_id)
 
+    def test_merge_reader(self):
+        from pypaimon.read.reader.concat_batch_reader import MergeAllBatchReader
+
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'read.batch-size': '4096',
+            },
+        )
+        self.catalog.create_table('default.test_merge_reader_batch_sizes', schema, False)
+        table = self.catalog.get_table('default.test_merge_reader_batch_sizes')
+
+        write_builder = table.new_batch_write_builder()
+        size = 5000
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        d0 = pa.Table.from_pydict(
+            {'f0': list(range(size)), 'f1': [f'a{i}' for i in range(size)]},
+            schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]),
+        )
+        d1 = pa.Table.from_pydict(
+            {'f2': [f'b{i}' for i in range(size)]},
+            schema=pa.schema([('f2', pa.string())]),
+        )
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        original_merge_all = MergeAllBatchReader
+        call_count = [0]
+
+        def patched_merge_all(reader_suppliers, batch_size=1024):
+            call_count[0] += 1
+            if call_count[0] == 2:
+                batch_size = 999
+            return original_merge_all(reader_suppliers, batch_size=batch_size)
+
+        import pypaimon.read.split_read as split_read_module
+        split_read_module.MergeAllBatchReader = patched_merge_all
+        try:
+            read_builder = table.new_read_builder()
+            table_scan = read_builder.new_scan()
+            table_read = read_builder.new_read()
+            splits = table_scan.plan().splits()
+            actual_data = table_read.to_arrow(splits)
+            expect_data = pa.Table.from_pydict({
+                'f0': list(range(size)),
+                'f1': [f'a{i}' for i in range(size)],
+                'f2': [f'b{i}' for i in range(size)],
+            }, schema=simple_pa_schema)
+            self.assertEqual(actual_data.num_rows, size)
+            self.assertEqual(actual_data, expect_data)
+        finally:
+            split_read_module.MergeAllBatchReader = original_merge_all
+
     def test_with_slice(self):
         pa_schema = pa.schema([
             ("id", pa.int64()),