This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 98f84503ea Docs: add example of how to read parquet row groups in
parallel (#9396)
98f84503ea is described below
commit 98f84503ea2d22f8182a627968e98d6c2520cd37
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Apr 22 00:21:30 2026 -0400
Docs: add example of how to read parquet row groups in parallel (#9396)
# Which issue does this PR close?
- Closes https://github.com/apache/arrow-rs/issues/9381
# Rationale for this change
It is possible to read a Parquet file in parallel today using the
arrow-rs APIs (by creating a separate reader for each part of the file);
however, it is not always obvious how to do so, as @pmarks observes in
https://github.com/apache/arrow-rs/issues/9381
# What changes are included in this PR?
Add documentation explaining how to read files in parallel,
along with a doc example.
Here is an example of what it looks like rendered:
<img width="1054" height="1050" alt="Screenshot 2026-02-11 at 5 33
02 PM"
src="https://github.com/user-attachments/assets/abfbbb8f-24de-427e-97dd-71977540255f"
/>
# Are these changes tested?
By CI
# Are there any user-facing changes?
More docs; no functional changes.
---
parquet/src/arrow/async_reader/mod.rs | 96 ++++++++++++++++++++++++++++++++++-
1 file changed, 94 insertions(+), 2 deletions(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9e45a0c316..6c7890b75c 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -215,7 +215,14 @@ pub struct AsyncReader<T>(T);
/// to use this information to select what specific columns, row groups, etc.
/// they wish to be read by the resulting stream.
///
-/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`], including how to
+/// issue multiple I/O requests in parallel using multiple streams.
+///
+/// # See also
+/// * [`ParquetPushDecoderBuilder`] for lower-level control over buffering and
+///   decoding.
+/// * [`ParquetRecordBatchStream::next_row_group`] for I/O prefetching.
+///
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
@@ -224,6 +231,11 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
/// specified source.
///
+ /// # Examples
+ /// * [Basic example reading from an async source](#example)
+ /// * [Configuring options and reading metadata](#example-configuring-options-and-reading-metadata)
+ /// * [Reading Row Groups in Parallel](#example-reading-row-groups-in-parallel)
+ ///
/// # Example
/// ```
/// # #[tokio::main(flavor="current_thread")]
@@ -282,7 +294,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// # }
/// ```
///
- /// # Example configuring options and reading metadata
+ /// # Example Configuring Options and Reading Metadata
///
/// There are many options that control the behavior of the reader, such as
/// `with_batch_size`, `with_projection`, `with_filter`, etc...
@@ -351,6 +363,100 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// assert_eq!(results.len(), 3);
/// # }
/// ```
+ ///
+ /// # Example Reading Row Groups in Parallel
+ ///
+ /// Each [`ParquetRecordBatchStream`] is independent and can be used to read
+ /// from the same underlying source in parallel. Use
+ /// [`ParquetRecordBatchStream::next_row_group`] with a single stream to
+ /// begin prefetching the next Row Group. To read a file in parallel, create
+ /// a stream for each subset of the file. For example, you can read each
+ /// row group in parallel by creating a stream for each row group using the
+ /// [`ParquetRecordBatchStreamBuilder::with_row_groups`] API as shown below.
+ ///
+ /// Two details matter for the reads to actually overlap:
+ /// * Each stream needs an input with its own read position. Handles made
+ ///   with `try_clone` share a single file cursor, so open the source once
+ ///   per stream instead.
+ /// * The streams must be polled concurrently. Flattening a stream of
+ ///   streams (e.g. with `StreamExt::flatten`) reads them one after another.
+ ///
+ /// The example below polls the streams concurrently within a single task;
+ /// to also decode on multiple threads, spawn each stream as a separate task
+ /// on a multi-threaded runtime.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+ /// # use futures::{StreamExt, TryStreamExt};
+ /// # use tempfile::NamedTempFile;
+ /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+ /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+ /// # use parquet::file::metadata::ParquetMetaDataReader;
+ /// # use parquet::file::properties::WriterProperties;
+ /// # // write to a temporary file with 10 row groups of 5 rows each
+ /// # fn write_file() -> parquet::errors::Result<NamedTempFile> {
+ /// #     let mut file = NamedTempFile::new().unwrap();
+ /// #     let small_batch = RecordBatch::try_from_iter([
+ /// #         ("id", Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef),
+ /// #     ]).unwrap();
+ /// #     let props = WriterProperties::builder()
+ /// #         .set_max_row_group_row_count(Some(5))
+ /// #         .set_write_batch_size(5)
+ /// #         .build();
+ /// #     let mut writer = ArrowWriter::try_new(&mut file, small_batch.schema(), Some(props))?;
+ /// #     for _ in 0..10 {
+ /// #         writer.write(&small_batch)?;
+ /// #     }
+ /// #     writer.close()?;
+ /// #     Ok(file)
+ /// # }
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() -> parquet::errors::Result<()> {
+ /// # let t = write_file()?;
+ /// # let path = t.path();
+ /// // This example uses a tokio::fs::File as the async source, but it
+ /// // could be any async source (such as an object store reader)
+ /// let mut file = tokio::fs::File::open(path).await?;
+ /// // To read Row Groups in parallel, create a separate stream for each Row Group.
+ /// // First get the metadata to find the row group information
+ /// let file_size = file.metadata().await?.len();
+ /// let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, file_size).await?;
+ /// let num_row_groups = metadata.num_row_groups();
+ /// assert_eq!(num_row_groups, 10); // file has 10 row groups with 5 rows each
+ /// // Decode the metadata once and share it between all the streams
+ /// let reader_metadata = ArrowReaderMetadata::try_new(
+ ///     Arc::new(metadata),
+ ///     ArrowReaderOptions::new(),
+ /// )?;
+ /// let mut streams = Vec::with_capacity(num_row_groups);
+ /// for row_group_index in 0..num_row_groups {
+ ///     // Each stream needs its own source with its own read position, so
+ ///     // open the file again: `try_clone` handles share one cursor and
+ ///     // their concurrent seek + read calls would interfere.
+ ///     let this_file = tokio::fs::File::open(path).await?;
+ ///     let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+ ///         this_file,
+ ///         reader_metadata.clone(),
+ ///     )
+ ///     .with_row_groups(vec![row_group_index]) // read only this row group
+ ///     .build()?;
+ ///     streams.push(stream);
+ /// }
+ /// // Poll up to 3 streams at a time. Each future collects one whole stream,
+ /// // so `buffered` drives 3 streams concurrently while still returning the
+ /// // results in row group order.
+ /// let per_row_group: Vec<Vec<_>> = futures::stream::iter(streams)
+ ///     .map(|stream| stream.try_collect::<Vec<_>>())
+ ///     .buffered(3)
+ ///     .try_collect()
+ ///     .await?;
+ /// let results: Vec<_> = per_row_group.into_iter().flatten().collect();
+ /// // read all 50 rows (10 row groups x 5 rows per group)
+ /// assert_eq!(50, results.iter().map(|b| b.num_rows()).sum::<usize>());
+ /// # Ok(())
+ /// # }
+ /// ```
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}