This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 10cf03c358 feat(parquet): Add next_row_group API for ParquetRecordBatchStream (#6907)
10cf03c358 is described below
commit 10cf03c35808eedc191a9b3330bc7c268b7b71c1
Author: Xuanwo <[email protected]>
AuthorDate: Tue Dec 24 22:22:33 2024 +0800
feat(parquet): Add next_row_group API for ParquetRecordBatchStream (#6907)
* feat(parquet): Add next_row_group API for ParquetRecordBatchStream
Signed-off-by: Xuanwo <[email protected]>
* chore: Returning error instead of using unreachable
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
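
For reference, a minimal usage sketch of the new API (illustrative only,
not part of this commit: the file path is a placeholder, and reading via
tokio::fs::File assumes the parquet crate's `async` feature is enabled):

    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    #[tokio::main]
    async fn main() -> parquet::errors::Result<()> {
        let file = tokio::fs::File::open("data.parquet").await?;
        let mut stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .build()?;

        // Fetch the next row group; this is where the I/O happens.
        while let Some(reader) = stream.next_row_group().await? {
            // `reader` already holds all the bytes for the row group,
            // so decoding performs no further I/O and could be moved
            // to another thread while the next fetch is in flight.
            for batch in reader {
                println!("decoded {} rows", batch?.num_rows());
            }
        }
        Ok(())
    }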
---
parquet/src/arrow/async_reader/mod.rs | 132 ++++++++++++++++++++++++++++++++++
1 file changed, 132 insertions(+)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index c408456df1..96715e1164 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -613,6 +613,9 @@ impl<T> std::fmt::Debug for StreamState<T> {
/// An asynchronous [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html) of [`RecordBatch`]
/// for a parquet file that can be constructed using [`ParquetRecordBatchStreamBuilder`].
+///
+/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
+/// allowing users to decode record batches separately from I/O.
pub struct ParquetRecordBatchStream<T> {
metadata: Arc<ParquetMetaData>,
@@ -654,6 +657,70 @@ impl<T> ParquetRecordBatchStream<T> {
}
}
+impl<T> ParquetRecordBatchStream<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ /// Fetches the next row group from the stream.
+ ///
+    /// Users can continue to call this function to get row groups and decode them concurrently.
+ ///
+ /// ## Notes
+ ///
+    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
+ ///
+ /// ## Returns
+ ///
+ /// - `Ok(None)` if the stream has ended.
+    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
+ /// - `Ok(Some(reader))` which holds all the data for the row group.
+    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
+ loop {
+ match &mut self.state {
+ StreamState::Decoding(_) | StreamState::Reading(_) => {
+ return Err(ParquetError::General(
+ "Cannot combine the use of next_row_group with the
Stream API".to_string(),
+ ))
+ }
+ StreamState::Init => {
+ let row_group_idx = match self.row_groups.pop_front() {
+ Some(idx) => idx,
+ None => return Ok(None),
+ };
+
+                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
+
+                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
+
+                    let reader_factory = self.reader.take().expect("lost reader");
+
+ let (reader_factory, maybe_reader) = reader_factory
+ .read_row_group(
+ row_group_idx,
+ selection,
+ self.projection.clone(),
+ self.batch_size,
+ )
+ .await
+ .map_err(|err| {
+ self.state = StreamState::Error;
+ err
+ })?;
+ self.reader = Some(reader_factory);
+
+ if let Some(reader) = maybe_reader {
+ return Ok(Some(reader));
+ } else {
+ // All rows skipped, read next row group
+ continue;
+ }
+ }
+                StreamState::Error => return Ok(None), // Ends the stream as error happens.
+ }
+ }
+ }
+}
+
impl<T> Stream for ParquetRecordBatchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
@@ -1020,6 +1087,71 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn test_async_reader_with_next_row_group() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/alltypes_plain.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+
+ let metadata = ParquetMetaDataReader::new()
+ .parse_and_finish(&data)
+ .unwrap();
+ let metadata = Arc::new(metadata);
+
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+
+ let requests = async_reader.requests.clone();
+ let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+ .await
+ .unwrap();
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+ let mut stream = builder
+ .with_projection(mask.clone())
+ .with_batch_size(1024)
+ .build()
+ .unwrap();
+
+ let mut readers = vec![];
+ while let Some(reader) = stream.next_row_group().await.unwrap() {
+ readers.push(reader);
+ }
+
+ let async_batches: Vec<_> = readers
+ .into_iter()
+ .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
+ .collect();
+
+ let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
+ .unwrap()
+ .with_projection(mask)
+ .with_batch_size(104)
+ .build()
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+
+ assert_eq!(async_batches, sync_batches);
+
+ let requests = requests.lock().unwrap();
+        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
+        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
+
+ assert_eq!(
+ &requests[..],
+ &[
+ offset_1 as usize..(offset_1 + length_1) as usize,
+ offset_2 as usize..(offset_2 + length_2) as usize
+ ]
+ );
+ }
+
#[tokio::test]
async fn test_async_reader_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();