This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea197dd9 feat: support RecordBatchReader on boxed trait objects (#4475)
5ea197dd9 is described below

commit 5ea197dd9de7cd2c5ad9a37e36a24ddf3ac5688f
Author: Will Jones <[email protected]>
AuthorDate: Mon Jul 3 07:00:11 2023 -0700

    feat: support RecordBatchReader on boxed trait objects (#4475)
    
    * Impl RBR for Box
    
    * Require send to create a FFI stream
---
 arrow-array/src/record_batch.rs | 24 ++++++++++++++++++++++++
 arrow/src/ffi_stream.rs         | 10 +++++-----
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index d2e36780a..3134c9ecb 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -43,6 +43,12 @@ pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
     }
 }
 
+impl<R: RecordBatchReader + ?Sized> RecordBatchReader for Box<R> {
+    fn schema(&self) -> SchemaRef {
+        self.as_ref().schema()
+    }
+}
+
 /// Trait for types that can write `RecordBatch`'s.
 pub trait RecordBatchWriter {
     /// Write a single batch to the writer.
@@ -1115,4 +1121,22 @@ mod tests {
         // Cannot remove metadata
         batch.with_schema(nullable_schema).unwrap_err();
     }
+
+    #[test]
+    fn test_boxed_reader() {
+        // Make sure we can pass a boxed reader to a function generic over
+        // RecordBatchReader.
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let schema = Arc::new(schema);
+
+        let reader = RecordBatchIterator::new(std::iter::empty(), schema);
+        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+
+        fn get_size(reader: impl RecordBatchReader) -> usize {
+            reader.size_hint().0
+        }
+
+        let size = get_size(reader);
+        assert_eq!(size, 0);
+    }
 }
diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs
index 5fb1c1073..83d4eead3 100644
--- a/arrow/src/ffi_stream.rs
+++ b/arrow/src/ffi_stream.rs
@@ -119,7 +119,7 @@ unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) {
 }
 
 struct StreamPrivateData {
-    batch_reader: Box<dyn RecordBatchReader>,
+    batch_reader: Box<dyn RecordBatchReader + Send>,
     last_error: String,
 }
 
@@ -157,7 +157,7 @@ impl Drop for FFI_ArrowArrayStream {
 
 impl FFI_ArrowArrayStream {
     /// Creates a new [`FFI_ArrowArrayStream`].
-    pub fn new(batch_reader: Box<dyn RecordBatchReader>) -> Self {
+    pub fn new(batch_reader: Box<dyn RecordBatchReader + Send>) -> Self {
         let private_data = Box::new(StreamPrivateData {
             batch_reader,
             last_error: String::new(),
@@ -371,7 +371,7 @@ impl RecordBatchReader for ArrowArrayStreamReader {
 /// Assumes that the pointer represents valid C Stream Interfaces, both in memory
 /// representation and lifetime via the `release` mechanism.
 pub unsafe fn export_reader_into_raw(
-    reader: Box<dyn RecordBatchReader>,
+    reader: Box<dyn RecordBatchReader + Send>,
     out_stream: *mut FFI_ArrowArrayStream,
 ) {
     let stream = FFI_ArrowArrayStream::new(reader);
@@ -388,13 +388,13 @@ mod tests {
 
     struct TestRecordBatchReader {
         schema: SchemaRef,
-        iter: Box<dyn Iterator<Item = Result<RecordBatch>>>,
+        iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>,
     }
 
     impl TestRecordBatchReader {
         pub fn new(
             schema: SchemaRef,
-            iter: Box<dyn Iterator<Item = Result<RecordBatch>>>,
+            iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>,
         ) -> Box<TestRecordBatchReader> {
             Box::new(TestRecordBatchReader { schema, iter })
         }

Reply via email to