This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 51b02f18e8 [Parquet] perf: preallocate capacity for ArrayReaderBuilder (#9093)
51b02f18e8 is described below

commit 51b02f18e8325b279ee9d7032b294956d49b60c5
Author: Lanqing Yang <[email protected]>
AuthorDate: Thu Apr 16 23:50:40 2026 -0700

    [Parquet] perf: preallocate capacity for ArrayReaderBuilder (#9093)
    
    # Which issue does this PR close?
    
    - Closes #9059.
    
    # Rationale for this change
    
    Reduce the allocation cost mentioned in
    https://github.com/apache/arrow-rs/issues/9059. From experiments:
    pre-allocation overhead may offset the savings from avoiding incremental
    growth.
    
    # What changes are included in this PR?
    - Add a with_capacity method to the ValuesBuffer trait, and remove the
    Default bound so that a capacity hint is required when ArrayReaderBuilder
    constructs buffers.

    - The capacity hint is passed down to GenericRecordReader, which uses it to
    preallocate its values buffer (see the sketch below).
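
    For illustration, a minimal self-contained sketch of the pattern (simplified
    stand-in types, not the actual parquet ones; the real trait and reader live
    under parquet/src/arrow/record_reader/):

    ```rust
    /// Simplified stand-in for the ValuesBuffer trait: capacity is now required.
    trait ValuesBuffer {
        fn with_capacity(capacity: usize) -> Self;
    }

    impl<T: Copy + Default> ValuesBuffer for Vec<T> {
        fn with_capacity(capacity: usize) -> Self {
            Vec::with_capacity(capacity)
        }
    }

    /// Simplified stand-in for GenericRecordReader: the values buffer is created
    /// lazily on first read, sized to the largest batch size seen so far.
    struct RecordReader<V> {
        values: Option<V>,
        capacity_hint: usize,
    }

    impl<V: ValuesBuffer> RecordReader<V> {
        fn new(capacity: usize) -> Self {
            Self {
                values: None,
                capacity_hint: capacity,
            }
        }

        fn read_one_batch(&mut self, batch_size: usize) {
            // Grow the hint if a larger batch is requested
            self.capacity_hint = self.capacity_hint.max(batch_size);
            let capacity = self.capacity_hint;
            // Lazily allocate the buffer with the capacity hint on first use
            let _values = self.values.get_or_insert_with(|| V::with_capacity(capacity));
            // ... decode values into the buffer ...
        }
    }

    fn main() {
        let mut reader: RecordReader<Vec<i32>> = RecordReader::new(1024);
        reader.read_one_batch(1024);
    }
    ```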
    
    # Are there any user-facing changes?
    
    Yes. ArrayReaders now take an extra capacity argument that indicates the
    preferred batch size, and internal buffers are provisioned with this
    capacity.
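
    At call sites the change mirrors the updated benchmark in
    parquet/benches/arrow_reader.rs: the reader constructors take the preferred
    batch size as a final argument. A hedged sketch, assuming the same
    experimental array_reader APIs the benchmark uses:

    ```rust
    use parquet::arrow::array_reader::{make_byte_array_reader, ArrayReader};
    use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
    use parquet::column::page::PageIterator;
    use parquet::schema::types::ColumnDescPtr;

    /// Build a byte array reader, passing the preferred batch size so that
    /// internal buffers are pre-allocated up front.
    fn create_byte_array_reader(
        page_iterator: impl PageIterator + 'static,
        column_desc: ColumnDescPtr,
    ) -> Box<dyn ArrayReader> {
        make_byte_array_reader(
            Box::new(page_iterator),
            column_desc,
            None,               // arrow_type: derived from the Parquet type
            DEFAULT_BATCH_SIZE, // new: capacity hint for buffer pre-allocation
        )
        .unwrap()
    }
    ```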
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/benches/arrow_reader.rs                    | 92 +++++++++++++++++-----
 parquet/src/arrow/array_reader/builder.rs          | 58 +++++++++++---
 parquet/src/arrow/array_reader/byte_array.rs       | 48 ++++++-----
 .../arrow/array_reader/byte_array_dictionary.rs    | 23 +++---
 parquet/src/arrow/array_reader/byte_view_array.rs  | 54 ++++++++-----
 .../src/arrow/array_reader/fixed_len_byte_array.rs | 18 ++++-
 parquet/src/arrow/array_reader/list_array.rs       |  2 +
 parquet/src/arrow/array_reader/null_array.rs       | 10 ++-
 parquet/src/arrow/array_reader/primitive_array.rs  | 58 ++++++++++----
 parquet/src/arrow/arrow_reader/mod.rs              | 13 ++-
 parquet/src/arrow/buffer/dictionary_buffer.rs      | 26 +++---
 parquet/src/arrow/buffer/offset_buffer.rs          | 40 ++++++----
 parquet/src/arrow/buffer/view_buffer.rs            | 43 +++++-----
 .../src/arrow/push_decoder/reader_builder/mod.rs   |  2 +
 parquet/src/arrow/record_reader/buffer.rs          | 12 ++-
 parquet/src/arrow/record_reader/mod.rs             | 49 +++++++++---
 16 files changed, 381 insertions(+), 167 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 14fa16b353..9325a1faf5 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{
     ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
     make_fixed_len_byte_array_reader,
 };
+use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
 use parquet::basic::Type;
 use parquet::data_type::{ByteArray, FixedLenByteArrayType};
 use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
@@ -709,15 +710,23 @@ fn create_primitive_array_reader(
     use parquet::arrow::array_reader::PrimitiveArrayReader;
     match column_desc.physical_type() {
         Type::INT32 => {
-            let reader =
-                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
             Box::new(reader)
         }
         Type::INT64 => {
-            let reader =
-                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let reader = PrimitiveArrayReader::<Int64Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
             Box::new(reader)
         }
         _ => unreachable!(),
@@ -730,9 +739,13 @@ fn create_f16_by_bytes_reader(
 ) -> Box<dyn ArrayReader> {
     let physical_type = column_desc.physical_type();
     match physical_type {
-        Type::FIXED_LEN_BYTE_ARRAY => {
-            make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
-        }
+        Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
+            Box::new(page_iterator),
+            column_desc,
+            None,
+            DEFAULT_BATCH_SIZE,
+        )
+        .unwrap(),
         _ => unimplemented!(),
     }
 }
@@ -743,12 +756,20 @@ fn create_decimal_by_bytes_reader(
 ) -> Box<dyn ArrayReader> {
     let physical_type = column_desc.physical_type();
     match physical_type {
-        Type::BYTE_ARRAY => {
-            make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
-        }
-        Type::FIXED_LEN_BYTE_ARRAY => {
-            make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
-        }
+        Type::BYTE_ARRAY => make_byte_array_reader(
+            Box::new(page_iterator),
+            column_desc,
+            None,
+            DEFAULT_BATCH_SIZE,
+        )
+        .unwrap(),
+        Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
+            Box::new(page_iterator),
+            column_desc,
+            None,
+            DEFAULT_BATCH_SIZE,
+        )
+        .unwrap(),
         _ => unimplemented!(),
     }
 }
@@ -757,28 +778,52 @@ fn create_fixed_len_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
-    make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+    make_fixed_len_byte_array_reader(
+        Box::new(page_iterator),
+        column_desc,
+        None,
+        DEFAULT_BATCH_SIZE,
+    )
+    .unwrap()
 }
 
 fn create_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
-    make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+    make_byte_array_reader(
+        Box::new(page_iterator),
+        column_desc,
+        None,
+        DEFAULT_BATCH_SIZE,
+    )
+    .unwrap()
 }
 
 fn create_byte_view_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
-    make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+    make_byte_view_array_reader(
+        Box::new(page_iterator),
+        column_desc,
+        None,
+        DEFAULT_BATCH_SIZE,
+    )
+    .unwrap()
 }
 
 fn create_string_view_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
-    make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+    make_byte_view_array_reader(
+        Box::new(page_iterator),
+        column_desc,
+        None,
+        DEFAULT_BATCH_SIZE,
+    )
+    .unwrap()
 }
 
 fn create_string_byte_array_dictionary_reader(
@@ -788,8 +833,13 @@ fn create_string_byte_array_dictionary_reader(
     use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
     let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
 
-    make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
-        .unwrap()
+    make_byte_array_dictionary_reader(
+        Box::new(page_iterator),
+        column_desc,
+        Some(arrow_type),
+        DEFAULT_BATCH_SIZE,
+    )
+    .unwrap()
 }
 
 fn create_string_list_reader(
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 818e06e8b8..d806b2147a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -33,6 +33,7 @@ use crate::arrow::array_reader::{
     NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader,
     make_byte_array_dictionary_reader, make_byte_array_reader,
 };
+use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
 use crate::basic::Type as PhysicalType;
@@ -96,18 +97,30 @@ pub struct ArrayReaderBuilder<'a> {
     parquet_metadata: Option<&'a ParquetMetaData>,
     /// metrics
     metrics: &'a ArrowReaderMetrics,
+    /// Batch size for pre-allocating internal buffers
+    batch_size: usize,
 }
 
 impl<'a> ArrayReaderBuilder<'a> {
+    /// Create a new `ArrayReaderBuilder`
     pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
         Self {
             row_groups,
             cache_options: None,
             parquet_metadata: None,
             metrics,
+            batch_size: DEFAULT_BATCH_SIZE,
         }
     }
 
+    /// Set the batch size used to pre-allocate internal buffers.
+    ///
+    /// This avoids reallocations when reading the first batch of data.
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Add cache options to the builder
     pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
         self.cache_options = cache_options;
@@ -414,18 +427,21 @@ impl<'a> ArrayReaderBuilder<'a> {
                 page_iterator,
                 column_desc,
                 arrow_type,
+                self.batch_size,
             )?) as _,
             PhysicalType::INT32 => {
                 if let Some(DataType::Null) = arrow_type {
                     Box::new(NullArrayReader::<Int32Type>::new(
                         page_iterator,
                         column_desc,
+                        self.batch_size,
                     )?) as _
                 } else {
                     Box::new(PrimitiveArrayReader::<Int32Type>::new(
                         page_iterator,
                         column_desc,
                         arrow_type,
+                        self.batch_size,
                     )?) as _
                 }
             }
@@ -433,36 +449,56 @@ impl<'a> ArrayReaderBuilder<'a> {
                 page_iterator,
                 column_desc,
                 arrow_type,
+                self.batch_size,
             )?) as _,
             PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
+                self.batch_size,
             )?) as _,
             PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
+                self.batch_size,
             )?) as _,
             PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
                 page_iterator,
                 column_desc,
                 arrow_type,
+                self.batch_size,
             )?) as _,
             PhysicalType::BYTE_ARRAY => match arrow_type {
-                Some(DataType::Dictionary(_, _)) => {
-                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
+                Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    self.batch_size,
+                )?,
+                Some(DataType::Utf8View | DataType::BinaryView) => make_byte_view_array_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    self.batch_size,
+                )?,
+                _ => {
+                    make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
                 }
-                Some(DataType::Utf8View | DataType::BinaryView) => {
-                    make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
-                }
-                _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
             },
             PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
-                Some(DataType::Dictionary(_, _)) => {
-                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
-                }
-                _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
+                Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    self.batch_size,
+                )?,
+                _ => make_fixed_len_byte_array_reader(
+                    page_iterator,
+                    column_desc,
+                    arrow_type,
+                    self.batch_size,
+                )?,
             },
         };
         Ok(Some(reader))
@@ -533,6 +569,7 @@ mod tests {
 
         let metrics = ArrowReaderMetrics::disabled();
         let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+            .with_batch_size(DEFAULT_BATCH_SIZE)
             .build_array_reader(fields.as_ref(), &mask)
             .unwrap();
 
@@ -566,6 +603,7 @@ mod tests {
 
         let metrics = ArrowReaderMetrics::disabled();
         let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+            .with_batch_size(DEFAULT_BATCH_SIZE)
             .with_parquet_metadata(file_reader.metadata())
             .build_array_reader(fields.as_ref(), &mask)
             .unwrap();
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 2d0d44fbe2..cf40d0576d 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -38,10 +38,14 @@ use std::any::Any;
 use std::sync::Arc;
 
 /// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
 pub fn make_byte_array_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
     arrow_type: Option<ArrowType>,
+    batch_size: usize,
 ) -> Result<Box<dyn ArrayReader>> {
     // Check if Arrow type is specified, else create it from Parquet type
     let data_type = match arrow_type {
@@ -56,13 +60,13 @@ pub fn make_byte_array_reader(
         | ArrowType::Utf8
         | ArrowType::Decimal128(_, _)
         | ArrowType::Decimal256(_, _) => {
-            let reader = GenericRecordReader::new(column_desc);
+            let reader = GenericRecordReader::new(column_desc, batch_size);
             Ok(Box::new(ByteArrayReader::<i32>::new(
                 pages, data_type, reader,
             )))
         }
         ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
-            let reader = GenericRecordReader::new(column_desc);
+            let reader = GenericRecordReader::new(column_desc, batch_size);
             Ok(Box::new(ByteArrayReader::<i64>::new(
                 pages, data_type, reader,
             )))
@@ -202,7 +206,7 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
             ));
         }
 
-        let mut buffer = OffsetBuffer::default();
+        let mut buffer = OffsetBuffer::with_capacity(0);
         let mut decoder = ByteArrayDecoderPlain::new(
             buf,
             num_values as usize,
@@ -481,24 +485,28 @@ impl ByteArrayDecoderDeltaLength {
         let initial_values_length = output.values.len();
 
         let to_read = len.min(self.lengths.len() - self.length_offset);
-        output.offsets.reserve(to_read);
-
         let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
-
         let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
-        output.values.reserve(total_bytes);
 
-        let mut current_offset = self.data_offset;
-        for length in src_lengths {
-            let end_offset = current_offset + *length as usize;
-            output.try_push(
-                &self.data.as_ref()[current_offset..end_offset],
-                self.validate_utf8,
-            )?;
-            current_offset = end_offset;
-        }
+        // Reserve capacity for both offsets and values upfront
+        output.offsets.reserve(to_read);
+        output.values.reserve(total_bytes);
 
-        self.data_offset = current_offset;
+        // Delta length data is contiguous — copy all value bytes at once
+        let data_end = self.data_offset + total_bytes;
+        output
+            .values
+            .extend_from_slice(&self.data.as_ref()[self.data_offset..data_end]);
+
+        // Compute and extend offsets in batch using extend
+        let base_offset = initial_values_length;
+        let mut running = base_offset;
+        output.offsets.extend(src_lengths.iter().map(|length| {
+            running += *length as usize;
+            I::from_usize(running).expect("index overflow decoding byte array")
+        }));
+
+        self.data_offset = data_end;
         self.length_offset += to_read;
 
         if self.validate_utf8 {
@@ -623,7 +631,7 @@ mod tests {
             .unwrap();
 
         for (encoding, page) in pages {
-            let mut output = OffsetBuffer::<i32>::default();
+            let mut output = OffsetBuffer::<i32>::with_capacity(0);
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
 
             assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -678,7 +686,7 @@ mod tests {
             .unwrap();
 
         for (encoding, page) in pages {
-            let mut output = OffsetBuffer::<i32>::default();
+            let mut output = OffsetBuffer::<i32>::with_capacity(0);
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
 
             assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -722,7 +730,7 @@ mod tests {
 
         // test nulls read
         for (encoding, page) in pages.clone() {
-            let mut output = OffsetBuffer::<i32>::default();
+            let mut output = OffsetBuffer::<i32>::with_capacity(0);
             decoder.set_data(encoding, page, 4, None).unwrap();
             assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
         }
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 1f77b4bd2f..01dd9bcf8b 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -39,14 +39,14 @@ use crate::util::bit_util::FromBitpacked;
 /// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
 macro_rules! make_reader {
     (
-        ($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) {
+        ($pages:expr, $column_desc:expr, $data_type:expr, $batch_size:expr) => match ($k:expr, $v:expr) {
             $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
         }
     ) => {
         match (($k, $v)) {
             $(
                 ($key_arrow, $value_arrow) => {
-                    let reader = GenericRecordReader::new($column_desc);
+                    let reader = GenericRecordReader::new($column_desc, $batch_size);
                     Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
                         $pages, $data_type, reader,
                     )))
@@ -72,10 +72,13 @@ macro_rules! make_reader {
 /// It is therefore recommended that if `pages` contains data from multiple column chunks,
 /// that the read batch size used is a divisor of the row group size
 ///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
 pub fn make_byte_array_dictionary_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
     arrow_type: Option<ArrowType>,
+    batch_size: usize,
 ) -> Result<Box<dyn ArrayReader>> {
     // Check if Arrow type is specified, else create it from Parquet type
     let data_type = match arrow_type {
@@ -88,7 +91,7 @@ pub fn make_byte_array_dictionary_reader(
     match &data_type {
         ArrowType::Dictionary(key_type, value_type) => {
             make_reader! {
-                (pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
+                (pages, column_desc, data_type, batch_size) => match (key_type.as_ref(), value_type.as_ref()) {
                     (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32),
                     (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
                     (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32),
@@ -272,7 +275,7 @@ where
         }
 
         let len = num_values as usize;
-        let mut buffer = OffsetBuffer::<V>::default();
+        let mut buffer = OffsetBuffer::<V>::with_capacity(0);
         let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
         decoder.read(&mut buffer, usize::MAX)?;
 
@@ -425,7 +428,7 @@ mod tests {
             .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
             .unwrap();
 
-        let mut output = DictionaryBuffer::<i32, i32>::default();
+        let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
         assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);
 
         let mut valid = vec![false, false, true, true, false, true];
@@ -491,7 +494,7 @@ mod tests {
             .set_data(Encoding::RLE_DICTIONARY, encoded, 7, Some(data.len()))
             .unwrap();
 
-        let mut output = DictionaryBuffer::<i32, i32>::default();
+        let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
 
         // read two skip one
         assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
@@ -542,7 +545,7 @@ mod tests {
             .unwrap();
 
         // Read all pages into single buffer
-        let mut output = DictionaryBuffer::<i32, i32>::default();
+        let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
 
         for (encoding, page) in pages {
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
@@ -585,7 +588,7 @@ mod tests {
             .unwrap();
 
         // Read all pages into single buffer
-        let mut output = DictionaryBuffer::<i32, i32>::default();
+        let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
 
         for (encoding, page) in pages {
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
@@ -649,7 +652,7 @@ mod tests {
             .unwrap();
 
         for (encoding, page) in pages.clone() {
-            let mut output = DictionaryBuffer::<i32, i32>::default();
+            let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
             decoder.set_data(encoding, page, 8, None).unwrap();
             assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
 
@@ -664,7 +667,7 @@ mod tests {
         }
 
         for (encoding, page) in pages {
-            let mut output = DictionaryBuffer::<i32, i32>::default();
+            let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
             decoder.set_data(encoding, page, 8, None).unwrap();
             assert_eq!(decoder.skip_values(1024).unwrap(), 0);
 
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
index 1933654118..c134261609 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -36,10 +36,14 @@ use bytes::Bytes;
 use std::any::Any;
 
 /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
+///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
 pub fn make_byte_view_array_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
     arrow_type: Option<ArrowType>,
+    batch_size: usize,
 ) -> Result<Box<dyn ArrayReader>> {
     // Check if Arrow type is specified, else create it from Parquet type
     let data_type = match arrow_type {
@@ -52,7 +56,7 @@ pub fn make_byte_view_array_reader(
 
     match data_type {
         ArrowType::BinaryView | ArrowType::Utf8View => {
-            let reader = GenericRecordReader::new(column_desc);
+            let reader = GenericRecordReader::new(column_desc, batch_size);
             Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
         }
 
@@ -162,13 +166,10 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
             ));
         }
 
-        let mut buffer = ViewBuffer::default();
-        let mut decoder = ByteViewArrayDecoderPlain::new(
-            buf,
-            num_values as usize,
-            Some(num_values as usize),
-            self.validate_utf8,
-        );
+        let num_values = num_values as usize;
+        let mut buffer = ViewBuffer::with_capacity(num_values);
+        let mut decoder =
+            ByteViewArrayDecoderPlain::new(buf, num_values, Some(num_values), self.validate_utf8);
         decoder.read(&mut buffer, usize::MAX)?;
         self.dict = Some(buffer);
         Ok(())
@@ -674,13 +675,20 @@ impl ByteViewArrayDecoderDelta {
     // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
 
     fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
-        output.views.reserve(len.min(self.decoder.remaining()));
+        let to_reserve = len.min(self.decoder.remaining());
+        output.views.reserve(to_reserve);
 
         // array buffer only have long strings
         let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
 
         let buffer_id = output.buffers.len() as u32;
 
+        // Use unsafe ptr writes instead of per-element push to avoid
+        // repeated length checks. Safety: we reserved enough space above.
+        let views_ptr = output.views.as_mut_ptr();
+        let initial_len = output.views.len();
+        let mut write_count = 0;
+
         let read = if !self.validate_utf8 {
             self.decoder.read(len, |bytes| {
                 let offset = array_buffer.len();
@@ -690,18 +698,18 @@ impl ByteViewArrayDecoderDelta {
                     array_buffer.extend_from_slice(bytes);
                 }
 
-                // # Safety
-                // The buffer_id is the last buffer in the output buffers
-                // The offset is calculated from the buffer, so it is valid
+                // Safety: views_ptr is valid for writes, we reserved enough space,
+                // and write_count < to_reserve.
                 unsafe {
-                    output.append_raw_view_unchecked(view);
+                    views_ptr.add(initial_len + write_count).write(view);
                 }
+                write_count += 1;
                 Ok(())
             })?
         } else {
             // utf8 validation buffer has only short strings. These short
             // strings are inlined into the views but we copy them into a
-            // contiguous buffer to accelerate validation.®
+            // contiguous buffer to accelerate validation.
             let mut utf8_validation_buffer = Vec::with_capacity(4096);
 
             let v = self.decoder.read(len, |bytes| {
@@ -714,13 +722,12 @@ impl ByteViewArrayDecoderDelta {
                     utf8_validation_buffer.extend_from_slice(bytes);
                 }
 
-                // # Safety
-                // The buffer_id is the last buffer in the output buffers
-                // The offset is calculated from the buffer, so it is valid
-                // Utf-8 validation is done later
+                // Safety: views_ptr is valid for writes, we reserved enough space,
+                // and write_count < to_reserve. Utf-8 validation is done later.
                 unsafe {
-                    output.append_raw_view_unchecked(view);
+                    views_ptr.add(initial_len + write_count).write(view);
                 }
+                write_count += 1;
                 Ok(())
             })?;
             check_valid_utf8(&array_buffer)?;
@@ -728,6 +735,11 @@ impl ByteViewArrayDecoderDelta {
             v
         };
 
+        // Safety: we wrote exactly `read` views via ptr writes above
+        unsafe {
+            output.views.set_len(initial_len + read);
+        }
+
         let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
         assert_eq!(actual_block_id, buffer_id);
         Ok(read)
@@ -769,7 +781,7 @@ mod tests {
             .unwrap();
 
         for (encoding, page) in pages {
-            let mut output = ViewBuffer::default();
+            let mut output = ViewBuffer::with_capacity(0);
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
 
             assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -812,7 +824,7 @@ mod tests {
         let column_desc = utf8_column();
         let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);
 
-        let mut view_buffer = ViewBuffer::default();
+        let mut view_buffer = ViewBuffer::with_capacity(0);
         decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
         decoder.read(&mut view_buffer, 1).unwrap();
         decoder.read(&mut view_buffer, 1).unwrap();
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 2297926add..d562c88cb8 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -40,10 +40,14 @@ use std::ops::Range;
 use std::sync::Arc;
 
 /// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
+///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
 pub fn make_fixed_len_byte_array_reader(
     pages: Box<dyn PageIterator>,
     column_desc: ColumnDescPtr,
     arrow_type: Option<ArrowType>,
+    batch_size: usize,
 ) -> Result<Box<dyn ArrayReader>> {
     // Check if Arrow type is specified, else create it from Parquet type
     let data_type = match arrow_type {
@@ -126,6 +130,7 @@ pub fn make_fixed_len_byte_array_reader(
         column_desc,
         data_type,
         byte_length,
+        batch_size,
     )))
 }
 
@@ -144,14 +149,16 @@ impl FixedLenByteArrayReader {
         column_desc: ColumnDescPtr,
         data_type: ArrowType,
         byte_length: usize,
+        batch_size: usize,
     ) -> Self {
+        let record_reader = GenericRecordReader::new(column_desc, batch_size);
         Self {
             data_type,
             byte_length,
             pages,
             def_levels_buffer: None,
             rep_levels_buffer: None,
-            record_reader: GenericRecordReader::new(column_desc),
+            record_reader,
         }
     }
 }
@@ -284,6 +291,15 @@ fn move_values<F>(
 }
 
 impl ValuesBuffer for FixedLenByteArrayBuffer {
+    fn with_capacity(_capacity: usize) -> Self {
+        // byte_length is not known at trait level, so we return a default buffer
+        // The decoder will pre-allocate when it knows both capacity and byte_length
+        Self {
+            buffer: Vec::new(),
+            byte_length: None,
+        }
+    }
+
     fn pad_nulls(
         &mut self,
         read_offset: usize,
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 1d5c68c22e..a7adc01b91 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -249,6 +249,7 @@ mod tests {
     use crate::arrow::array_reader::ArrayReaderBuilder;
     use crate::arrow::array_reader::list_array::ListArrayReader;
     use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
     use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
     use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::{ArrowWriter, ProjectionMask, parquet_to_arrow_schema};
@@ -567,6 +568,7 @@ mod tests {
 
         let metrics = ArrowReaderMetrics::disabled();
         let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+            .with_batch_size(DEFAULT_BATCH_SIZE)
             .build_array_reader(fields.as_ref(), &mask)
             .unwrap();
 
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 4ddd1df864..97bc68d6d2 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -47,8 +47,14 @@ where
     T::T: ArrowNativeType,
 {
     /// Construct null array reader.
-    pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
-        let record_reader = RecordReader::<T>::new(column_desc);
+    ///
+    /// `batch_size` is used to pre-allocate internal buffers.
+    pub fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let record_reader = RecordReader::<T>::new(column_desc, batch_size);
 
         Ok(Self {
             data_type: ArrowType::Null,
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index e1c944f60c..71218a6282 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -104,10 +104,13 @@ where
     Vec<T::T>: IntoBuffer,
 {
     /// Construct primitive array reader.
+    ///
+    /// `batch_size` is used to pre-allocate internal buffers.
     pub fn new(
         pages: Box<dyn PageIterator>,
         column_desc: ColumnDescPtr,
         arrow_type: Option<ArrowType>,
+        batch_size: usize,
     ) -> Result<Self> {
         // Check if Arrow type is specified, else create it from Parquet type
         let data_type = match arrow_type {
@@ -117,7 +120,7 @@ where
                 .clone(),
         };
 
-        let record_reader = RecordReader::<T>::new(column_desc);
+        let record_reader = RecordReader::<T>::new(column_desc, batch_size);
 
         Ok(Self {
             data_type,
@@ -436,6 +439,7 @@ fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
 mod tests {
     use super::*;
     use crate::arrow::array_reader::test_util::EmptyPageIterator;
+    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
     use crate::basic::Encoding;
     use crate::column::page::Page;
     use crate::data_type::{Int32Type, Int64Type};
@@ -510,6 +514,7 @@ mod tests {
             Box::<EmptyPageIterator>::default(),
             schema.column(0),
             None,
+            DEFAULT_BATCH_SIZE,
         )
         .unwrap();
 
@@ -552,9 +557,13 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader =
-                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
 
             // Read first 50 values, which are all from the first column chunk
             let array = array_reader.next_batch(50).unwrap();
@@ -623,6 +632,7 @@ mod tests {
                     Box::new(page_iterator),
                     column_desc.clone(),
                     None,
+                    DEFAULT_BATCH_SIZE,
                 )
                 .expect("Unable to get array reader");
 
@@ -758,9 +768,13 @@ mod tests {
 
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader =
-                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
 
             let mut accu_len: usize = 0;
 
@@ -834,9 +848,13 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader =
-                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
 
             // read data from the reader
             // the data type is decimal(8,2)
@@ -893,9 +911,13 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader =
-                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
 
             // read data from the reader
             // the data type is decimal(18,4)
@@ -955,9 +977,13 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader =
-                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
-                    .unwrap();
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                DEFAULT_BATCH_SIZE,
+            )
+            .unwrap();
 
             // read data from the reader
             // the data type is date
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 1b02c4ae25..8a7e602618 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -57,6 +57,9 @@ mod read_plan;
 pub(crate) mod selection;
 pub mod statistics;
 
+/// Default batch size for reading parquet files
+pub const DEFAULT_BATCH_SIZE: usize = 1024;
+
 /// Builder for constructing Parquet readers that decode into [Apache Arrow]
 /// arrays.
 ///
@@ -168,7 +171,7 @@ impl<T> ArrowReaderBuilder<T> {
             metadata: metadata.metadata,
             schema: metadata.schema,
             fields: metadata.fields,
-            batch_size: 1024,
+            batch_size: DEFAULT_BATCH_SIZE,
             row_groups: None,
             projection: ProjectionMask::all(),
             filter: None,
@@ -196,7 +199,7 @@ impl<T> ArrowReaderBuilder<T> {
         &self.schema
     }
 
-    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
+    /// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`]
     /// If the batch_size more than the file row count, use the file row count.
     pub fn with_batch_size(self, batch_size: usize) -> Self {
         // Try to avoid allocate large buffer
@@ -1213,7 +1216,11 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
                     break;
                 }
 
+                let mut cache_projection = predicate.projection().clone();
+                cache_projection.intersect(&projection);
+
                 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+                    .with_batch_size(batch_size)
                     .with_parquet_metadata(&reader.metadata)
                     .build_array_reader(fields.as_deref(), predicate.projection())?;
 
@@ -1222,6 +1229,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
         }
 
         let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+            .with_batch_size(batch_size)
             .with_parquet_metadata(&reader.metadata)
             .build_array_reader(fields.as_deref(), &projection)?;
 
@@ -1529,6 +1537,7 @@ impl ParquetRecordBatchReader {
         // note metrics are not supported in this API
         let metrics = ArrowReaderMetrics::disabled();
         let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
+            .with_batch_size(batch_size)
             .with_parquet_metadata(row_groups.metadata())
             .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
 
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 71fb18917d..c3cd5744d2 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -38,14 +38,6 @@ pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
     Values { values: OffsetBuffer<V> },
 }
 
-impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K, V> {
-    fn default() -> Self {
-        Self::Values {
-            values: Default::default(),
-        }
-    }
-}
-
 impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
     #[allow(unused)]
     pub fn len(&self) -> usize {
@@ -103,7 +95,7 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
         match self {
             Self::Values { values } => Ok(values),
             Self::Dict { keys, values } => {
-                let mut spilled = OffsetBuffer::default();
+                let mut spilled = OffsetBuffer::with_capacity(0);
                 let data = values.to_data();
                 let dict_buffers = data.buffers();
                 let dict_offsets = dict_buffers[0].typed_data::<V>();
@@ -202,6 +194,12 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
 }
 
 impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
+    fn with_capacity(capacity: usize) -> Self {
+        Self::Values {
+            values: OffsetBuffer::with_capacity(capacity),
+        }
+    }
+
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -288,7 +286,7 @@ mod tests {
 
         let d1: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"]));
 
-        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
 
         // Read some data preserving the dictionary
         let values = &[1, 0, 3, 2, 4];
@@ -310,7 +308,7 @@ mod tests {
         buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
 
         assert_eq!(buffer.len(), 13);
-        let split = std::mem::take(&mut buffer);
+        let split = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0));
 
         let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
         assert_eq!(array.data_type(), &dict_type);
@@ -345,7 +343,7 @@ mod tests {
             .unwrap()
             .extend_from_slice(&[0, 1, 0, 1]);
 
-        let array = std::mem::take(&mut buffer)
+        let array = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0))
             .into_array(None, &dict_type)
             .unwrap();
         assert_eq!(array.data_type(), &dict_type);
@@ -373,7 +371,7 @@ mod tests {
         let dict_type =
             ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
 
-        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
         let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef;
         buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);
 
@@ -384,7 +382,7 @@ mod tests {
             err
         );
 
-        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
         let d = Arc::new(StringArray::from(vec![""])) as ArrayRef;
         buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);
 
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
index 209ed4e5c1..2d9d1c9b6b 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -32,18 +32,20 @@ pub struct OffsetBuffer<I: OffsetSizeTrait> {
     pub values: Vec<u8>,
 }
 
-impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
-    fn default() -> Self {
-        let mut offsets = Vec::new();
-        offsets.resize(1, I::default());
+impl<I: OffsetSizeTrait> OffsetBuffer<I> {
+    /// Create a new `OffsetBuffer` with capacity for at least `capacity` elements
+    ///
+    /// Pre-allocates the offsets vector to avoid reallocations during reading.
+    /// The values vector is not pre-allocated as its size is unpredictable.
+    pub fn with_capacity(capacity: usize) -> Self {
+        let mut offsets = Vec::with_capacity(capacity + 1);
+        offsets.push(I::default());
         Self {
             offsets,
             values: Vec::new(),
         }
     }
-}
 
-impl<I: OffsetSizeTrait> OffsetBuffer<I> {
     /// Returns the number of byte arrays in this buffer
     pub fn len(&self) -> usize {
         self.offsets.len() - 1
@@ -93,6 +95,8 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
         dict_offsets: &[V],
         dict_values: &[u8],
     ) -> Result<()> {
+        self.offsets.reserve(keys.len());
+
         for key in keys {
             let index = key.as_usize();
             if index + 1 >= dict_offsets.len() {
@@ -105,7 +109,11 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
             let end_offset = dict_offsets[index + 1].as_usize();
 
             // Dictionary values are verified when decoding dictionary page
-            self.try_push(&dict_values[start_offset..end_offset], false)?;
+            self.values
+                .extend_from_slice(&dict_values[start_offset..end_offset]);
+            let index_offset = I::from_usize(self.values.len())
+                .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
+            self.offsets.push(index_offset);
         }
         Ok(())
     }
@@ -139,6 +147,10 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
 }
 
 impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
+    fn with_capacity(capacity: usize) -> Self {
+        Self::with_capacity(capacity)
+    }
+
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -195,7 +207,7 @@ mod tests {
 
     #[test]
     fn test_offset_buffer_empty() {
-        let buffer = OffsetBuffer::<i32>::default();
+        let buffer = OffsetBuffer::<i32>::with_capacity(0);
         let array = buffer.into_array(None, ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(strings.len(), 0);
@@ -203,7 +215,7 @@ mod tests {
 
     #[test]
     fn test_offset_buffer_append() {
-        let mut buffer = OffsetBuffer::<i64>::default();
+        let mut buffer = OffsetBuffer::<i64>::with_capacity(0);
         buffer.try_push("hello".as_bytes(), true).unwrap();
         buffer.try_push("bar".as_bytes(), true).unwrap();
         buffer
@@ -220,11 +232,11 @@ mod tests {
 
     #[test]
     fn test_offset_buffer() {
-        let mut buffer = OffsetBuffer::<i32>::default();
+        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
         for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
             buffer.try_push(v.as_bytes(), false).unwrap()
         }
-        let split = std::mem::take(&mut buffer);
+        let split = std::mem::replace(&mut buffer, OffsetBuffer::with_capacity(0));
 
         let array = split.into_array(None, ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
@@ -244,7 +256,7 @@ mod tests {
 
     #[test]
     fn test_offset_buffer_pad_nulls() {
-        let mut buffer = OffsetBuffer::<i32>::default();
+        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
         let values = ["a", "b", "c", "def", "gh"];
         for v in &values {
             buffer.try_push(v.as_bytes(), false).unwrap()
@@ -287,7 +299,7 @@ mod tests {
         let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
         std::str::from_utf8(valid_4_byte_utf8).unwrap();
 
-        let mut buffer = OffsetBuffer::<i32>::default();
+        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
         buffer.try_push(valid_2_byte_utf8, true).unwrap();
         buffer.try_push(valid_3_byte_utf8, true).unwrap();
         buffer.try_push(valid_4_byte_utf8, true).unwrap();
@@ -320,7 +332,7 @@ mod tests {
 
     #[test]
     fn test_pad_nulls_empty() {
-        let mut buffer = OffsetBuffer::<i32>::default();
+        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
         let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
         buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());
 
diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs
index a93674663f..b1bdeb64e5 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -33,6 +33,14 @@ pub struct ViewBuffer {
 }
 
 impl ViewBuffer {
+    /// Create a new ViewBuffer with capacity for the specified number of views
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            views: Vec::with_capacity(capacity),
+            buffers: Vec::new(),
+        }
+    }
+
     pub fn is_empty(&self) -> bool {
         self.views.is_empty()
     }
@@ -43,15 +51,6 @@ impl ViewBuffer {
         block_id
     }
 
-    /// Directly append a view to the view array.
-    /// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray.
-    ///
-    /// # Safety
-    /// The `view` must be a valid view as per the ByteView spec.
-    pub unsafe fn append_raw_view_unchecked(&mut self, view: u128) {
-        self.views.push(view);
-    }
-
     /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
     pub fn into_array(self, null_buffer: Option<Buffer>, data_type: &ArrowType) -> ArrayRef {
         let len = self.views.len();
@@ -72,6 +71,10 @@ impl ViewBuffer {
 }
 
 impl ValuesBuffer for ViewBuffer {
+    fn with_capacity(capacity: usize) -> Self {
+        Self::with_capacity(capacity)
+    }
+
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -94,7 +97,7 @@ mod tests {
 
     #[test]
     fn test_view_buffer_empty() {
-        let buffer = ViewBuffer::default();
+        let buffer = ViewBuffer::with_capacity(0);
         let array = buffer.into_array(None, &ArrowType::Utf8View);
         let strings = array
             .as_any()
@@ -105,16 +108,14 @@ mod tests {
 
     #[test]
     fn test_view_buffer_append_view() {
-        let mut buffer = ViewBuffer::default();
+        let mut buffer = ViewBuffer::with_capacity(0);
         let data = b"0123456789long string to test string view";
         let string_buffer = Buffer::from(data);
         let block_id = buffer.append_block(string_buffer);
 
-        unsafe {
-            buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id, 0));
-            buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id, 1));
-            buffer.append_raw_view_unchecked(make_view(&data[10..41], block_id, 10));
-        }
+        buffer.views.push(make_view(&data[0..1], block_id, 0));
+        buffer.views.push(make_view(&data[1..10], block_id, 1));
+        buffer.views.push(make_view(&data[10..41], block_id, 10));
 
         let array = buffer.into_array(None, &ArrowType::Utf8View);
         let string_array = array
@@ -133,16 +134,14 @@ mod tests {
 
     #[test]
     fn test_view_buffer_pad_null() {
-        let mut buffer = ViewBuffer::default();
+        let mut buffer = ViewBuffer::with_capacity(0);
         let data = b"0123456789long string to test string view";
         let string_buffer = Buffer::from(data);
         let block_id = buffer.append_block(string_buffer);
 
-        unsafe {
-            buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id, 0));
-            buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id, 1));
-            buffer.append_raw_view_unchecked(make_view(&data[10..41], block_id, 10));
-        }
+        buffer.views.push(make_view(&data[0..1], block_id, 0));
+        buffer.views.push(make_view(&data[1..10], block_id, 1));
+        buffer.views.push(make_view(&data[10..41], block_id, 10));
 
         let valid = [true, false, false, true, false, false, true];
         let valid_mask = Buffer::from_iter(valid.iter().copied());
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 922d8070c0..a95be9d87d 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -438,6 +438,7 @@ impl RowGroupReaderBuilder {
                 let cache_options = filter_info.cache_builder().producer();
 
                 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
+                    .with_batch_size(self.batch_size)
                     .with_cache_options(Some(&cache_options))
                     .with_parquet_metadata(&self.metadata)
                     .build_array_reader(self.fields.as_deref(), predicate.projection())?;
@@ -614,6 +615,7 @@ impl RowGroupReaderBuilder {
 
                 // if we have any cached results, connect them up
                 let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
+                    .with_batch_size(self.batch_size)
                     .with_parquet_metadata(&self.metadata);
                 let array_reader = if let Some(cache_info) = cache_info.as_ref() {
                     let cache_options: CacheOptions = cache_info.builder().consumer();
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 880407a547..6e0855dda3 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -18,7 +18,13 @@
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
 
 /// A buffer that supports padding with nulls
-pub trait ValuesBuffer: Default {
+pub trait ValuesBuffer {
+    /// Create a new buffer with capacity for at least `capacity` elements
+    ///
+    /// This allows pre-allocating buffers to avoid reallocations during reading,
+    /// improving performance when the number of values is known in advance.
+    fn with_capacity(capacity: usize) -> Self;
+
     /// If a column contains nulls, more level data may be read than value data, as null
     /// values are not encoded. Therefore, first the levels data is read, the null count
     /// determined, and then the corresponding number of values read to a [`ValuesBuffer`].
@@ -43,6 +49,10 @@ pub trait ValuesBuffer: Default {
 }
 
 impl<T: Copy + Default> ValuesBuffer for Vec<T> {
+    fn with_capacity(capacity: usize) -> Self {
+        Vec::with_capacity(capacity)
+    }
+
     fn pad_nulls(
         &mut self,
         read_offset: usize,
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 2092b4972d..a33b489c62 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -50,7 +50,9 @@ pub(crate) type ColumnReader<CV> =
 pub struct GenericRecordReader<V, CV> {
     column_desc: ColumnDescPtr,
 
-    values: V,
+    /// Values buffer, lazily initialized on first read to avoid
+    /// allocating a buffer that may never be used (e.g., after the last batch)
+    values: Option<V>,
     def_levels: Option<DefinitionLevelBuffer>,
     rep_levels: Option<Vec<i16>>,
     column_reader: Option<ColumnReader<CV>>,
@@ -58,6 +60,8 @@ pub struct GenericRecordReader<V, CV> {
     num_values: usize,
     /// Number of buffered records
     num_records: usize,
+    /// Capacity hint for pre-allocating buffers based on batch size
+    capacity_hint: usize,
 }
 
 impl<V, CV> GenericRecordReader<V, CV>
@@ -66,20 +70,25 @@ where
     CV: ColumnValueDecoder<Buffer = V>,
 {
     /// Create a new [`GenericRecordReader`]
-    pub fn new(desc: ColumnDescPtr) -> Self {
+    ///
+    /// The capacity is used to pre-allocate internal buffers, avoiding reallocations
+    /// when reading the first batch of data. For optimal performance, set this to
+    /// the expected batch size.
+    pub fn new(desc: ColumnDescPtr, capacity: usize) -> Self {
         let def_levels = (desc.max_def_level() > 0)
             .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
 
         let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
 
         Self {
-            values: V::default(),
+            values: None, // Lazily initialized on first read
             def_levels,
             rep_levels,
             column_reader: None,
             column_desc: desc,
             num_values: 0,
             num_records: 0,
+            capacity_hint: capacity,
         }
     }
 
@@ -169,7 +178,9 @@ where
     /// Returns currently stored buffer data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_record_data(&mut self) -> V {
-        std::mem::take(&mut self.values)
+        // Take the buffer, leaving None. The next read will lazily allocate a new buffer.
+        // This avoids allocating a buffer that may never be used (e.g., after the last batch).
+        self.values.take().unwrap_or_else(|| V::with_capacity(0))
     }
 
     /// Returns currently stored null bitmap data for nullable columns.
@@ -208,12 +219,23 @@ where
 
     /// Try to read one batch of data returning the number of records read
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
+        // Update capacity hint to the largest batch size seen
+        if batch_size > self.capacity_hint {
+            self.capacity_hint = batch_size;
+        }
+
+        // Lazily initialize buffer on first read
+        let capacity_hint = self.capacity_hint;
+        let values = self
+            .values
+            .get_or_insert_with(|| V::with_capacity(capacity_hint));
+
         let (records_read, values_read, levels_read) =
             self.column_reader.as_mut().unwrap().read_records(
                 batch_size,
                 self.def_levels.as_mut(),
                 self.rep_levels.as_mut(),
-                &mut self.values,
+                values,
             )?;
 
         if values_read < levels_read {
@@ -221,7 +243,7 @@ where
                 general_err!("Definition levels should exist when data is less than levels!")
             })?;
 
-            self.values.pad_nulls(
+            values.pad_nulls(
                 self.num_values,
                 values_read,
                 levels_read,
@@ -248,6 +270,7 @@ mod tests {
 
     use arrow::buffer::Buffer;
 
+    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
     use crate::basic::Encoding;
     use crate::data_type::Int32Type;
     use crate::schema::parser::parse_message_type;
@@ -272,7 +295,7 @@ mod tests {
             .unwrap();
 
         // Construct record reader
-        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
 
         // First page
 
@@ -345,7 +368,7 @@ mod tests {
             .unwrap();
 
         // Construct record reader
-        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
 
         // First page
 
@@ -447,7 +470,7 @@ mod tests {
             .unwrap();
 
         // Construct record reader
-        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
 
         // First page
 
@@ -550,7 +573,7 @@ mod tests {
             .unwrap();
 
         // Construct record reader
-        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
 
         {
             let values = [100; 5000];
@@ -600,7 +623,7 @@ mod tests {
         pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
         let page = pb.consume();
 
-        let mut record_reader = RecordReader::<Int32Type>::new(desc);
+        let mut record_reader = RecordReader::<Int32Type>::new(desc, DEFAULT_BATCH_SIZE);
         let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()]));
         record_reader.set_page_reader(page_reader).unwrap();
         assert_eq!(record_reader.read_records(4).unwrap(), 4);
@@ -639,7 +662,7 @@ mod tests {
             .unwrap();
 
         // Construct record reader
-        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
 
         // First page
 
@@ -713,7 +736,7 @@ mod tests {
             .unwrap();
 
         // Construct record reader
-        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
 
         // First page
 
