This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ea19ce86d6 Separate metadata fetch from `ArrowReaderBuilder`
construction (#4674) (#4676)
ea19ce86d6 is described below
commit ea19ce86d6c12f837eecc901cc66f7dd96883d48
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 10 17:40:49 2023 +0100
Separate metadata fetch from `ArrowReaderBuilder` construction (#4674)
(#4676)
* Separate metadata fetch from builder construction (#4674)
* Clippy
* Docs tweaks
* Wrap ParquetField in Arc
* Move load to ArrowReaderMetadata
---
parquet/src/arrow/array_reader/mod.rs | 43 ------
parquet/src/arrow/arrow_reader/mod.rs | 262 ++++++++++++++++++++++++++++------
parquet/src/arrow/async_reader/mod.rs | 67 ++++++---
parquet/src/file/metadata.rs | 4 +-
parquet/src/file/serialized_reader.rs | 5 -
5 files changed, 265 insertions(+), 116 deletions(-)
diff --git a/parquet/src/arrow/array_reader/mod.rs
b/parquet/src/arrow/array_reader/mod.rs
index 1e781fb73c..625ac034ef 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -118,49 +118,6 @@ impl RowGroups for Arc<dyn FileReader> {
}
}
-pub(crate) struct FileReaderRowGroups {
- /// The underling file reader
- reader: Arc<dyn FileReader>,
- /// Optional list of row group indices to scan
- row_groups: Option<Vec<usize>>,
-}
-
-impl FileReaderRowGroups {
- /// Creates a new [`RowGroups`] from a `FileReader` and an optional
- /// list of row group indexes to scan
- pub fn new(reader: Arc<dyn FileReader>, row_groups: Option<Vec<usize>>) ->
Self {
- Self { reader, row_groups }
- }
-}
-
-impl RowGroups for FileReaderRowGroups {
- fn num_rows(&self) -> usize {
- match &self.row_groups {
- None => self.reader.metadata().file_metadata().num_rows() as usize,
- Some(row_groups) => {
- let meta = self.reader.metadata().row_groups();
- row_groups
- .iter()
- .map(|x| meta[*x].num_rows() as usize)
- .sum()
- }
- }
- }
-
- fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
- let iterator = match &self.row_groups {
- Some(row_groups) => FilePageIterator::with_row_groups(
- i,
- Box::new(row_groups.clone().into_iter()),
- Arc::clone(&self.reader),
- )?,
- None => FilePageIterator::new(i, Arc::clone(&self.reader))?,
- };
-
- Ok(Box::new(iterator))
- }
-}
-
/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than `batch_size` if
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 988738dac6..7e4423b864 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -26,19 +26,21 @@ use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
-use crate::arrow::array_reader::{build_array_reader, ArrayReader,
FileReaderRowGroups};
+use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{FieldLevels, ProjectionMask};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
-use crate::file::reader::{ChunkReader, SerializedFileReader};
-use crate::file::serialized_reader::ReadOptionsBuilder;
+use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
mod filter;
mod selection;
pub use crate::arrow::array_reader::RowGroups;
+use crate::column::page::{PageIterator, PageReader};
+use crate::file::footer;
+use crate::file::page_index::index_reader;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
@@ -57,7 +59,7 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) schema: SchemaRef,
- pub(crate) fields: Option<ParquetField>,
+ pub(crate) fields: Option<Arc<ParquetField>>,
pub(crate) batch_size: usize,
@@ -75,27 +77,12 @@ pub struct ArrowReaderBuilder<T> {
}
impl<T> ArrowReaderBuilder<T> {
- pub(crate) fn new_builder(
- input: T,
- metadata: Arc<ParquetMetaData>,
- options: ArrowReaderOptions,
- ) -> Result<Self> {
- let kv_metadata = match options.skip_arrow_metadata {
- true => None,
- false => metadata.file_metadata().key_value_metadata(),
- };
-
- let (schema, fields) = parquet_to_arrow_schema_and_fields(
- metadata.file_metadata().schema_descr(),
- ProjectionMask::all(),
- kv_metadata,
- )?;
-
- Ok(Self {
+ pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self
{
+ Self {
input,
- metadata,
- schema: Arc::new(schema),
- fields,
+ metadata: metadata.metadata,
+ schema: metadata.schema,
+ fields: metadata.fields,
batch_size: 1024,
row_groups: None,
projection: ProjectionMask::all(),
@@ -103,7 +90,7 @@ impl<T> ArrowReaderBuilder<T> {
selection: None,
limit: None,
offset: None,
- })
+ }
}
/// Returns a reference to the [`ParquetMetaData`] for this parquet file
@@ -234,48 +221,184 @@ impl ArrowReaderOptions {
}
}
+/// The cheaply clone-able metadata necessary to construct an
[`ArrowReaderBuilder`]
+///
+/// This allows loading the metadata for a file once and then using this to
construct
+/// multiple separate readers, for example, to distribute readers across
multiple threads
+#[derive(Debug, Clone)]
+pub struct ArrowReaderMetadata {
+ pub(crate) metadata: Arc<ParquetMetaData>,
+
+ pub(crate) schema: SchemaRef,
+
+ pub(crate) fields: Option<Arc<ParquetField>>,
+}
+
+impl ArrowReaderMetadata {
+ /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`]
+ ///
+ /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how
this can be used
+ pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) ->
Result<Self> {
+ let mut metadata = footer::parse_metadata(reader)?;
+ if options.page_index {
+ let column_index = metadata
+ .row_groups()
+ .iter()
+ .map(|rg| index_reader::read_columns_indexes(reader,
rg.columns()))
+ .collect::<Result<Vec<_>>>()?;
+ metadata.set_column_index(Some(column_index));
+
+ let offset_index = metadata
+ .row_groups()
+ .iter()
+ .map(|rg| index_reader::read_pages_locations(reader,
rg.columns()))
+ .collect::<Result<Vec<_>>>()?;
+
+ metadata.set_offset_index(Some(offset_index))
+ }
+ Self::try_new(Arc::new(metadata), options)
+ }
+
+ pub(crate) fn try_new(
+ metadata: Arc<ParquetMetaData>,
+ options: ArrowReaderOptions,
+ ) -> Result<Self> {
+ let kv_metadata = match options.skip_arrow_metadata {
+ true => None,
+ false => metadata.file_metadata().key_value_metadata(),
+ };
+
+ let (schema, fields) = parquet_to_arrow_schema_and_fields(
+ metadata.file_metadata().schema_descr(),
+ ProjectionMask::all(),
+ kv_metadata,
+ )?;
+
+ Ok(Self {
+ metadata,
+ schema: Arc::new(schema),
+ fields: fields.map(Arc::new),
+ })
+ }
+
+ /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+ pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+ &self.metadata
+ }
+
+ /// Returns the parquet [`SchemaDescriptor`] for this parquet file
+ pub fn parquet_schema(&self) -> &SchemaDescriptor {
+ self.metadata.file_metadata().schema_descr()
+ }
+
+ /// Returns the arrow [`SchemaRef`] for this parquet file
+ pub fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+}
+
#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers
from async
-pub struct SyncReader<T: ChunkReader>(SerializedFileReader<T>);
+pub struct SyncReader<T: ChunkReader>(T);
/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a
file
///
/// For an async API see
[`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
pub type ParquetRecordBatchReaderBuilder<T> =
ArrowReaderBuilder<SyncReader<T>>;
-impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
+impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
/// Create a new [`ParquetRecordBatchReaderBuilder`]
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use bytes::Bytes;
+ /// # use arrow_array::{Int32Array, RecordBatch};
+ /// # use arrow_schema::{DataType, Field, Schema};
+ /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder};
+ /// # use parquet::arrow::ArrowWriter;
+ /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
+ /// # let schema = Arc::new(Schema::new(vec![Field::new("i32",
DataType::Int32, false)]));
+ /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(),
None).unwrap();
+ /// # let batch = RecordBatch::try_new(schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+ /// # writer.write(&batch).unwrap();
+ /// # writer.close().unwrap();
+ /// # let file = Bytes::from(file);
+ /// #
+ /// let mut builder =
ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ ///
+ /// // Inspect metadata
+ /// assert_eq!(builder.metadata().num_row_groups(), 1);
+ ///
+ /// // Construct reader
+ /// let mut reader: ParquetRecordBatchReader =
builder.with_row_groups(vec![0]).build().unwrap();
+ ///
+ /// // Read data
+ /// let _batch = reader.next().unwrap().unwrap();
+ /// ```
pub fn try_new(reader: T) -> Result<Self> {
Self::try_new_with_options(reader, Default::default())
}
/// Create a new [`ParquetRecordBatchReaderBuilder`] with
[`ArrowReaderOptions`]
pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) ->
Result<Self> {
- let reader = match options.page_index {
- true => {
- let read_options =
ReadOptionsBuilder::new().with_page_index().build();
- SerializedFileReader::new_with_options(reader, read_options)?
- }
- false => SerializedFileReader::new(reader)?,
- };
+ let metadata = ArrowReaderMetadata::load(&reader, options)?;
+ Ok(Self::new_with_metadata(reader, metadata))
+ }
- let metadata = Arc::clone(reader.metadata_ref());
- Self::new_builder(SyncReader(reader), metadata, options)
+ /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided
[`ArrowReaderMetadata`]
+ ///
+ /// This allows loading metadata once and using it to create multiple
builders with
+ /// potentially different settings
+ ///
+ /// ```
+ /// # use std::fs::metadata;
+ /// # use std::sync::Arc;
+ /// # use bytes::Bytes;
+ /// # use arrow_array::{Int32Array, RecordBatch};
+ /// # use arrow_schema::{DataType, Field, Schema};
+ /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata,
ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+ /// # use parquet::arrow::ArrowWriter;
+ /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
+ /// # let schema = Arc::new(Schema::new(vec![Field::new("i32",
DataType::Int32, false)]));
+ /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(),
None).unwrap();
+ /// # let batch = RecordBatch::try_new(schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+ /// # writer.write(&batch).unwrap();
+ /// # writer.close().unwrap();
+ /// # let file = Bytes::from(file);
+ /// #
+ /// let metadata = ArrowReaderMetadata::load(&file,
Default::default()).unwrap();
+ /// let mut a =
ParquetRecordBatchReaderBuilder::new_with_metadata(file.clone(),
metadata.clone()).build().unwrap();
+ /// let mut b = ParquetRecordBatchReaderBuilder::new_with_metadata(file,
metadata).build().unwrap();
+ ///
+ /// // Should be able to read from both in parallel
+ /// assert_eq!(a.next().unwrap().unwrap(), b.next().unwrap().unwrap());
+ /// ```
+ pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
+ Self::new_builder(SyncReader(input), metadata)
}
/// Build a [`ParquetRecordBatchReader`]
///
/// Note: this will eagerly evaluate any `RowFilter` before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
- let reader = FileReaderRowGroups::new(Arc::new(self.input.0),
self.row_groups);
-
- let mut filter = self.filter;
- let mut selection = self.selection;
-
// Try to avoid allocate large buffer
let batch_size = self
.batch_size
.min(self.metadata.file_metadata().num_rows() as usize);
+
+ let row_groups = self
+ .row_groups
+ .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
+
+ let reader = ReaderRowGroups {
+ reader: Arc::new(self.input.0),
+ metadata: self.metadata,
+ row_groups,
+ };
+
+ let mut filter = self.filter;
+ let mut selection = self.selection;
+
if let Some(filter) = filter.as_mut() {
for predicate in filter.predicates.iter_mut() {
if !selects_any(selection.as_ref()) {
@@ -283,7 +406,7 @@ impl<T: ChunkReader + 'static>
ArrowReaderBuilder<SyncReader<T>> {
}
let array_reader = build_array_reader(
- self.fields.as_ref(),
+ self.fields.as_deref(),
predicate.projection(),
&reader,
)?;
@@ -298,7 +421,7 @@ impl<T: ChunkReader + 'static>
ArrowReaderBuilder<SyncReader<T>> {
}
let array_reader =
- build_array_reader(self.fields.as_ref(), &self.projection,
&reader)?;
+ build_array_reader(self.fields.as_deref(), &self.projection,
&reader)?;
// If selection is empty, truncate
if !selects_any(selection.as_ref()) {
@@ -313,6 +436,59 @@ impl<T: ChunkReader + 'static>
ArrowReaderBuilder<SyncReader<T>> {
}
}
+struct ReaderRowGroups<T: ChunkReader> {
+ reader: Arc<T>,
+
+ metadata: Arc<ParquetMetaData>,
+ /// List of row group indices to scan
+ row_groups: Vec<usize>,
+}
+
+impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
+ fn num_rows(&self) -> usize {
+ let meta = self.metadata.row_groups();
+ self.row_groups
+ .iter()
+ .map(|x| meta[*x].num_rows() as usize)
+ .sum()
+ }
+
+ fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+ Ok(Box::new(ReaderPageIterator {
+ column_idx: i,
+ reader: self.reader.clone(),
+ metadata: self.metadata.clone(),
+ row_groups: self.row_groups.clone().into_iter(),
+ }))
+ }
+}
+
+struct ReaderPageIterator<T: ChunkReader> {
+ reader: Arc<T>,
+ column_idx: usize,
+ row_groups: std::vec::IntoIter<usize>,
+ metadata: Arc<ParquetMetaData>,
+}
+
+impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
+ type Item = Result<Box<dyn PageReader>>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let rg_idx = self.row_groups.next()?;
+ let rg = self.metadata.row_group(rg_idx);
+ let meta = rg.column(self.column_idx);
+ let offset_index = self.metadata.offset_index();
+ let page_locations = offset_index.map(|i|
i[rg_idx][self.column_idx].clone());
+ let total_rows = rg.num_rows() as usize;
+ let reader = self.reader.clone();
+
+ let ret = SerializedPageReader::new(reader, meta, total_rows,
page_locations);
+ Some(ret.map(|x| Box::new(x) as _))
+ }
+}
+
+impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
+
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index f17fb0751d..c7e0f64783 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -96,8 +96,9 @@ use arrow_schema::SchemaRef;
use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
- apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder,
ArrowReaderOptions,
- ParquetRecordBatchReader, RowFilter, RowSelection,
+ apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder,
+ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
RowFilter,
+ RowSelection,
};
use crate::arrow::ProjectionMask;
@@ -205,6 +206,29 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send>
AsyncFileReader for T {
}
}
+impl ArrowReaderMetadata {
+ /// Loads [`ArrowReaderMetadata`] from the provided [`AsyncFileReader`]
+ ///
+ /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how
this can be used
+ pub async fn load_async<T: AsyncFileReader>(
+ input: &mut T,
+ options: ArrowReaderOptions,
+ ) -> Result<Self> {
+ let mut metadata = input.get_metadata().await?;
+
+ if options.page_index
+ && metadata.column_index().is_none()
+ && metadata.offset_index().is_none()
+ {
+ let m = Arc::try_unwrap(metadata).unwrap_or_else(|e|
e.as_ref().clone());
+ let mut loader = MetadataLoader::new(input, m);
+ loader.load_page_index(true, true).await?;
+ metadata = Arc::new(loader.finish())
+ }
+ Self::try_new(metadata, options)
+ }
+}
+
#[doc(hidden)]
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers
from async
///
@@ -217,33 +241,30 @@ pub struct AsyncReader<T>(T);
/// In particular, this handles reading the parquet file metadata, allowing
consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
-///
pub type ParquetRecordBatchStreamBuilder<T> =
ArrowReaderBuilder<AsyncReader<T>>;
-impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
+impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided
parquet file
- pub async fn new(mut input: T) -> Result<Self> {
- let metadata = input.get_metadata().await?;
- Self::new_builder(AsyncReader(input), metadata, Default::default())
+ pub async fn new(input: T) -> Result<Self> {
+ Self::new_with_options(input, Default::default()).await
}
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided
parquet file
+ /// and [`ArrowReaderOptions`]
pub async fn new_with_options(
mut input: T,
options: ArrowReaderOptions,
) -> Result<Self> {
- let mut metadata = input.get_metadata().await?;
-
- if options.page_index
- && metadata.column_index().is_none()
- && metadata.offset_index().is_none()
- {
- let m = Arc::try_unwrap(metadata).unwrap_or_else(|e|
e.as_ref().clone());
- let mut loader = MetadataLoader::new(&mut input, m);
- loader.load_page_index(true, true).await?;
- metadata = Arc::new(loader.finish())
- }
+ let metadata = ArrowReaderMetadata::load_async(&mut input,
options).await?;
+ Ok(Self::new_with_metadata(input, metadata))
+ }
- Self::new_builder(AsyncReader(input), metadata, options)
+ /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided
[`ArrowReaderMetadata`]
+ ///
+ /// This allows loading metadata once and using it to create multiple
builders with
+ /// potentially different settings
+ pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
+ Self::new_builder(AsyncReader(input), metadata)
}
/// Build a new [`ParquetRecordBatchStream`]
@@ -297,7 +318,7 @@ type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)
struct ReaderFactory<T> {
metadata: Arc<ParquetMetaData>,
- fields: Option<ParquetField>,
+ fields: Option<Arc<ParquetField>>,
input: T,
@@ -350,7 +371,7 @@ where
.await?;
let array_reader = build_array_reader(
- self.fields.as_ref(),
+ self.fields.as_deref(),
predicate_projection,
&row_group,
)?;
@@ -403,7 +424,7 @@ where
let reader = ParquetRecordBatchReader::new(
batch_size,
- build_array_reader(self.fields.as_ref(), &projection, &row_group)?,
+ build_array_reader(self.fields.as_deref(), &projection,
&row_group)?,
selection,
);
@@ -1409,7 +1430,7 @@ mod tests {
let reader_factory = ReaderFactory {
metadata,
- fields,
+ fields: fields.map(Arc::new),
input: async_reader,
filter: None,
limit: None,
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index b2a2b3eee5..a5e2de6b06 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -155,13 +155,13 @@ impl ParquetMetaData {
}
/// Override the column index
- #[allow(dead_code)]
+ #[cfg(feature = "arrow")]
pub(crate) fn set_column_index(&mut self, index:
Option<ParquetColumnIndex>) {
self.column_index = index;
}
/// Override the offset index
- #[allow(dead_code)]
+ #[cfg(feature = "arrow")]
pub(crate) fn set_offset_index(&mut self, index:
Option<ParquetOffsetIndex>) {
self.offset_index = index;
}
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index f8716359e0..3dac8ee558 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -242,11 +242,6 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
})
}
}
-
- #[cfg(feature = "arrow")]
- pub(crate) fn metadata_ref(&self) -> &Arc<ParquetMetaData> {
- &self.metadata
- }
}
/// Get midpoint offset for a row group