This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f4d4f76e7 feat: implement generic record batch reader (#3733)
f4d4f76e7 is described below

commit f4d4f76e75c6f3f3127f025d05a526fc5334459a
Author: Will Jones <[email protected]>
AuthorDate: Mon Feb 20 03:43:23 2023 -0800

    feat: implement generic record batch reader (#3733)
---
 arrow-array/src/lib.rs          |  4 ++-
 arrow-array/src/record_batch.rs | 74 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs
index d8dc6efe2..2cee2650e 100644
--- a/arrow-array/src/lib.rs
+++ b/arrow-array/src/lib.rs
@@ -165,7 +165,9 @@ pub mod array;
 pub use array::*;
 
 mod record_batch;
-pub use record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
+pub use record_batch::{
+    RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader,
+};
 
 mod arithmetic;
 pub use arithmetic::ArrowNativeTypeOp;
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 035efb4f0..3b517872a 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -24,6 +24,8 @@ use std::ops::Index;
 use std::sync::Arc;
 
 /// Trait for types that can read `RecordBatch`'s.
+///
+/// To create from an iterator, see [RecordBatchIterator].
 pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
     /// Returns the schema of this `RecordBatchReader`.
     ///
@@ -491,6 +493,78 @@ impl Index<&str> for RecordBatch {
     }
 }
 
+/// Generic implementation of [RecordBatchReader] that wraps an iterator.
+///
+/// # Examples
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, RecordBatchIterator, RecordBatchReader};
+/// #
+/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+/// let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
+///
+/// let record_batch = RecordBatch::try_from_iter(vec![
+///   ("a", a),
+///   ("b", b),
+/// ]).unwrap();
+///
+/// let batches: Vec<RecordBatch> = vec![record_batch.clone(), record_batch.clone()];
+///
+/// let mut reader = RecordBatchIterator::new(batches.into_iter().map(Ok), record_batch.schema());
+///
+/// assert_eq!(reader.schema(), record_batch.schema());
+/// assert_eq!(reader.next().unwrap().unwrap(), record_batch);
+/// # assert_eq!(reader.next().unwrap().unwrap(), record_batch);
+/// # assert!(reader.next().is_none());
+/// ```
+pub struct RecordBatchIterator<I>
+where
+    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+    inner: I::IntoIter,
+    inner_schema: SchemaRef,
+}
+
+impl<I> RecordBatchIterator<I>
+where
+    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+    /// Wraps `iter`, reporting `schema` without validating it against the batches.
+    /// To wrap an infallible iterator, first adapt it with `.map(Ok)`.
+    pub fn new(iter: I, schema: SchemaRef) -> Self {
+        let inner = iter.into_iter();
+        Self {
+            inner,
+            inner_schema: schema,
+        }
+    }
+}
+
+impl<I> Iterator for RecordBatchIterator<I>
+where
+    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+    type Item = Result<RecordBatch, ArrowError>;
+    // Batches and errors are passed through from the wrapped iterator unchanged.
+    fn next(&mut self) -> Option<Self::Item> {
+        Iterator::next(&mut self.inner)
+    }
+    // Report the wrapped iterator's own bounds, untouched.
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        Iterator::size_hint(&self.inner)
+    }
+}
+
+impl<I> RecordBatchReader for RecordBatchIterator<I>
+where
+    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
+{
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.inner_schema)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to