This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a62903c5fc Improve `ParquetRecordBatchStreamBuilder` docs / examples (#6948)
a62903c5fc is described below
commit a62903c5fcd5c0cf5f55449f65565b607b35e281
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 10 14:49:27 2025 -0500
Improve `ParquetRecordBatchStreamBuilder` docs / examples (#6948)
* Improve `ParquetRecordBatchStreamBuilder` docs
* Apply suggestions from code review
Thank you @etseidl ❤️
Co-authored-by: Ed Seidl <[email protected]>
* Update parquet/src/arrow/async_reader/mod.rs
Co-authored-by: Ed Seidl <[email protected]>
---------
Co-authored-by: Ed Seidl <[email protected]>
---
parquet/src/arrow/async_reader/mod.rs | 215 +++++++++++++++++++++-------------
1 file changed, 133 insertions(+), 82 deletions(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5323251b07..2c8a59399d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -15,65 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-//! Provides `async` API for reading parquet files as
+//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
//! [`RecordBatch`]es
//!
-//! ```
-//! # #[tokio::main(flavor="current_thread")]
-//! # async fn main() {
-//! #
-//! # use arrow_array::RecordBatch;
-//! # use arrow::util::pretty::pretty_format_batches;
-//! # use futures::TryStreamExt;
-//! # use tokio::fs::File;
-//! #
-//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//! #
-//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
-//! # let formatted = pretty_format_batches(batches).unwrap().to_string();
-//! # let actual_lines: Vec<_> = formatted.trim().lines().collect();
-//! # assert_eq!(
-//! # &actual_lines, expected_lines,
-//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-//! # expected_lines, actual_lines
-//! # );
-//! # }
-//! #
-//! let testdata = arrow::util::test_util::parquet_test_data();
-//! let path = format!("{}/alltypes_plain.parquet", testdata);
-//! let file = File::open(path).await.unwrap();
+//! This can be used to decode a Parquet file in streaming fashion (without
+//! downloading the whole file at once) from a remote source, such as an object store.
//!
-//! let builder = ParquetRecordBatchStreamBuilder::new(file)
-//! .await
-//! .unwrap()
-//! .with_batch_size(3);
-//!
-//! let file_metadata = builder.metadata().file_metadata();
-//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
-//!
-//! let stream = builder.with_projection(mask).build().unwrap();
-//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
-//! assert_eq!(results.len(), 3);
-//!
-//! assert_batches_eq(
-//! &results,
-//! &[
-//! "+----------+-------------+-----------+",
-//! "| bool_col | tinyint_col | float_col |",
-//! "+----------+-------------+-----------+",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "+----------+-------------+-----------+",
-//! ],
-//! );
-//! # }
-//! ```
+//! See example on [`ParquetRecordBatchStreamBuilder::new`]
use std::collections::VecDeque;
use std::fmt::Formatter;
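The examples in this commit read from a local file via `tokio::fs::File`, while the new module docs also mention remote sources. A minimal sketch of the object store case, assuming the `parquet` crate's `object_store` feature and its `ParquetObjectReader` adapter (the object path is hypothetical, and the adapter's constructor signature has varied across releases):

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Any `ObjectStore` implementation works here (S3, GCS, Azure, ...);
    // `LocalFileSystem` keeps the sketch self-contained.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let path = Path::from("data/example.parquet"); // hypothetical object path

    // A HEAD request fetches the object size; the reader then issues ranged
    // GETs for only the footer and the column chunks that are actually read.
    let meta = store.head(&path).await.unwrap();
    let reader = ParquetObjectReader::new(store, meta);

    let stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .build()
        .unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    println!("read {} record batches", batches.len());
}
```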
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
+ /// // another async I/O reader, such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async source
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // In this example, we collect the stream into a Vec<RecordBatch>
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // Demonstrate the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ /// "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
+ /// "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
+ /// "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
+ /// "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
+ /// "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
+ /// "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
+ /// "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// ],
+ /// );
+ /// # }
+ /// ```
+ ///
+ /// # Example configuring options and reading metadata
+ ///
+ /// There are many options that control the behavior of the reader, such as
+ /// `with_batch_size`, `with_projection`, `with_row_filter`, etc...
///
/// ```
- /// # use std::fs::metadata;
- /// # use std::sync::Arc;
- /// # use bytes::Bytes;
- /// # use arrow_array::{Int32Array, RecordBatch};
- /// # use arrow_schema::{DataType, Field, Schema};
- /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
- /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
- /// # use tempfile::tempfile;
- /// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// # let mut file = tempfile().unwrap();
- /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
- /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
- /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
- /// # writer.write(&batch).unwrap();
- /// # writer.close().unwrap();
- /// // Open async file containing parquet data
- /// let mut file = tokio::fs::File::from_std(file);
- /// // construct the reader
- /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
- /// .await.unwrap().build().unwrap();
- /// // Read batche
- /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // As before, use tokio::fs::File to read data using async I/O.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async source. In this case we set the batch
+ /// // size to 3, which produces batches of up to 3 rows at a time.
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap()
+ /// .with_batch_size(3);
+ ///
+ /// // We can also read the metadata to inspect the schema and other metadata
+ /// // before actually reading the data
+ /// let file_metadata = builder.metadata().file_metadata();
+ /// // Specify that we only want to read columns 1, 2 and 6 (bool_col, tinyint_col, float_col)
+ /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
+ ///
+ /// let stream = builder.with_projection(mask).build().unwrap();
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // Verify the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----------+-------------+-----------+",
+ /// "| bool_col | tinyint_col | float_col |",
+ /// "+----------+-------------+-----------+",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "+----------+-------------+-----------+",
+ /// ],
+ /// );
+ ///
+ /// // The results have 8 rows, so since we set the batch size to 3, we expect
+ /// // 3 batches: two with 3 rows each and the last batch with 2 rows.
+ /// assert_eq!(results.len(), 3);
/// # }
/// ```
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
/// and [`ArrowReaderOptions`]
pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
    let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
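The options example above exercises `with_batch_size` and `with_projection` but not row filtering. A minimal sketch of `with_row_filter`, assuming the same `alltypes_plain.parquet` test file; the predicate keeps only the rows where `bool_col` is true:

```rust
use arrow_array::cast::AsArray;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    let schema_descr = builder.metadata().file_metadata().schema_descr();

    // The predicate sees batches containing only its own projection;
    // here that is bool_col (root column index 1).
    let predicate = ArrowPredicateFn::new(
        ProjectionMask::roots(schema_descr, [1]),
        |batch| Ok(batch.column(0).as_boolean().clone()),
    );

    // Rows where the predicate evaluates to false (or null) are skipped;
    // the remaining columns are decoded only for the selected rows.
    let stream = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()
        .unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();

    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 4); // 4 of the 8 rows in alltypes_plain have bool_col = true
}
```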
@@ -352,6 +400,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}
/// Read bloom filter for a column in a row group
+ ///
/// Returns `None` if the column does not have a bloom filter
///
/// We should call this function after other forms of pruning, such as projection and predicate pushdown.
@@ -415,6 +464,8 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}
/// Build a new [`ParquetRecordBatchStream`]
+ ///
+ /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
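For the bloom filter API touched in the hunk above, a minimal usage sketch, assuming a file that was written with bloom filters enabled (for example via `WriterProperties::builder().set_bloom_filter_enabled(true)`); the file path and probe value are hypothetical:

```rust
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Hypothetical file written with bloom filters enabled
    let file = tokio::fs::File::open("data_with_bloom_filters.parquet")
        .await
        .unwrap();
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();

    // Probe the bloom filter for column 0 of row group 0; `None` means
    // that column has no bloom filter
    if let Some(sbbf) = builder
        .get_row_group_column_bloom_filter(0, 0)
        .await
        .unwrap()
    {
        // A negative lookup is definitive: the value is not in the row group.
        // A positive lookup only means the value *may* be present.
        if !sbbf.check(&42_i32) {
            println!("row group 0 definitely does not contain 42");
        }
    }
}
```

A negative probe lets the caller skip the row group entirely, which is why the docs suggest checking bloom filters only after cheaper pruning such as projection and predicate pushdown has been applied.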