This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aec7dd9fa [python] fix data evolution merge batch size mismatch issue (#7205)
9aec7dd9fa is described below

commit 9aec7dd9fa035c4ba5d5c378013eb5d7489f4d66
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 4 16:28:37 2026 +0800

    [python] fix data evolution merge batch size mismatch issue (#7205)
---
 .../pypaimon/read/reader/concat_batch_reader.py    | 59 ++++++++++++++----
 .../pypaimon/tests/data_evolution_test.py          | 70 ++++++++++++++++++++++
 2 files changed, 117 insertions(+), 12 deletions(-)
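
Context for the patch below: DataEvolutionMergeReader stitches together columns from several readers that cover the same rows, but those readers may emit record batches of different sizes, so a single round of reads can yield columns of mismatched lengths. The change slices every batch of a round down to the smallest row count and buffers each reader's leftover rows for the next call. A minimal standalone sketch of that alignment idea, assuming pyarrow is available; the function and variable names here are illustrative and not taken from the patch:

import pyarrow as pa

def align_one_round(left: pa.RecordBatch, right: pa.RecordBatch):
    # Both batches cover the same logical rows but may differ in length:
    # emit only the rows both sides have, keep the surplus for the next round.
    min_rows = min(left.num_rows, right.num_rows)
    merged = pa.RecordBatch.from_arrays(
        [col.slice(0, min_rows) for col in left.columns]
        + [col.slice(0, min_rows) for col in right.columns],
        names=list(left.schema.names) + list(right.schema.names),
    )
    left_rest = left.slice(min_rows) if left.num_rows > min_rows else None
    right_rest = right.slice(min_rows) if right.num_rows > min_rows else None
    return merged, left_rest, right_rest

left = pa.RecordBatch.from_pydict({'f0': list(range(5)), 'f1': [f'a{i}' for i in range(5)]})
right = pa.RecordBatch.from_pydict({'f2': [f'b{i}' for i in range(3)]})
merged, left_rest, right_rest = align_one_round(left, right)
assert merged.num_rows == 3 and left_rest.num_rows == 2 and right_rest is None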

diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 7bf884a88e..4318f883eb 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -25,6 +25,8 @@ from pyarrow import RecordBatch
 
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 
+_MIN_BATCH_SIZE_TO_REFILL = 1024
+
 
 class ConcatBatchReader(RecordBatchReader):
 
@@ -162,30 +164,63 @@ class DataEvolutionMergeReader(RecordBatchReader):
         self.field_offsets = field_offsets
         self.readers = readers
         self.schema = schema
+        self._buffers: List[Optional[RecordBatch]] = [None] * len(readers)
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
         for i, reader in enumerate(self.readers):
             if reader is not None:
-                batch = reader.read_arrow_batch()
-                if batch is None:
-                    # all readers are aligned, as long as one returns null, the others will also have no data
-                    return None
-                batches[i] = batch
-        # Assemble record batches from batches based on row_offsets and field_offsets
+                if self._buffers[i] is not None:
+                    remainder = self._buffers[i]
+                    self._buffers[i] = None
+                    if remainder.num_rows >= _MIN_BATCH_SIZE_TO_REFILL:
+                        batches[i] = remainder
+                    else:
+                        new_batch = reader.read_arrow_batch()
+                        if new_batch is not None and new_batch.num_rows > 0:
+                            combined_arrays = [
+                                pa.concat_arrays([remainder.column(j), new_batch.column(j)])
+                                for j in range(remainder.num_columns)
+                            ]
+                            batches[i] = pa.RecordBatch.from_arrays(
+                                combined_arrays, schema=remainder.schema
+                            )
+                        else:
+                            batches[i] = remainder
+                else:
+                    batch = reader.read_arrow_batch()
+                    if batch is None:
+                        batches[i] = None
+                    else:
+                        batches[i] = batch
+            else:
+                batches[i] = None
+
+        if not any(b is not None for b in batches):
+            return None
+
+        min_rows = min(b.num_rows for b in batches if b is not None)
+        if min_rows == 0:
+            return None
+
         columns = []
         for i in range(len(self.row_offsets)):
             batch_index = self.row_offsets[i]
             field_index = self.field_offsets[i]
-            if batches[batch_index] is not None:
-                column = batches[batch_index].column(field_index)
-                columns.append(column)
-        if columns:
-            return pa.RecordBatch.from_arrays(columns, schema=self.schema)
-        return None
+            if batch_index >= 0 and batches[batch_index] is not None:
+                columns.append(batches[batch_index].column(field_index).slice(0, min_rows))
+            else:
+                columns.append(pa.nulls(min_rows, type=self.schema.field(i).type))
+
+        for i in range(len(self.readers)):
+            if batches[i] is not None and batches[i].num_rows > min_rows:
+                self._buffers[i] = batches[i].slice(min_rows, batches[i].num_rows - min_rows)
+
+        return pa.RecordBatch.from_arrays(columns, schema=self.schema)
 
     def close(self) -> None:
         try:
+            self._buffers = [None] * len(self.readers)
             for reader in self.readers:
                 if reader is not None:
                     reader.close()
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 92558e8742..f92034f367 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -94,6 +94,76 @@ class DataEvolutionTest(unittest.TestCase):
         self.assertEqual(0, manifest.min_row_id)
         self.assertEqual(1, manifest.max_row_id)
 
+    def test_merge_reader(self):
+        from pypaimon.read.reader.concat_batch_reader import MergeAllBatchReader
+
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            simple_pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'read.batch-size': '4096',
+            },
+        )
+        self.catalog.create_table('default.test_merge_reader_batch_sizes', schema, False)
+        table = self.catalog.get_table('default.test_merge_reader_batch_sizes')
+
+        write_builder = table.new_batch_write_builder()
+        size = 5000
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        d0 = pa.Table.from_pydict(
+            {'f0': list(range(size)), 'f1': [f'a{i}' for i in range(size)]},
+            schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]),
+        )
+        d1 = pa.Table.from_pydict(
+            {'f2': [f'b{i}' for i in range(size)]},
+            schema=pa.schema([('f2', pa.string())]),
+        )
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        original_merge_all = MergeAllBatchReader
+        call_count = [0]
+
+        def patched_merge_all(reader_suppliers, batch_size=1024):
+            call_count[0] += 1
+            if call_count[0] == 2:
+                batch_size = 999
+            return original_merge_all(reader_suppliers, batch_size=batch_size)
+
+        import pypaimon.read.split_read as split_read_module
+        split_read_module.MergeAllBatchReader = patched_merge_all
+        try:
+            read_builder = table.new_read_builder()
+            table_scan = read_builder.new_scan()
+            table_read = read_builder.new_read()
+            splits = table_scan.plan().splits()
+            actual_data = table_read.to_arrow(splits)
+            expect_data = pa.Table.from_pydict({
+                'f0': list(range(size)),
+                'f1': [f'a{i}' for i in range(size)],
+                'f2': [f'b{i}' for i in range(size)],
+            }, schema=simple_pa_schema)
+            self.assertEqual(actual_data.num_rows, size)
+            self.assertEqual(actual_data, expect_data)
+        finally:
+            split_read_module.MergeAllBatchReader = original_merge_all
+
     def test_with_slice(self):
         pa_schema = pa.schema([
             ("id", pa.int64()),
