This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 98f84503ea Docs: add example of how to read parquet row groups in 
parallel (#9396)
98f84503ea is described below

commit 98f84503ea2d22f8182a627968e98d6c2520cd37
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Apr 22 00:21:30 2026 -0400

    Docs: add example of how to read parquet row groups in parallel (#9396)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/9381
    
    # Rationale for this change
    
    It is possible to read a parquet file in parallel today using the
    arrow-rs APIs (making an individual reader to read individual parts),
    however, it is not always obvious how to do so as @pmarks observes on
    https://github.com/apache/arrow-rs/issues/9381
    
    # What changes are included in this PR?
    
    Add additional documentation explaining how to read files in parallel,
    along with a doc example
    
    Here is an example of what it looks like rendered:
    
    <img width="1054" height="1050" alt="Screenshot 2026-02-11 at 5 33
    02 PM"
    
src="https://github.com/user-attachments/assets/abfbbb8f-24de-427e-97dd-71977540255f";
    />
    
    
    # Are these changes tested?
    
    By CI
    # Are there any user-facing changes?
    more docs; No functional changes
---
 parquet/src/arrow/async_reader/mod.rs | 96 ++++++++++++++++++++++++++++++++++-
 1 file changed, 94 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 9e45a0c316..6c7890b75c 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -215,7 +215,14 @@ pub struct AsyncReader<T>(T);
 /// to use this information to select what specific columns, row groups, etc.
 /// they wish to be read by the resulting stream.
 ///
-/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`], including how to
+/// issue multiple I/O requests in parallel using multiple streams.
+///
+/// # See also:
+/// * [`ParquetPushDecoderBuilder`] for lower level control over buffering and
+///   decoding.
+/// * [`ParquetRecordBatchStream::next_row_group`] for I/O prefetching
+///
 ///
 /// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchStreamBuilder<T> = 
ArrowReaderBuilder<AsyncReader<T>>;
@@ -224,6 +231,11 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
     /// specified source.
     ///
+    /// # Examples:
+    /// * [Basic example reading from an async source](#example)
+    /// * [Configuring options and reading 
metadata](#example-configuring-options-and-reading-metadata)
+    /// * [Reading Row Groups in 
Parallel](#example-reading-row-groups-in-parallel)
+    ///
     /// # Example
     /// ```
     /// # #[tokio::main(flavor="current_thread")]
@@ -282,7 +294,7 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     /// # }
     /// ```
     ///
-    /// # Example configuring options and reading metadata
+    /// # Example Configuring Options and Reading Metadata
     ///
     /// There are many options that control the behavior of the reader, such as
     /// `with_batch_size`, `with_projection`, `with_filter`, etc...
@@ -351,6 +363,86 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     /// assert_eq!(results.len(), 3);
     /// # }
     /// ```
+    ///
+    /// # Example reading Row Groups in Parallel
+    ///
+    /// Each [`ParquetRecordBatchStream`] is independent and can be used to 
read
+    /// from the same underlying source in parallel. Use
+    /// [`ParquetRecordBatchStream::next_row_group`] with a single stream to
+    /// begin prefetching the next Row Group. To read a file in parallel, 
create
+    /// a stream for each subset of the file. For example, you can read each
+    /// row group in parallel by creating a stream for each row group using the
+    /// [`ParquetRecordBatchStreamBuilder::with_row_groups`] API as shown below
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// # use arrow::util::pretty::pretty_format_batches;
+    /// # use futures::{StreamExt, TryStreamExt};
+    /// # use tempfile::NamedTempFile;
+    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, 
ProjectionMask};
+    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, 
ArrowReaderOptions};
+    /// # use parquet::file::metadata::ParquetMetaDataReader;
+    /// # use parquet::file::properties::{WriterProperties};
+    /// # // write to a temporary file with 10 RowGroups and read back with 
async API
+    /// # fn write_file() -> parquet::errors::Result<NamedTempFile> {
+    /// #   let mut file = NamedTempFile::new().unwrap();
+    /// #   let small_batch = RecordBatch::try_from_iter([
+    /// #      ("id", Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef),
+    /// #   ]).unwrap();
+    /// #   let props = WriterProperties::builder()
+    /// #     .set_max_row_group_row_count(Some(5))
+    /// #     .set_write_batch_size(5)
+    /// #     .build();
+    /// #   let mut writer = ArrowWriter::try_new(&mut file, 
small_batch.schema(), Some(props))?;
+    /// #   for i in 0..10 {
+    /// #     writer.write(&small_batch)?
+    /// #   };
+    /// #   writer.close()?;
+    /// #   Ok(file)
+    /// # }
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() -> parquet::errors::Result<()> {
+    /// # let t = write_file()?;
+    /// # let path = t.path();
+    /// // This example uses a tokio::fs::File as the async source, but it
+    /// // could be any async source such as an object store reader)
+    /// let mut file = tokio::fs::File::open(path).await?;
+    /// // To read Row Groups in parallel, create a separate stream builder 
for each Row Group.
+    /// // First get the metadata to find the row group information
+    /// let file_size = file.metadata().await?.len();
+    /// let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, 
file_size).await?;
+    /// assert_eq!(metadata.num_row_groups(), 10); // file has 10 row groups 
with 5 rows each
+    /// // Create a stream reader for each row group
+    /// let reader_metadata = ArrowReaderMetadata::try_new(
+    ///   Arc::new(metadata),
+    ///   ArrowReaderOptions::new()
+    /// )?;
+    /// let mut streams = vec![];
+    ///  for row_group_index in 0..10 {
+    ///   // Each stream needs its own source instance to issue
+    ///   // parallel IO requests, so clone the file for each stream
+    ///   let this_file = file.try_clone().await?;
+    ///   let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+    ///        this_file,
+    ///        reader_metadata.clone()
+    ///      )
+    ///      .with_row_groups(vec![row_group_index]) // read only this row 
group
+    ///      .build()?;
+    ///     streams.push(stream);
+    /// }
+    /// // Each reader can now be polled independently and in parallel, for
+    /// // example using StreamExt::buffered to read from 3 at a time
+    /// let results = futures::stream::iter(streams)
+    ///  .map(|stream| async move { stream })
+    ///  .buffered(3)
+    ///  .flatten()
+    ///  .try_collect::<Vec<_>>().await?;
+    /// // read all 50 rows (10 row groups x 5 rows per group)
+    /// assert_eq!(50, results.iter().map(|s| s.num_rows()).sum::<usize>());
+    /// # Ok(())
+    /// # }
+    /// ```
     pub async fn new(input: T) -> Result<Self> {
         Self::new_with_options(input, Default::default()).await
     }

Reply via email to