This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 51b02f18e8 [Parquet] perf: preallocate capacity for ArrayReaderBuilder
(#9093)
51b02f18e8 is described below
commit 51b02f18e8325b279ee9d7032b294956d49b60c5
Author: Lanqing Yang <[email protected]>
AuthorDate: Thu Apr 16 23:50:40 2026 -0700
[Parquet] perf: preallocate capacity for ArrayReaderBuilder (#9093)
# Which issue does this PR close?
- Closes #9059.
# Rationale for this change
Reduce the allocation cost mentioned in
https://github.com/apache/arrow-rs/issues/9059. From experiment:
pre-allocation overhead may offset the savings from avoiding incremental
growth.
# What changes are included in this PR?
- Add a `with_capacity` method to the `ValuesBuffer` trait, and remove the
defaults to enforce that a capacity hint is required for `ArrayReaderBuilder`.
- The capacity hint will be passed down to GenericRecordReader to
preallocate the buffer.
# Are there any user-facing changes?
Yes. ArrayReaders need an extra capacity parameter to indicate the
preferred batch size, and buffers will be provisioned with this capacity.
---------
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/benches/arrow_reader.rs | 92 +++++++++++++++++-----
parquet/src/arrow/array_reader/builder.rs | 58 +++++++++++---
parquet/src/arrow/array_reader/byte_array.rs | 48 ++++++-----
.../arrow/array_reader/byte_array_dictionary.rs | 23 +++---
parquet/src/arrow/array_reader/byte_view_array.rs | 54 ++++++++-----
.../src/arrow/array_reader/fixed_len_byte_array.rs | 18 ++++-
parquet/src/arrow/array_reader/list_array.rs | 2 +
parquet/src/arrow/array_reader/null_array.rs | 10 ++-
parquet/src/arrow/array_reader/primitive_array.rs | 58 ++++++++++----
parquet/src/arrow/arrow_reader/mod.rs | 13 ++-
parquet/src/arrow/buffer/dictionary_buffer.rs | 26 +++---
parquet/src/arrow/buffer/offset_buffer.rs | 40 ++++++----
parquet/src/arrow/buffer/view_buffer.rs | 43 +++++-----
.../src/arrow/push_decoder/reader_builder/mod.rs | 2 +
parquet/src/arrow/record_reader/buffer.rs | 12 ++-
parquet/src/arrow/record_reader/mod.rs | 49 +++++++++---
16 files changed, 381 insertions(+), 167 deletions(-)
diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 14fa16b353..9325a1faf5 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{
ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
make_fixed_len_byte_array_reader,
};
+use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
use parquet::basic::Type;
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl,
InMemoryPageIterator};
@@ -709,15 +710,23 @@ fn create_primitive_array_reader(
use parquet::arrow::array_reader::PrimitiveArrayReader;
match column_desc.physical_type() {
Type::INT32 => {
- let reader =
-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let reader = PrimitiveArrayReader::<Int32Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
Box::new(reader)
}
Type::INT64 => {
- let reader =
-
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let reader = PrimitiveArrayReader::<Int64Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
Box::new(reader)
}
_ => unreachable!(),
@@ -730,9 +739,13 @@ fn create_f16_by_bytes_reader(
) -> Box<dyn ArrayReader> {
let physical_type = column_desc.physical_type();
match physical_type {
- Type::FIXED_LEN_BYTE_ARRAY => {
- make_fixed_len_byte_array_reader(Box::new(page_iterator),
column_desc, None).unwrap()
- }
+ Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap(),
_ => unimplemented!(),
}
}
@@ -743,12 +756,20 @@ fn create_decimal_by_bytes_reader(
) -> Box<dyn ArrayReader> {
let physical_type = column_desc.physical_type();
match physical_type {
- Type::BYTE_ARRAY => {
- make_byte_array_reader(Box::new(page_iterator), column_desc,
None).unwrap()
- }
- Type::FIXED_LEN_BYTE_ARRAY => {
- make_fixed_len_byte_array_reader(Box::new(page_iterator),
column_desc, None).unwrap()
- }
+ Type::BYTE_ARRAY => make_byte_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap(),
+ Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap(),
_ => unimplemented!(),
}
}
@@ -757,28 +778,52 @@ fn create_fixed_len_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
- make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc,
None).unwrap()
+ make_fixed_len_byte_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap()
}
fn create_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
- make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+ make_byte_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap()
}
fn create_byte_view_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
- make_byte_view_array_reader(Box::new(page_iterator), column_desc,
None).unwrap()
+ make_byte_view_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap()
}
fn create_string_view_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
- make_byte_view_array_reader(Box::new(page_iterator), column_desc,
None).unwrap()
+ make_byte_view_array_reader(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap()
}
fn create_string_byte_array_dictionary_reader(
@@ -788,8 +833,13 @@ fn create_string_byte_array_dictionary_reader(
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8));
- make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc,
Some(arrow_type))
- .unwrap()
+ make_byte_array_dictionary_reader(
+ Box::new(page_iterator),
+ column_desc,
+ Some(arrow_type),
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap()
}
fn create_string_list_reader(
diff --git a/parquet/src/arrow/array_reader/builder.rs
b/parquet/src/arrow/array_reader/builder.rs
index 818e06e8b8..d806b2147a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -33,6 +33,7 @@ use crate::arrow::array_reader::{
NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader,
make_byte_array_dictionary_reader, make_byte_array_reader,
};
+use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
use crate::basic::Type as PhysicalType;
@@ -96,18 +97,30 @@ pub struct ArrayReaderBuilder<'a> {
parquet_metadata: Option<&'a ParquetMetaData>,
/// metrics
metrics: &'a ArrowReaderMetrics,
+ /// Batch size for pre-allocating internal buffers
+ batch_size: usize,
}
impl<'a> ArrayReaderBuilder<'a> {
+ /// Create a new `ArrayReaderBuilder`
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics)
-> Self {
Self {
row_groups,
cache_options: None,
parquet_metadata: None,
metrics,
+ batch_size: DEFAULT_BATCH_SIZE,
}
}
+ /// Set the batch size used to pre-allocate internal buffers.
+ ///
+ /// This avoids reallocations when reading the first batch of data.
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+ self.batch_size = batch_size;
+ self
+ }
+
/// Add cache options to the builder
pub fn with_cache_options(mut self, cache_options: Option<&'a
CacheOptions<'a>>) -> Self {
self.cache_options = cache_options;
@@ -414,18 +427,21 @@ impl<'a> ArrayReaderBuilder<'a> {
page_iterator,
column_desc,
arrow_type,
+ self.batch_size,
)?) as _,
PhysicalType::INT32 => {
if let Some(DataType::Null) = arrow_type {
Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
+ self.batch_size,
)?) as _
} else {
Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
+ self.batch_size,
)?) as _
}
}
@@ -433,36 +449,56 @@ impl<'a> ArrayReaderBuilder<'a> {
page_iterator,
column_desc,
arrow_type,
+ self.batch_size,
)?) as _,
PhysicalType::INT96 =>
Box::new(PrimitiveArrayReader::<Int96Type>::new(
page_iterator,
column_desc,
arrow_type,
+ self.batch_size,
)?) as _,
PhysicalType::FLOAT =>
Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
+ self.batch_size,
)?) as _,
PhysicalType::DOUBLE =>
Box::new(PrimitiveArrayReader::<DoubleType>::new(
page_iterator,
column_desc,
arrow_type,
+ self.batch_size,
)?) as _,
PhysicalType::BYTE_ARRAY => match arrow_type {
- Some(DataType::Dictionary(_, _)) => {
- make_byte_array_dictionary_reader(page_iterator,
column_desc, arrow_type)?
+ Some(DataType::Dictionary(_, _)) =>
make_byte_array_dictionary_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ self.batch_size,
+ )?,
+ Some(DataType::Utf8View | DataType::BinaryView) =>
make_byte_view_array_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ self.batch_size,
+ )?,
+ _ => {
+ make_byte_array_reader(page_iterator, column_desc,
arrow_type, self.batch_size)?
}
- Some(DataType::Utf8View | DataType::BinaryView) => {
- make_byte_view_array_reader(page_iterator, column_desc,
arrow_type)?
- }
- _ => make_byte_array_reader(page_iterator, column_desc,
arrow_type)?,
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
- Some(DataType::Dictionary(_, _)) => {
- make_byte_array_dictionary_reader(page_iterator,
column_desc, arrow_type)?
- }
- _ => make_fixed_len_byte_array_reader(page_iterator,
column_desc, arrow_type)?,
+ Some(DataType::Dictionary(_, _)) =>
make_byte_array_dictionary_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ self.batch_size,
+ )?,
+ _ => make_fixed_len_byte_array_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ self.batch_size,
+ )?,
},
};
Ok(Some(reader))
@@ -533,6 +569,7 @@ mod tests {
let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+ .with_batch_size(DEFAULT_BATCH_SIZE)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();
@@ -566,6 +603,7 @@ mod tests {
let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+ .with_batch_size(DEFAULT_BATCH_SIZE)
.with_parquet_metadata(file_reader.metadata())
.build_array_reader(fields.as_ref(), &mask)
.unwrap();
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index 2d0d44fbe2..cf40d0576d 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -38,10 +38,14 @@ use std::any::Any;
use std::sync::Arc;
/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
pub fn make_byte_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
+ batch_size: usize,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -56,13 +60,13 @@ pub fn make_byte_array_reader(
| ArrowType::Utf8
| ArrowType::Decimal128(_, _)
| ArrowType::Decimal256(_, _) => {
- let reader = GenericRecordReader::new(column_desc);
+ let reader = GenericRecordReader::new(column_desc, batch_size);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
)))
}
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
- let reader = GenericRecordReader::new(column_desc);
+ let reader = GenericRecordReader::new(column_desc, batch_size);
Ok(Box::new(ByteArrayReader::<i64>::new(
pages, data_type, reader,
)))
@@ -202,7 +206,7 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for
ByteArrayColumnValueDecoder<I> {
));
}
- let mut buffer = OffsetBuffer::default();
+ let mut buffer = OffsetBuffer::with_capacity(0);
let mut decoder = ByteArrayDecoderPlain::new(
buf,
num_values as usize,
@@ -481,24 +485,28 @@ impl ByteArrayDecoderDeltaLength {
let initial_values_length = output.values.len();
let to_read = len.min(self.lengths.len() - self.length_offset);
- output.offsets.reserve(to_read);
-
let src_lengths = &self.lengths[self.length_offset..self.length_offset
+ to_read];
-
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
- output.values.reserve(total_bytes);
- let mut current_offset = self.data_offset;
- for length in src_lengths {
- let end_offset = current_offset + *length as usize;
- output.try_push(
- &self.data.as_ref()[current_offset..end_offset],
- self.validate_utf8,
- )?;
- current_offset = end_offset;
- }
+ // Reserve capacity for both offsets and values upfront
+ output.offsets.reserve(to_read);
+ output.values.reserve(total_bytes);
- self.data_offset = current_offset;
+ // Delta length data is contiguous — copy all value bytes at once
+ let data_end = self.data_offset + total_bytes;
+ output
+ .values
+
.extend_from_slice(&self.data.as_ref()[self.data_offset..data_end]);
+
+ // Compute and extend offsets in batch using extend
+ let base_offset = initial_values_length;
+ let mut running = base_offset;
+ output.offsets.extend(src_lengths.iter().map(|length| {
+ running += *length as usize;
+ I::from_usize(running).expect("index overflow decoding byte array")
+ }));
+
+ self.data_offset = data_end;
self.length_offset += to_read;
if self.validate_utf8 {
@@ -623,7 +631,7 @@ mod tests {
.unwrap();
for (encoding, page) in pages {
- let mut output = OffsetBuffer::<i32>::default();
+ let mut output = OffsetBuffer::<i32>::with_capacity(0);
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -678,7 +686,7 @@ mod tests {
.unwrap();
for (encoding, page) in pages {
- let mut output = OffsetBuffer::<i32>::default();
+ let mut output = OffsetBuffer::<i32>::with_capacity(0);
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -722,7 +730,7 @@ mod tests {
// test nulls read
for (encoding, page) in pages.clone() {
- let mut output = OffsetBuffer::<i32>::default();
+ let mut output = OffsetBuffer::<i32>::with_capacity(0);
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
}
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 1f77b4bd2f..01dd9bcf8b 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -39,14 +39,14 @@ use crate::util::bit_util::FromBitpacked;
/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
macro_rules! make_reader {
(
- ($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr,
$v:expr) {
+ ($pages:expr, $column_desc:expr, $data_type:expr, $batch_size:expr) =>
match ($k:expr, $v:expr) {
$(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty,
$value_type:ty),)+
}
) => {
match (($k, $v)) {
$(
($key_arrow, $value_arrow) => {
- let reader = GenericRecordReader::new($column_desc);
+ let reader = GenericRecordReader::new($column_desc,
$batch_size);
Ok(Box::new(ByteArrayDictionaryReader::<$key_type,
$value_type>::new(
$pages, $data_type, reader,
)))
@@ -72,10 +72,13 @@ macro_rules! make_reader {
/// It is therefore recommended that if `pages` contains data from multiple
column chunks,
/// that the read batch size used is a divisor of the row group size
///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
pub fn make_byte_array_dictionary_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
+ batch_size: usize,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -88,7 +91,7 @@ pub fn make_byte_array_dictionary_reader(
match &data_type {
ArrowType::Dictionary(key_type, value_type) => {
make_reader! {
- (pages, column_desc, data_type) => match (key_type.as_ref(),
value_type.as_ref()) {
+ (pages, column_desc, data_type, batch_size) => match
(key_type.as_ref(), value_type.as_ref()) {
(ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 |
ArrowType::FixedSizeBinary(_)) => (u8, i32),
(ArrowType::UInt8, ArrowType::LargeBinary |
ArrowType::LargeUtf8) => (u8, i64),
(ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 |
ArrowType::FixedSizeBinary(_)) => (i8, i32),
@@ -272,7 +275,7 @@ where
}
let len = num_values as usize;
- let mut buffer = OffsetBuffer::<V>::default();
+ let mut buffer = OffsetBuffer::<V>::with_capacity(0);
let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len),
self.validate_utf8);
decoder.read(&mut buffer, usize::MAX)?;
@@ -425,7 +428,7 @@ mod tests {
.set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
.unwrap();
- let mut output = DictionaryBuffer::<i32, i32>::default();
+ let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);
let mut valid = vec![false, false, true, true, false, true];
@@ -491,7 +494,7 @@ mod tests {
.set_data(Encoding::RLE_DICTIONARY, encoded, 7, Some(data.len()))
.unwrap();
- let mut output = DictionaryBuffer::<i32, i32>::default();
+ let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
// read two skip one
assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
@@ -542,7 +545,7 @@ mod tests {
.unwrap();
// Read all pages into single buffer
- let mut output = DictionaryBuffer::<i32, i32>::default();
+ let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
@@ -585,7 +588,7 @@ mod tests {
.unwrap();
// Read all pages into single buffer
- let mut output = DictionaryBuffer::<i32, i32>::default();
+ let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
@@ -649,7 +652,7 @@ mod tests {
.unwrap();
for (encoding, page) in pages.clone() {
- let mut output = DictionaryBuffer::<i32, i32>::default();
+ let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
@@ -664,7 +667,7 @@ mod tests {
}
for (encoding, page) in pages {
- let mut output = DictionaryBuffer::<i32, i32>::default();
+ let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.skip_values(1024).unwrap(), 0);
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs
b/parquet/src/arrow/array_reader/byte_view_array.rs
index 1933654118..c134261609 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -36,10 +36,14 @@ use bytes::Bytes;
use std::any::Any;
/// Returns an [`ArrayReader`] that decodes the provided byte array column to
view types.
+///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
pub fn make_byte_view_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
+ batch_size: usize,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -52,7 +56,7 @@ pub fn make_byte_view_array_reader(
match data_type {
ArrowType::BinaryView | ArrowType::Utf8View => {
- let reader = GenericRecordReader::new(column_desc);
+ let reader = GenericRecordReader::new(column_desc, batch_size);
Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
}
@@ -162,13 +166,10 @@ impl ColumnValueDecoder for
ByteViewArrayColumnValueDecoder {
));
}
- let mut buffer = ViewBuffer::default();
- let mut decoder = ByteViewArrayDecoderPlain::new(
- buf,
- num_values as usize,
- Some(num_values as usize),
- self.validate_utf8,
- );
+ let num_values = num_values as usize;
+ let mut buffer = ViewBuffer::with_capacity(num_values);
+ let mut decoder =
+ ByteViewArrayDecoderPlain::new(buf, num_values, Some(num_values),
self.validate_utf8);
decoder.read(&mut buffer, usize::MAX)?;
self.dict = Some(buffer);
Ok(())
@@ -674,13 +675,20 @@ impl ByteViewArrayDecoderDelta {
//
<https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
- output.views.reserve(len.min(self.decoder.remaining()));
+ let to_reserve = len.min(self.decoder.remaining());
+ output.views.reserve(to_reserve);
// array buffer only have long strings
let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
let buffer_id = output.buffers.len() as u32;
+ // Use unsafe ptr writes instead of per-element push to avoid
+ // repeated length checks. Safety: we reserved enough space above.
+ let views_ptr = output.views.as_mut_ptr();
+ let initial_len = output.views.len();
+ let mut write_count = 0;
+
let read = if !self.validate_utf8 {
self.decoder.read(len, |bytes| {
let offset = array_buffer.len();
@@ -690,18 +698,18 @@ impl ByteViewArrayDecoderDelta {
array_buffer.extend_from_slice(bytes);
}
- // # Safety
- // The buffer_id is the last buffer in the output buffers
- // The offset is calculated from the buffer, so it is valid
+ // Safety: views_ptr is valid for writes, we reserved enough
space,
+ // and write_count < to_reserve.
unsafe {
- output.append_raw_view_unchecked(view);
+ views_ptr.add(initial_len + write_count).write(view);
}
+ write_count += 1;
Ok(())
})?
} else {
// utf8 validation buffer has only short strings. These short
// strings are inlined into the views but we copy them into a
- // contiguous buffer to accelerate validation.®
+ // contiguous buffer to accelerate validation.
let mut utf8_validation_buffer = Vec::with_capacity(4096);
let v = self.decoder.read(len, |bytes| {
@@ -714,13 +722,12 @@ impl ByteViewArrayDecoderDelta {
utf8_validation_buffer.extend_from_slice(bytes);
}
- // # Safety
- // The buffer_id is the last buffer in the output buffers
- // The offset is calculated from the buffer, so it is valid
- // Utf-8 validation is done later
+ // Safety: views_ptr is valid for writes, we reserved enough
space,
+ // and write_count < to_reserve. Utf-8 validation is done
later.
unsafe {
- output.append_raw_view_unchecked(view);
+ views_ptr.add(initial_len + write_count).write(view);
}
+ write_count += 1;
Ok(())
})?;
check_valid_utf8(&array_buffer)?;
@@ -728,6 +735,11 @@ impl ByteViewArrayDecoderDelta {
v
};
+ // Safety: we wrote exactly `read` views via ptr writes above
+ unsafe {
+ output.views.set_len(initial_len + read);
+ }
+
let actual_block_id =
output.append_block(Buffer::from_vec(array_buffer));
assert_eq!(actual_block_id, buffer_id);
Ok(read)
@@ -769,7 +781,7 @@ mod tests {
.unwrap();
for (encoding, page) in pages {
- let mut output = ViewBuffer::default();
+ let mut output = ViewBuffer::with_capacity(0);
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -812,7 +824,7 @@ mod tests {
let column_desc = utf8_column();
let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);
- let mut view_buffer = ViewBuffer::default();
+ let mut view_buffer = ViewBuffer::with_capacity(0);
decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
decoder.read(&mut view_buffer, 1).unwrap();
decoder.read(&mut view_buffer, 1).unwrap();
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 2297926add..d562c88cb8 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -40,10 +40,14 @@ use std::ops::Range;
use std::sync::Arc;
/// Returns an [`ArrayReader`] that decodes the provided fixed length byte
array column
+///
+/// `batch_size` is used to pre-allocate internal buffers,
+/// avoiding reallocations when reading the first batch of data.
pub fn make_fixed_len_byte_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
+ batch_size: usize,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -126,6 +130,7 @@ pub fn make_fixed_len_byte_array_reader(
column_desc,
data_type,
byte_length,
+ batch_size,
)))
}
@@ -144,14 +149,16 @@ impl FixedLenByteArrayReader {
column_desc: ColumnDescPtr,
data_type: ArrowType,
byte_length: usize,
+ batch_size: usize,
) -> Self {
+ let record_reader = GenericRecordReader::new(column_desc, batch_size);
Self {
data_type,
byte_length,
pages,
def_levels_buffer: None,
rep_levels_buffer: None,
- record_reader: GenericRecordReader::new(column_desc),
+ record_reader,
}
}
}
@@ -284,6 +291,15 @@ fn move_values<F>(
}
impl ValuesBuffer for FixedLenByteArrayBuffer {
+ fn with_capacity(_capacity: usize) -> Self {
+ // byte_length is not known at trait level, so we return a default
buffer
+ // The decoder will pre-allocate when it knows both capacity and
byte_length
+ Self {
+ buffer: Vec::new(),
+ byte_length: None,
+ }
+ }
+
fn pad_nulls(
&mut self,
read_offset: usize,
diff --git a/parquet/src/arrow/array_reader/list_array.rs
b/parquet/src/arrow/array_reader/list_array.rs
index 1d5c68c22e..a7adc01b91 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -249,6 +249,7 @@ mod tests {
use crate::arrow::array_reader::ArrayReaderBuilder;
use crate::arrow::array_reader::list_array::ListArrayReader;
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+ use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::{ArrowWriter, ProjectionMask, parquet_to_arrow_schema};
@@ -567,6 +568,7 @@ mod tests {
let metrics = ArrowReaderMetrics::disabled();
let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+ .with_batch_size(DEFAULT_BATCH_SIZE)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();
diff --git a/parquet/src/arrow/array_reader/null_array.rs
b/parquet/src/arrow/array_reader/null_array.rs
index 4ddd1df864..97bc68d6d2 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -47,8 +47,14 @@ where
T::T: ArrowNativeType,
{
/// Construct null array reader.
- pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) ->
Result<Self> {
- let record_reader = RecordReader::<T>::new(column_desc);
+ ///
+ /// `batch_size` is used to pre-allocate internal buffers.
+ pub fn new(
+ pages: Box<dyn PageIterator>,
+ column_desc: ColumnDescPtr,
+ batch_size: usize,
+ ) -> Result<Self> {
+ let record_reader = RecordReader::<T>::new(column_desc, batch_size);
Ok(Self {
data_type: ArrowType::Null,
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index e1c944f60c..71218a6282 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -104,10 +104,13 @@ where
Vec<T::T>: IntoBuffer,
{
/// Construct primitive array reader.
+ ///
+ /// `batch_size` is used to pre-allocate internal buffers.
pub fn new(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
+ batch_size: usize,
) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
@@ -117,7 +120,7 @@ where
.clone(),
};
- let record_reader = RecordReader::<T>::new(column_desc);
+ let record_reader = RecordReader::<T>::new(column_desc, batch_size);
Ok(Self {
data_type,
@@ -436,6 +439,7 @@ fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V:
ArrowPrimitiveType>(
mod tests {
use super::*;
use crate::arrow::array_reader::test_util::EmptyPageIterator;
+ use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
use crate::basic::Encoding;
use crate::column::page::Page;
use crate::data_type::{Int32Type, Int64Type};
@@ -510,6 +514,7 @@ mod tests {
Box::<EmptyPageIterator>::default(),
schema.column(0),
None,
+ DEFAULT_BATCH_SIZE,
)
.unwrap();
@@ -552,9 +557,13 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader =
-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
// Read first 50 values, which are all from the first column chunk
let array = array_reader.next_batch(50).unwrap();
@@ -623,6 +632,7 @@ mod tests {
Box::new(page_iterator),
column_desc.clone(),
None,
+ DEFAULT_BATCH_SIZE,
)
.expect("Unable to get array reader");
@@ -758,9 +768,13 @@ mod tests {
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader =
-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
let mut accu_len: usize = 0;
@@ -834,9 +848,13 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader =
-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
// read data from the reader
// the data type is decimal(8,2)
@@ -893,9 +911,13 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader =
-
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
// read data from the reader
// the data type is decimal(18,4)
@@ -955,9 +977,13 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader =
-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
- .unwrap();
+ let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+ Box::new(page_iterator),
+ column_desc,
+ None,
+ DEFAULT_BATCH_SIZE,
+ )
+ .unwrap();
// read data from the reader
// the data type is date
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 1b02c4ae25..8a7e602618 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -57,6 +57,9 @@ mod read_plan;
pub(crate) mod selection;
pub mod statistics;
+/// Default batch size for reading parquet files
+pub const DEFAULT_BATCH_SIZE: usize = 1024;
+
/// Builder for constructing Parquet readers that decode into [Apache Arrow]
/// arrays.
///
@@ -168,7 +171,7 @@ impl<T> ArrowReaderBuilder<T> {
metadata: metadata.metadata,
schema: metadata.schema,
fields: metadata.fields,
- batch_size: 1024,
+ batch_size: DEFAULT_BATCH_SIZE,
row_groups: None,
projection: ProjectionMask::all(),
filter: None,
@@ -196,7 +199,7 @@ impl<T> ArrowReaderBuilder<T> {
&self.schema
}
- /// Set the size of [`RecordBatch`] to produce. Defaults to 1024
+ /// Set the size of [`RecordBatch`] to produce. Defaults to
[`DEFAULT_BATCH_SIZE`]
/// If the batch_size more than the file row count, use the file row count.
pub fn with_batch_size(self, batch_size: usize) -> Self {
// Try to avoid allocate large buffer
@@ -1213,7 +1216,11 @@ impl<T: ChunkReader + 'static>
ParquetRecordBatchReaderBuilder<T> {
break;
}
+ let mut cache_projection = predicate.projection().clone();
+ cache_projection.intersect(&projection);
+
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+ .with_batch_size(batch_size)
.with_parquet_metadata(&reader.metadata)
.build_array_reader(fields.as_deref(),
predicate.projection())?;
@@ -1222,6 +1229,7 @@ impl<T: ChunkReader + 'static>
ParquetRecordBatchReaderBuilder<T> {
}
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+ .with_batch_size(batch_size)
.with_parquet_metadata(&reader.metadata)
.build_array_reader(fields.as_deref(), &projection)?;
@@ -1529,6 +1537,7 @@ impl ParquetRecordBatchReader {
// note metrics are not supported in this API
let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
+ .with_batch_size(batch_size)
.with_parquet_metadata(row_groups.metadata())
.build_array_reader(levels.levels.as_ref(),
&ProjectionMask::all())?;
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs
b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 71fb18917d..c3cd5744d2 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -38,14 +38,6 @@ pub enum DictionaryBuffer<K: ArrowNativeType, V:
OffsetSizeTrait> {
Values { values: OffsetBuffer<V> },
}
-impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K,
V> {
- fn default() -> Self {
- Self::Values {
- values: Default::default(),
- }
- }
-}
-
impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
#[allow(unused)]
pub fn len(&self) -> usize {
@@ -103,7 +95,7 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait>
DictionaryBuffer<K, V> {
match self {
Self::Values { values } => Ok(values),
Self::Dict { keys, values } => {
- let mut spilled = OffsetBuffer::default();
+ let mut spilled = OffsetBuffer::with_capacity(0);
let data = values.to_data();
let dict_buffers = data.buffers();
let dict_offsets = dict_buffers[0].typed_data::<V>();
@@ -202,6 +194,12 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait>
DictionaryBuffer<K, V> {
}
impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for
DictionaryBuffer<K, V> {
+ fn with_capacity(capacity: usize) -> Self {
+ Self::Values {
+ values: OffsetBuffer::with_capacity(capacity),
+ }
+ }
+
fn pad_nulls(
&mut self,
read_offset: usize,
@@ -288,7 +286,7 @@ mod tests {
let d1: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world",
"", "a", "b"]));
- let mut buffer = DictionaryBuffer::<i32, i32>::default();
+ let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
// Read some data preserving the dictionary
let values = &[1, 0, 3, 2, 4];
@@ -310,7 +308,7 @@ mod tests {
buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
assert_eq!(buffer.len(), 13);
- let split = std::mem::take(&mut buffer);
+ let split = std::mem::replace(&mut buffer,
DictionaryBuffer::with_capacity(0));
let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
assert_eq!(array.data_type(), &dict_type);
@@ -345,7 +343,7 @@ mod tests {
.unwrap()
.extend_from_slice(&[0, 1, 0, 1]);
- let array = std::mem::take(&mut buffer)
+ let array = std::mem::replace(&mut buffer,
DictionaryBuffer::with_capacity(0))
.into_array(None, &dict_type)
.unwrap();
assert_eq!(array.data_type(), &dict_type);
@@ -373,7 +371,7 @@ mod tests {
let dict_type =
ArrowType::Dictionary(Box::new(ArrowType::Int32),
Box::new(ArrowType::Utf8));
- let mut buffer = DictionaryBuffer::<i32, i32>::default();
+ let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef;
buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);
@@ -384,7 +382,7 @@ mod tests {
err
);
- let mut buffer = DictionaryBuffer::<i32, i32>::default();
+ let mut buffer = DictionaryBuffer::<i32, i32>::with_capacity(0);
let d = Arc::new(StringArray::from(vec![""])) as ArrayRef;
buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs
b/parquet/src/arrow/buffer/offset_buffer.rs
index 209ed4e5c1..2d9d1c9b6b 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -32,18 +32,20 @@ pub struct OffsetBuffer<I: OffsetSizeTrait> {
pub values: Vec<u8>,
}
-impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
- fn default() -> Self {
- let mut offsets = Vec::new();
- offsets.resize(1, I::default());
+impl<I: OffsetSizeTrait> OffsetBuffer<I> {
+ /// Create a new `OffsetBuffer` with capacity for at least `capacity`
elements
+ ///
+ /// Pre-allocates the offsets vector to avoid reallocations during reading.
+ /// The values vector is not pre-allocated as its size is unpredictable.
+ pub fn with_capacity(capacity: usize) -> Self {
+ let mut offsets = Vec::with_capacity(capacity + 1);
+ offsets.push(I::default());
Self {
offsets,
values: Vec::new(),
}
}
-}
-impl<I: OffsetSizeTrait> OffsetBuffer<I> {
/// Returns the number of byte arrays in this buffer
pub fn len(&self) -> usize {
self.offsets.len() - 1
@@ -93,6 +95,8 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
dict_offsets: &[V],
dict_values: &[u8],
) -> Result<()> {
+ self.offsets.reserve(keys.len());
+
for key in keys {
let index = key.as_usize();
if index + 1 >= dict_offsets.len() {
@@ -105,7 +109,11 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
let end_offset = dict_offsets[index + 1].as_usize();
// Dictionary values are verified when decoding dictionary page
- self.try_push(&dict_values[start_offset..end_offset], false)?;
+ self.values
+ .extend_from_slice(&dict_values[start_offset..end_offset]);
+ let index_offset = I::from_usize(self.values.len())
+ .ok_or_else(|| general_err!("index overflow decoding byte
array"))?;
+ self.offsets.push(index_offset);
}
Ok(())
}
@@ -139,6 +147,10 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
}
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
+ fn with_capacity(capacity: usize) -> Self {
+ Self::with_capacity(capacity)
+ }
+
fn pad_nulls(
&mut self,
read_offset: usize,
@@ -195,7 +207,7 @@ mod tests {
#[test]
fn test_offset_buffer_empty() {
- let buffer = OffsetBuffer::<i32>::default();
+ let buffer = OffsetBuffer::<i32>::with_capacity(0);
let array = buffer.into_array(None, ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(strings.len(), 0);
@@ -203,7 +215,7 @@ mod tests {
#[test]
fn test_offset_buffer_append() {
- let mut buffer = OffsetBuffer::<i64>::default();
+ let mut buffer = OffsetBuffer::<i64>::with_capacity(0);
buffer.try_push("hello".as_bytes(), true).unwrap();
buffer.try_push("bar".as_bytes(), true).unwrap();
buffer
@@ -220,11 +232,11 @@ mod tests {
#[test]
fn test_offset_buffer() {
- let mut buffer = OffsetBuffer::<i32>::default();
+ let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
buffer.try_push(v.as_bytes(), false).unwrap()
}
- let split = std::mem::take(&mut buffer);
+ let split = std::mem::replace(&mut buffer,
OffsetBuffer::with_capacity(0));
let array = split.into_array(None, ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
@@ -244,7 +256,7 @@ mod tests {
#[test]
fn test_offset_buffer_pad_nulls() {
- let mut buffer = OffsetBuffer::<i32>::default();
+ let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
let values = ["a", "b", "c", "def", "gh"];
for v in &values {
buffer.try_push(v.as_bytes(), false).unwrap()
@@ -287,7 +299,7 @@ mod tests {
let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001,
0b10100101];
std::str::from_utf8(valid_4_byte_utf8).unwrap();
- let mut buffer = OffsetBuffer::<i32>::default();
+ let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
buffer.try_push(valid_2_byte_utf8, true).unwrap();
buffer.try_push(valid_3_byte_utf8, true).unwrap();
buffer.try_push(valid_4_byte_utf8, true).unwrap();
@@ -320,7 +332,7 @@ mod tests {
#[test]
fn test_pad_nulls_empty() {
- let mut buffer = OffsetBuffer::<i32>::default();
+ let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());
diff --git a/parquet/src/arrow/buffer/view_buffer.rs
b/parquet/src/arrow/buffer/view_buffer.rs
index a93674663f..b1bdeb64e5 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -33,6 +33,14 @@ pub struct ViewBuffer {
}
impl ViewBuffer {
+ /// Create a new ViewBuffer with capacity for the specified number of views
+ pub fn with_capacity(capacity: usize) -> Self {
+ Self {
+ views: Vec::with_capacity(capacity),
+ buffers: Vec::new(),
+ }
+ }
+
pub fn is_empty(&self) -> bool {
self.views.is_empty()
}
@@ -43,15 +51,6 @@ impl ViewBuffer {
block_id
}
- /// Directly append a view to the view array.
- /// This is used when we create a StringViewArray from a dictionary whose
values are StringViewArray.
- ///
- /// # Safety
- /// The `view` must be a valid view as per the ByteView spec.
- pub unsafe fn append_raw_view_unchecked(&mut self, view: u128) {
- self.views.push(view);
- }
-
/// Converts this into an [`ArrayRef`] with the provided `data_type` and
`null_buffer`
pub fn into_array(self, null_buffer: Option<Buffer>, data_type:
&ArrowType) -> ArrayRef {
let len = self.views.len();
@@ -72,6 +71,10 @@ impl ViewBuffer {
}
impl ValuesBuffer for ViewBuffer {
+ fn with_capacity(capacity: usize) -> Self {
+ Self::with_capacity(capacity)
+ }
+
fn pad_nulls(
&mut self,
read_offset: usize,
@@ -94,7 +97,7 @@ mod tests {
#[test]
fn test_view_buffer_empty() {
- let buffer = ViewBuffer::default();
+ let buffer = ViewBuffer::with_capacity(0);
let array = buffer.into_array(None, &ArrowType::Utf8View);
let strings = array
.as_any()
@@ -105,16 +108,14 @@ mod tests {
#[test]
fn test_view_buffer_append_view() {
- let mut buffer = ViewBuffer::default();
+ let mut buffer = ViewBuffer::with_capacity(0);
let data = b"0123456789long string to test string view";
let string_buffer = Buffer::from(data);
let block_id = buffer.append_block(string_buffer);
- unsafe {
- buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id,
0));
- buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id,
1));
- buffer.append_raw_view_unchecked(make_view(&data[10..41],
block_id, 10));
- }
+ buffer.views.push(make_view(&data[0..1], block_id, 0));
+ buffer.views.push(make_view(&data[1..10], block_id, 1));
+ buffer.views.push(make_view(&data[10..41], block_id, 10));
let array = buffer.into_array(None, &ArrowType::Utf8View);
let string_array = array
@@ -133,16 +134,14 @@ mod tests {
#[test]
fn test_view_buffer_pad_null() {
- let mut buffer = ViewBuffer::default();
+ let mut buffer = ViewBuffer::with_capacity(0);
let data = b"0123456789long string to test string view";
let string_buffer = Buffer::from(data);
let block_id = buffer.append_block(string_buffer);
- unsafe {
- buffer.append_raw_view_unchecked(make_view(&data[0..1], block_id,
0));
- buffer.append_raw_view_unchecked(make_view(&data[1..10], block_id,
1));
- buffer.append_raw_view_unchecked(make_view(&data[10..41],
block_id, 10));
- }
+ buffer.views.push(make_view(&data[0..1], block_id, 0));
+ buffer.views.push(make_view(&data[1..10], block_id, 1));
+ buffer.views.push(make_view(&data[10..41], block_id, 10));
let valid = [true, false, false, true, false, false, true];
let valid_mask = Buffer::from_iter(valid.iter().copied());
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 922d8070c0..a95be9d87d 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -438,6 +438,7 @@ impl RowGroupReaderBuilder {
let cache_options = filter_info.cache_builder().producer();
let array_reader = ArrayReaderBuilder::new(&row_group,
&self.metrics)
+ .with_batch_size(self.batch_size)
.with_cache_options(Some(&cache_options))
.with_parquet_metadata(&self.metadata)
.build_array_reader(self.fields.as_deref(),
predicate.projection())?;
@@ -614,6 +615,7 @@ impl RowGroupReaderBuilder {
// if we have any cached results, connect them up
let array_reader_builder = ArrayReaderBuilder::new(&row_group,
&self.metrics)
+ .with_batch_size(self.batch_size)
.with_parquet_metadata(&self.metadata);
let array_reader = if let Some(cache_info) =
cache_info.as_ref() {
let cache_options: CacheOptions =
cache_info.builder().consumer();
diff --git a/parquet/src/arrow/record_reader/buffer.rs
b/parquet/src/arrow/record_reader/buffer.rs
index 880407a547..6e0855dda3 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -18,7 +18,13 @@
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
/// A buffer that supports padding with nulls
-pub trait ValuesBuffer: Default {
+pub trait ValuesBuffer {
+ /// Create a new buffer with capacity for at least `capacity` elements
+ ///
+ /// This allows pre-allocating buffers to avoid reallocations during
reading,
+ /// improving performance when the number of values is known in advance.
+ fn with_capacity(capacity: usize) -> Self;
+
/// If a column contains nulls, more level data may be read than value
data, as null
/// values are not encoded. Therefore, first the levels data is read, the
null count
/// determined, and then the corresponding number of values read to a
[`ValuesBuffer`].
@@ -43,6 +49,10 @@ pub trait ValuesBuffer: Default {
}
impl<T: Copy + Default> ValuesBuffer for Vec<T> {
+ fn with_capacity(capacity: usize) -> Self {
+ Vec::with_capacity(capacity)
+ }
+
fn pad_nulls(
&mut self,
read_offset: usize,
diff --git a/parquet/src/arrow/record_reader/mod.rs
b/parquet/src/arrow/record_reader/mod.rs
index 2092b4972d..a33b489c62 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -50,7 +50,9 @@ pub(crate) type ColumnReader<CV> =
pub struct GenericRecordReader<V, CV> {
column_desc: ColumnDescPtr,
- values: V,
+ /// Values buffer, lazily initialized on first read to avoid
+ /// allocating a buffer that may never be used (e.g., after the last batch)
+ values: Option<V>,
def_levels: Option<DefinitionLevelBuffer>,
rep_levels: Option<Vec<i16>>,
column_reader: Option<ColumnReader<CV>>,
@@ -58,6 +60,8 @@ pub struct GenericRecordReader<V, CV> {
num_values: usize,
/// Number of buffered records
num_records: usize,
+ /// Capacity hint for pre-allocating buffers based on batch size
+ capacity_hint: usize,
}
impl<V, CV> GenericRecordReader<V, CV>
@@ -66,20 +70,25 @@ where
CV: ColumnValueDecoder<Buffer = V>,
{
/// Create a new [`GenericRecordReader`]
- pub fn new(desc: ColumnDescPtr) -> Self {
+ ///
+ /// The capacity is used to pre-allocate internal buffers, avoiding
reallocations
+ /// when reading the first batch of data. For optimal performance, set
this to
+ /// the expected batch size.
+ pub fn new(desc: ColumnDescPtr, capacity: usize) -> Self {
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc,
packed_null_mask(&desc)));
let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
Self {
- values: V::default(),
+ values: None, // Lazily initialized on first read
def_levels,
rep_levels,
column_reader: None,
column_desc: desc,
num_values: 0,
num_records: 0,
+ capacity_hint: capacity,
}
}
@@ -169,7 +178,9 @@ where
/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> V {
- std::mem::take(&mut self.values)
+ // Take the buffer, leaving None. The next read will lazily allocate a
new buffer.
+ // This avoids allocating a buffer that may never be used (e.g., after
the last batch).
+ self.values.take().unwrap_or_else(|| V::with_capacity(0))
}
/// Returns currently stored null bitmap data for nullable columns.
@@ -208,12 +219,23 @@ where
/// Try to read one batch of data returning the number of records read
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
+ // Update capacity hint to the largest batch size seen
+ if batch_size > self.capacity_hint {
+ self.capacity_hint = batch_size;
+ }
+
+ // Lazily initialize buffer on first read
+ let capacity_hint = self.capacity_hint;
+ let values = self
+ .values
+ .get_or_insert_with(|| V::with_capacity(capacity_hint));
+
let (records_read, values_read, levels_read) =
self.column_reader.as_mut().unwrap().read_records(
batch_size,
self.def_levels.as_mut(),
self.rep_levels.as_mut(),
- &mut self.values,
+ values,
)?;
if values_read < levels_read {
@@ -221,7 +243,7 @@ where
general_err!("Definition levels should exist when data is less
than levels!")
})?;
- self.values.pad_nulls(
+ values.pad_nulls(
self.num_values,
values_read,
levels_read,
@@ -248,6 +270,7 @@ mod tests {
use arrow::buffer::Buffer;
+ use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
use crate::basic::Encoding;
use crate::data_type::Int32Type;
use crate::schema::parser::parse_message_type;
@@ -272,7 +295,7 @@ mod tests {
.unwrap();
// Construct record reader
- let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(),
DEFAULT_BATCH_SIZE);
// First page
@@ -345,7 +368,7 @@ mod tests {
.unwrap();
// Construct record reader
- let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(),
DEFAULT_BATCH_SIZE);
// First page
@@ -447,7 +470,7 @@ mod tests {
.unwrap();
// Construct record reader
- let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(),
DEFAULT_BATCH_SIZE);
// First page
@@ -550,7 +573,7 @@ mod tests {
.unwrap();
// Construct record reader
- let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(),
DEFAULT_BATCH_SIZE);
{
let values = [100; 5000];
@@ -600,7 +623,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
- let mut record_reader = RecordReader::<Int32Type>::new(desc);
+ let mut record_reader = RecordReader::<Int32Type>::new(desc,
DEFAULT_BATCH_SIZE);
let page_reader =
Box::new(InMemoryPageReader::new(vec![page.clone()]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(record_reader.read_records(4).unwrap(), 4);
@@ -639,7 +662,7 @@ mod tests {
.unwrap();
// Construct record reader
- let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(),
DEFAULT_BATCH_SIZE);
// First page
@@ -713,7 +736,7 @@ mod tests {
.unwrap();
// Construct record reader
- let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(),
DEFAULT_BATCH_SIZE);
// First page