This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a62903c5fc Improve `ParquetRecordBatchStreamBuilder` docs / examples 
(#6948)
a62903c5fc is described below

commit a62903c5fcd5c0cf5f55449f65565b607b35e281
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 10 14:49:27 2025 -0500

    Improve `ParquetRecordBatchStreamBuilder` docs / examples (#6948)
    
    * Improve `ParquetRecordBatchStreamBuilder` docs
    
    * Apply suggestions from code review
    
    Thank you @etseidl  ❤️
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    * Update parquet/src/arrow/async_reader/mod.rs
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    ---------
    
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/arrow/async_reader/mod.rs | 215 +++++++++++++++++++++-------------
 1 file changed, 133 insertions(+), 82 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 5323251b07..2c8a59399d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -15,65 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Provides `async` API for reading parquet files as
+//! [`ParquetRecordBatchStreamBuilder`]:  `async` API for reading Parquet 
files as
 //! [`RecordBatch`]es
 //!
-//! ```
-//! # #[tokio::main(flavor="current_thread")]
-//! # async fn main() {
-//! #
-//! # use arrow_array::RecordBatch;
-//! # use arrow::util::pretty::pretty_format_batches;
-//! # use futures::TryStreamExt;
-//! # use tokio::fs::File;
-//! #
-//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//! #
-//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
-//! #     let formatted = pretty_format_batches(batches).unwrap().to_string();
-//! #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
-//! #     assert_eq!(
-//! #          &actual_lines, expected_lines,
-//! #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-//! #          expected_lines, actual_lines
-//! #      );
-//! #  }
-//! #
-//! let testdata = arrow::util::test_util::parquet_test_data();
-//! let path = format!("{}/alltypes_plain.parquet", testdata);
-//! let file = File::open(path).await.unwrap();
+//! This can be used to decode a Parquet file in streaming fashion (without
+//! downloading the whole file at once) from a remote source, such as an 
object store.
 //!
-//! let builder = ParquetRecordBatchStreamBuilder::new(file)
-//!     .await
-//!     .unwrap()
-//!     .with_batch_size(3);
-//!
-//! let file_metadata = builder.metadata().file_metadata();
-//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
-//!
-//! let stream = builder.with_projection(mask).build().unwrap();
-//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
-//! assert_eq!(results.len(), 3);
-//!
-//! assert_batches_eq(
-//!     &results,
-//!     &[
-//!         "+----------+-------------+-----------+",
-//!         "| bool_col | tinyint_col | float_col |",
-//!         "+----------+-------------+-----------+",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "| true     | 0           | 0.0       |",
-//!         "| false    | 1           | 1.1       |",
-//!         "+----------+-------------+-----------+",
-//!      ],
-//!  );
-//! # }
-//! ```
+//! See example on [`ParquetRecordBatchStreamBuilder::new`]
 
 use std::collections::VecDeque;
 use std::fmt::Formatter;
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
 /// breaking the pre-existing ParquetRecordBatchStreamBuilder API
 pub struct AsyncReader<T>(T);
 
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` 
reading of a parquet file
+/// A builder for reading parquet files from an `async` source as a 
[`ParquetRecordBatchStream`]
 ///
-/// In particular, this handles reading the parquet file metadata, allowing 
consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
 /// they wish to be read by the resulting stream
 ///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
 /// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchStreamBuilder<T> = 
ArrowReaderBuilder<AsyncReader<T>>;
 
 impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
-    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided 
parquet file
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+    /// specified source.
     ///
     /// # Example
+    /// ```
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() {
+    /// #
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow::util::pretty::pretty_format_batches;
+    /// # use futures::TryStreamExt;
+    /// #
+    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, 
ProjectionMask};
+    /// #
+    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: 
&[&str]) {
+    /// #     let formatted = 
pretty_format_batches(batches).unwrap().to_string();
+    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
+    /// #     assert_eq!(
+    /// #          &actual_lines, expected_lines,
+    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+    /// #          expected_lines, actual_lines
+    /// #      );
+    /// #  }
+    /// #
+    /// # let testdata = arrow::util::test_util::parquet_test_data();
+    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+    /// // Use tokio::fs::File to read data using an async I/O. This can be 
replaced with
+    /// // another async I/O reader such as a reader from an object store.
+    /// let file = tokio::fs::File::open(path).await.unwrap();
+    ///
+    /// // Configure options for reading from the async source
+    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+    ///     .await
+    ///     .unwrap();
+    /// // The builder has already read the parquet file metadata; building 
returns
+    /// // a stream that can be used to incrementally read the data in batches
+    /// let stream = builder.build().unwrap();
+    /// // In this example, we collect the stream into a Vec<RecordBatch>
+    /// // but real applications would likely process the batches as they are 
read
+    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+    /// // Demonstrate the results are as expected
+    /// assert_batches_eq(
+    ///     &results,
+    ///     &[
+    ///       
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | 
bigint_col | float_col | double_col | date_string_col  | string_col | 
timestamp_col       |",
+    ///       
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+    ///       "| 4  | true     | 0           | 0            | 0       | 0      
    | 0.0       | 0.0        | 30332f30312f3039 | 30         | 
2009-03-01T00:00:00 |",
+    ///       "| 5  | false    | 1           | 1            | 1       | 10     
    | 1.1       | 10.1       | 30332f30312f3039 | 31         | 
2009-03-01T00:01:00 |",
+    ///       "| 6  | true     | 0           | 0            | 0       | 0      
    | 0.0       | 0.0        | 30342f30312f3039 | 30         | 
2009-04-01T00:00:00 |",
+    ///       "| 7  | false    | 1           | 1            | 1       | 10     
    | 1.1       | 10.1       | 30342f30312f3039 | 31         | 
2009-04-01T00:01:00 |",
+    ///       "| 2  | true     | 0           | 0            | 0       | 0      
    | 0.0       | 0.0        | 30322f30312f3039 | 30         | 
2009-02-01T00:00:00 |",
+    ///       "| 3  | false    | 1           | 1            | 1       | 10     
    | 1.1       | 10.1       | 30322f30312f3039 | 31         | 
2009-02-01T00:01:00 |",
+    ///       "| 0  | true     | 0           | 0            | 0       | 0      
    | 0.0       | 0.0        | 30312f30312f3039 | 30         | 
2009-01-01T00:00:00 |",
+    ///       "| 1  | false    | 1           | 1            | 1       | 10     
    | 1.1       | 10.1       | 30312f30312f3039 | 31         | 
2009-01-01T00:01:00 |",
+    ///       
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+    ///      ],
+    ///  );
+    /// # }
+    /// ```
+    ///
+    /// # Example configuring options and reading metadata
+    ///
+    /// There are many options that control the behavior of the reader, such as
+    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
     ///
     /// ```
-    /// # use std::fs::metadata;
-    /// # use std::sync::Arc;
-    /// # use bytes::Bytes;
-    /// # use arrow_array::{Int32Array, RecordBatch};
-    /// # use arrow_schema::{DataType, Field, Schema};
-    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
-    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
-    /// # use tempfile::tempfile;
-    /// # use futures::StreamExt;
     /// # #[tokio::main(flavor="current_thread")]
     /// # async fn main() {
     /// #
-    /// # let mut file = tempfile().unwrap();
-    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", 
DataType::Int32, false)]));
-    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), 
None).unwrap();
-    /// # let batch = RecordBatch::try_new(schema, 
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
-    /// # writer.write(&batch).unwrap();
-    /// # writer.close().unwrap();
-    /// // Open async file containing parquet data
-    /// let mut file = tokio::fs::File::from_std(file);
-    /// // construct the reader
-    /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
-    ///   .await.unwrap().build().unwrap();
-    /// // Read batche
-    /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow::util::pretty::pretty_format_batches;
+    /// # use futures::TryStreamExt;
+    /// #
+    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, 
ProjectionMask};
+    /// #
+    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: 
&[&str]) {
+    /// #     let formatted = 
pretty_format_batches(batches).unwrap().to_string();
+    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
+    /// #     assert_eq!(
+    /// #          &actual_lines, expected_lines,
+    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+    /// #          expected_lines, actual_lines
+    /// #      );
+    /// #  }
+    /// #
+    /// # let testdata = arrow::util::test_util::parquet_test_data();
+    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+    /// // As before, use tokio::fs::File to read data using an async I/O.
+    /// let file = tokio::fs::File::open(path).await.unwrap();
+    ///
+    /// // Configure options for reading from the async source, in this case 
we set the batch size
+    /// // to 3 which produces 3 rows at a time.
+    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+    ///     .await
+    ///     .unwrap()
+    ///     .with_batch_size(3);
+    ///
+    /// // We can also read the metadata to inspect the schema and other 
metadata
+    /// // before actually reading the data
+    /// let file_metadata = builder.metadata().file_metadata();
+    /// // Specify that we only want to read the columns at (zero-based) indices 1, 2, and 6
+    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 
6]);
+    ///
+    /// let stream = builder.with_projection(mask).build().unwrap();
+    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+    /// // Demonstrate the results are as expected
+    /// assert_batches_eq(
+    ///     &results,
+    ///     &[
+    ///         "+----------+-------------+-----------+",
+    ///         "| bool_col | tinyint_col | float_col |",
+    ///         "+----------+-------------+-----------+",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "| true     | 0           | 0.0       |",
+    ///         "| false    | 1           | 1.1       |",
+    ///         "+----------+-------------+-----------+",
+    ///      ],
+    ///  );
+    ///
+    /// // The results have 8 rows, so since we set the batch size to 3, we 
expect
+    /// // 3 batches, two with 3 rows each and the last batch with 2 rows.
+    /// assert_eq!(results.len(), 3);
     /// # }
     /// ```
     pub async fn new(input: T) -> Result<Self> {
         Self::new_with_options(input, Default::default()).await
     }
 
-    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided 
parquet file
+    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided 
async source
     /// and [`ArrowReaderOptions`]
     pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) 
-> Result<Self> {
         let metadata = ArrowReaderMetadata::load_async(&mut input, 
options).await?;
@@ -352,6 +400,7 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     }
 
     /// Read bloom filter for a column in a row group
+    ///
     /// Returns `None` if the column does not have a bloom filter
     ///
     /// We should call this function after other forms pruning, such as 
projection and predicate pushdown.
@@ -415,6 +464,8 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     }
 
     /// Build a new [`ParquetRecordBatchStream`]
+    ///
+    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
 

Reply via email to