This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea197dd9 feat: support RecordBatchReader on boxed trait objects (#4475)
5ea197dd9 is described below
commit 5ea197dd9de7cd2c5ad9a37e36a24ddf3ac5688f
Author: Will Jones <[email protected]>
AuthorDate: Mon Jul 3 07:00:11 2023 -0700
feat: support RecordBatchReader on boxed trait objects (#4475)
* Implement `RecordBatchReader` for `Box<R>`
* Require `Send` to create an FFI stream
---
arrow-array/src/record_batch.rs | 24 ++++++++++++++++++++++++
arrow/src/ffi_stream.rs | 10 +++++-----
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index d2e36780a..3134c9ecb 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -43,6 +43,12 @@ pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
}
}
+impl<R: RecordBatchReader + ?Sized> RecordBatchReader for Box<R> {
+ fn schema(&self) -> SchemaRef {
+ self.as_ref().schema()
+ }
+}
+
/// Trait for types that can write `RecordBatch`'s.
pub trait RecordBatchWriter {
/// Write a single batch to the writer.
@@ -1115,4 +1121,22 @@ mod tests {
// Cannot remove metadata
batch.with_schema(nullable_schema).unwrap_err();
}
+
+ #[test]
+ fn test_boxed_reader() {
+ // Make sure we can pass a boxed reader to a function generic over
+ // RecordBatchReader.
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+ let schema = Arc::new(schema);
+
+ let reader = RecordBatchIterator::new(std::iter::empty(), schema);
+ let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+
+ fn get_size(reader: impl RecordBatchReader) -> usize {
+ reader.size_hint().0
+ }
+
+ let size = get_size(reader);
+ assert_eq!(size, 0);
+ }
}
diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs
index 5fb1c1073..83d4eead3 100644
--- a/arrow/src/ffi_stream.rs
+++ b/arrow/src/ffi_stream.rs
@@ -119,7 +119,7 @@ unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) {
}
struct StreamPrivateData {
- batch_reader: Box<dyn RecordBatchReader>,
+ batch_reader: Box<dyn RecordBatchReader + Send>,
last_error: String,
}
@@ -157,7 +157,7 @@ impl Drop for FFI_ArrowArrayStream {
impl FFI_ArrowArrayStream {
/// Creates a new [`FFI_ArrowArrayStream`].
- pub fn new(batch_reader: Box<dyn RecordBatchReader>) -> Self {
+ pub fn new(batch_reader: Box<dyn RecordBatchReader + Send>) -> Self {
let private_data = Box::new(StreamPrivateData {
batch_reader,
last_error: String::new(),
@@ -371,7 +371,7 @@ impl RecordBatchReader for ArrowArrayStreamReader {
/// Assumes that the pointer represents valid C Stream Interfaces, both in memory
/// representation and lifetime via the `release` mechanism.
pub unsafe fn export_reader_into_raw(
- reader: Box<dyn RecordBatchReader>,
+ reader: Box<dyn RecordBatchReader + Send>,
out_stream: *mut FFI_ArrowArrayStream,
) {
let stream = FFI_ArrowArrayStream::new(reader);
@@ -388,13 +388,13 @@ mod tests {
struct TestRecordBatchReader {
schema: SchemaRef,
- iter: Box<dyn Iterator<Item = Result<RecordBatch>>>,
+ iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>,
}
impl TestRecordBatchReader {
pub fn new(
schema: SchemaRef,
- iter: Box<dyn Iterator<Item = Result<RecordBatch>>>,
+ iter: Box<dyn Iterator<Item = Result<RecordBatch>> + Send>,
) -> Box<TestRecordBatchReader> {
Box::new(TestRecordBatchReader { schema, iter })
}