This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f4d4f76e7 feat: implement generic record batch reader (#3733)
f4d4f76e7 is described below
commit f4d4f76e75c6f3f3127f025d05a526fc5334459a
Author: Will Jones <[email protected]>
AuthorDate: Mon Feb 20 03:43:23 2023 -0800
feat: implement generic record batch reader (#3733)
---
arrow-array/src/lib.rs | 4 ++-
arrow-array/src/record_batch.rs | 74 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 77 insertions(+), 1 deletion(-)
diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs
index d8dc6efe2..2cee2650e 100644
--- a/arrow-array/src/lib.rs
+++ b/arrow-array/src/lib.rs
@@ -165,7 +165,9 @@ pub mod array;
pub use array::*;
mod record_batch;
-pub use record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
+pub use record_batch::{
+ RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader,
+};
mod arithmetic;
pub use arithmetic::ArrowNativeTypeOp;
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 035efb4f0..3b517872a 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -24,6 +24,8 @@ use std::ops::Index;
use std::sync::Arc;
/// Trait for types that can read `RecordBatch`'s.
+///
+/// To create from an iterator, see [RecordBatchIterator].
pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
/// Returns the schema of this `RecordBatchReader`.
///
@@ -491,6 +493,78 @@ impl Index<&str> for RecordBatch {
}
}
+/// Generic implementation of [RecordBatchReader] that wraps an iterator.
+///
+/// # Example
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, RecordBatchIterator, RecordBatchReader};
+/// #
+/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+/// let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
+///
+/// let record_batch = RecordBatch::try_from_iter(vec![
+/// ("a", a),
+/// ("b", b),
+/// ]).unwrap();
+///
+/// let batches: Vec<RecordBatch> = vec![record_batch.clone(), record_batch.clone()];
+///
+/// let mut reader = RecordBatchIterator::new(batches.into_iter().map(Ok), record_batch.schema());
+///
+/// assert_eq!(reader.schema(), record_batch.schema());
+/// assert_eq!(reader.next().unwrap().unwrap(), record_batch);
+/// # assert_eq!(reader.next().unwrap().unwrap(), record_batch);
+/// # assert!(reader.next().is_none());
+/// ```
+pub struct RecordBatchIterator<I>
+where
+ I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+ inner: I::IntoIter,
+ inner_schema: SchemaRef,
+}
+
+impl<I> RecordBatchIterator<I>
+where
+ I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+ /// Create a new [RecordBatchIterator].
+ ///
+ /// If `iter` is an infallible iterator, use `.map(Ok)`.
+ pub fn new(iter: I, schema: SchemaRef) -> Self {
+ Self {
+ inner: iter.into_iter(),
+ inner_schema: schema,
+ }
+ }
+}
+
+impl<I> Iterator for RecordBatchIterator<I>
+where
+ I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.inner.next()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.inner.size_hint()
+ }
+}
+
+impl<I> RecordBatchReader for RecordBatchIterator<I>
+where
+ I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+ fn schema(&self) -> SchemaRef {
+ self.inner_schema.clone()
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;