This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4955ef0dcd [python] support read.batch-size and fix default value (#7051)
4955ef0dcd is described below

commit 4955ef0dcdaec969d7630ed6884c638ce96b6d0a
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jan 15 15:21:49 2026 +0800

    [python] support read.batch-size and fix default value (#7051)
---
 .../pypaimon/common/options/core_options.py        | 10 +++
 .../pypaimon/read/reader/concat_batch_reader.py    |  2 +-
 .../pypaimon/read/reader/format_avro_reader.py     |  2 +-
 .../pypaimon/read/reader/format_blob_reader.py     |  2 +-
 .../pypaimon/read/reader/format_lance_reader.py    |  2 +-
 .../pypaimon/read/reader/format_pyarrow_reader.py  |  2 +-
 paimon-python/pypaimon/read/split_read.py          | 16 +++--
 paimon-python/pypaimon/tests/reader_base_test.py   | 71 ++++++++++++++++++++++
 8 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index be64278e76..43c7de8ff3 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -411,6 +411,13 @@ class CoreOptions:
         .with_description("Whether to L2 normalize vectors for cosine 
similarity.")
     )
 
+    READ_BATCH_SIZE: ConfigOption[int] = (
+        ConfigOptions.key("read.batch-size")
+        .int_type()
+        .default_value(1024)
+        .with_description("Read batch size for any file format if it 
supports.")
+    )
+
     def __init__(self, options: Options):
         self.options = options
 
@@ -586,3 +593,6 @@ class CoreOptions:
 
     def vector_normalize(self, default=None):
         return self.options.get(CoreOptions.VECTOR_NORMALIZE, default)
+
+    def read_batch_size(self, default=None) -> int:
+        return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
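
For reference, a minimal sketch of how the new option resolves (it mirrors the unit test added further below; the Options/ConfigOption behavior is taken from the surrounding code, not verified beyond that):

    from pypaimon.common.options import Options
    from pypaimon.common.options.core_options import CoreOptions

    # Nothing set: falls back to the 1024 default declared above.
    assert CoreOptions(Options({})).read_batch_size() == 1024

    # Explicit value: int_type() coerces the string form to an int.
    opts = Options({CoreOptions.READ_BATCH_SIZE.key(): '512'})
    assert CoreOptions(opts).read_batch_size() == 512
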
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 3ce9db6f5e..b15a37f79f 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -60,7 +60,7 @@ class MergeAllBatchReader(RecordBatchReader):
     into a single batch for processing.
     """
 
-    def __init__(self, reader_suppliers: List[Callable], batch_size: int = 4096):
+    def __init__(self, reader_suppliers: List[Callable], batch_size: int = 1024):
         self.reader_suppliers = reader_suppliers
         self.merged_batch: Optional[RecordBatch] = None
         self.reader = None
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 4114d8e93b..5dd2738944 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -35,7 +35,7 @@ class FormatAvroReader(RecordBatchReader):
     """
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
-                 push_down_predicate: Any, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 1024):
         file_path_for_io = file_io.to_filesystem_path(file_path)
         self._file = file_io.filesystem.open_input_file(file_path_for_io)
         self._avro_reader = fastavro.reader(self._file)
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index ecd740de4d..81bcae8a74 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -35,7 +35,7 @@ class FormatBlobReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                  full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
-                 batch_size: int = 4096):
+                 batch_size: int = 1024):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py
index a6b3277167..4be30a6f5d 100644
--- a/paimon-python/pypaimon/read/reader/format_lance_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py
@@ -33,7 +33,7 @@ class FormatLanceReader(RecordBatchReader):
     """
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 push_down_predicate: Any, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 1024):
         """Initialize Lance reader."""
         import lance
 
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index ed560d14a4..699ff48477 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -33,7 +33,7 @@ class FormatPyArrowReader(RecordBatchReader):
     """
 
     def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
-                 push_down_predicate: Any, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 1024):
         file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
         self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem)
         self.read_fields = read_fields
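
The format readers above only change their default from 4096 to 1024; as a hypothetical illustration (not part of this diff, path and format made up), this is where a batch size of this kind typically takes effect with pyarrow.dataset:

    import pyarrow.dataset as ds

    # Each RecordBatch yielded by the scanner holds at most `batch_size` rows.
    dataset = ds.dataset('/tmp/example.parquet', format='parquet')
    for batch in dataset.scanner(batch_size=1024).to_batches():
        print(batch.num_rows)  # <= 1024
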
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 47edf63d9a..eab279cf9c 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -110,20 +110,23 @@ class SplitRead(ABC):
         _, extension = os.path.splitext(file_path)
         file_format = extension[1:]
 
+        batch_size = self.table.options.read_batch_size()
+
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
             format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, read_arrow_predicate)
+                                             self.read_fields, read_arrow_predicate, batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
             blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, read_arrow_predicate, blob_as_descriptor)
+                                             self.read_fields, read_arrow_predicate, blob_as_descriptor,
+                                             batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_LANCE:
             format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
-                                              read_arrow_predicate)
+                                              read_arrow_predicate, batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
-                                                read_file_fields, read_arrow_predicate)
+                                                read_file_fields, read_arrow_predicate, batch_size=batch_size)
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
@@ -546,19 +549,20 @@ class DataEvolutionSplitRead(SplitRead):
                 read_field_names = self._remove_partition_fields(read_fields)
                 table_fields = self.read_fields
                 self.read_fields = read_fields  # create reader based on read_fields
+                batch_size = self.table.options.read_batch_size()
                 # Create reader for this bunch
                 if len(bunch.files()) == 1:
                     suppliers = [lambda r=self._create_file_reader(
                         bunch.files()[0], read_field_names
                     ): r]
-                    file_record_readers[i] = MergeAllBatchReader(suppliers)
+                    file_record_readers[i] = MergeAllBatchReader(suppliers, batch_size=batch_size)
                 else:
                     # Create concatenated reader for multiple files
                     suppliers = [
                         partial(self._create_file_reader, file=file,
                                 read_fields=read_field_names) for file in bunch.files()
                     ]
-                    file_record_readers[i] = MergeAllBatchReader(suppliers)
+                    file_record_readers[i] = MergeAllBatchReader(suppliers, batch_size=batch_size)
                 self.read_fields = table_fields
 
         # Validate that all required fields are found
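
End to end, the batch size now flows from the table options into every reader built by SplitRead and DataEvolutionSplitRead, instead of the previous hard-coded 4096. A hedged sketch of the user-facing effect (the catalog and table name are hypothetical; the full setup lives in the test below):

    # Assuming `catalog` is an existing pypaimon catalog and the table was
    # created with {'read.batch-size': '256'} in its schema options.
    table = catalog.get_table('default.demo')
    assert table.options.read_batch_size() == 256
    # Readers created for this table's splits now receive batch_size=256.
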
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index a7b2abd516..3d9ed7f874 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -1320,3 +1320,74 @@ class ReaderBasicTest(unittest.TestCase):
 
         # Verify the error message contains the expected text
         self.assertIn("Table Type", str(context.exception))
+
+    def test_read_batch_size_config(self):
+        from pypaimon.common.options.core_options import CoreOptions
+        from pypaimon.common.options import Options
+
+        options = Options({})
+        core_options = CoreOptions(options)
+        self.assertEqual(core_options.read_batch_size(), 1024,
+                         "Default read_batch_size should be 1024")
+
+        options = Options({CoreOptions.READ_BATCH_SIZE.key(): '512'})
+        core_options = CoreOptions(options)
+        self.assertEqual(core_options.read_batch_size(), 512,
+                         "read_batch_size should read from options")
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('value', pa.string())
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.READ_BATCH_SIZE.key(): '10'}
+        )
+        self.catalog.create_table('default.test_read_batch_size', schema, False)
+        table = self.catalog.get_table('default.test_read_batch_size')
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'id': list(range(50)),
+            'value': [f'value_{i}' for i in range(50)]
+        }, schema=pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        self.assertEqual(table.options.read_batch_size(), 10,
+                         "Table should have read_batch_size=10 from options")
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        
+        if splits:
+            # Use _create_split_read to create reader
+            split_read = table_read._create_split_read(splits[0])
+            reader = split_read.create_reader()
+            batch_count = 0
+            total_rows = 0
+            max_batch_size = 0
+            
+            try:
+                while True:
+                    batch = reader.read_arrow_batch()
+                    if batch is None:
+                        break
+                    batch_count += 1
+                    batch_rows = batch.num_rows
+                    total_rows += batch_rows
+                    max_batch_size = max(max_batch_size, batch_rows)
+            finally:
+                reader.close()
+            
+            self.assertGreater(batch_count, 1,
+                               f"With batch_size=10, should get multiple 
batches, got {batch_count}")
+            self.assertEqual(total_rows, 50, "Should read all 50 rows")
+            self.assertLessEqual(max_batch_size, 20,
+                                 f"Max batch size should be close to 
configured 10, got {max_batch_size}")
