This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ee95caea8d feat: allow to `spawn`/`spawn_blocking` on a provided runtime in `RecordBatchReceiverStreamBuilder` (#17239)
ee95caea8d is described below

commit ee95caea8d8ac8ea07e2ba5c36e787f7391d6c4d
Author: Raz Luvaton <[email protected]>
AuthorDate: Sat Aug 23 12:50:46 2025 +0300

    feat: allow to `spawn`/`spawn_blocking` on a provided runtime in `RecordBatchReceiverStreamBuilder` (#17239)
---
 datafusion/physical-plan/src/stream.rs | 100 +++++++++++++++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index 773a098669..a7e2904ad4 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -38,6 +38,7 @@ use futures::stream::BoxStream;
 use futures::{Future, Stream, StreamExt};
 use log::debug;
 use pin_project_lite::pin_project;
+use tokio::runtime::Handle;
 use tokio::sync::mpsc::{Receiver, Sender};
 
 /// Creates a stream from a collection of producing tasks, routing panics to the stream.
@@ -84,6 +85,15 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
         self.join_set.spawn(task);
     }
 
+    /// Like [`Self::spawn`], but the task is spawned onto the runtime
+    /// referenced by `handle` instead of the current one.
+    ///
+    /// The task is still registered in this builder's join set, exactly as
+    /// tasks started via [`Self::spawn`] are.
+    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
+    where
+        F: Future<Output = Result<()>> + Send + 'static,
+    {
+        // The returned abort handle is not needed: the join set owns the task.
+        let _ = self.join_set.spawn_on(task, handle);
+    }
+
     /// Spawn a blocking task that will be aborted if this builder (or the stream
     /// built from it) are dropped.
     ///
@@ -97,6 +107,15 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
         self.join_set.spawn_blocking(f);
     }
 
+    /// Same as [`Self::spawn_blocking`] but it spawns the blocking task on 
the provided runtime
+    pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle)
+    where
+        F: FnOnce() -> Result<()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking_on(f, handle);
+    }
+
     /// Create a stream of all data written to `tx`
     pub fn build(self) -> BoxStream<'static, Result<O>> {
         let Self {
@@ -248,6 +267,15 @@ impl RecordBatchReceiverStreamBuilder {
         self.inner.spawn(task)
     }
 
+    /// Like [`Self::spawn`], but the task is spawned onto the runtime
+    /// referenced by `handle` instead of the current one.
+    ///
+    /// The task is tied to this builder and the stream it builds in the same
+    /// way as tasks started via [`Self::spawn`].
+    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
+    where
+        F: Future<Output = Result<()>> + Send + 'static,
+    {
+        self.inner.spawn_on(task, handle);
+    }
+
     /// Spawn a blocking task tied to the builder and stream.
     ///
     /// # Drop / Cancel Behavior
@@ -275,6 +303,15 @@ impl RecordBatchReceiverStreamBuilder {
         self.inner.spawn_blocking(f)
     }
 
+    /// Same as [`Self::spawn_blocking`] but it spawns the blocking task on the
+    /// provided runtime.
+    ///
+    /// The closure runs on the blocking pool of the runtime referenced by
+    /// `handle` instead of the current runtime. See [`Self::spawn_blocking`]
+    /// for the drop / cancel behavior, which applies here as well.
+    pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle)
+    where
+        F: FnOnce() -> Result<()>,
+        F: Send + 'static,
+    {
+        self.inner.spawn_blocking_on(f, handle)
+    }
+
     /// Runs the `partition` of the `input` ExecutionPlan on the
     /// tokio thread pool and writes its outputs to this stream
     ///
@@ -822,4 +859,67 @@ mod test {
             );
         }
     }
+
+    /// `spawn_on` / `spawn_blocking_on` must run their tasks on the supplied
+    /// runtime, so both batches arrive even though the test thread itself is
+    /// not inside any tokio runtime context.
+    #[test]
+    fn record_batch_receiver_stream_builder_spawn_on_runtime() {
+        let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let mut builder =
+            RecordBatchReceiverStreamBuilder::new(Arc::new(Schema::empty()), 10);
+
+        let tx1 = builder.tx();
+        builder.spawn_on(
+            async move {
+                tx1.send(Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))))
+                    .await
+                    .unwrap();
+
+                Ok(())
+            },
+            tokio_runtime.handle(),
+        );
+
+        let tx2 = builder.tx();
+        builder.spawn_blocking_on(
+            move || {
+                tx2.blocking_send(Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))))
+                    .unwrap();
+
+                Ok(())
+            },
+            tokio_runtime.handle(),
+        );
+
+        let stream = builder.build();
+
+        // Drive the stream from this (non-runtime) thread with a real waker.
+        // Busy-polling with a no-op waker would spin a CPU core until both
+        // producer tasks have finished.
+        let results = futures::executor::block_on(stream.collect::<Vec<_>>());
+
+        for result in &results {
+            match result {
+                Ok(batch) => assert_eq!(batch.num_rows(), 0),
+                Err(e) => panic!("Unexpected error: {e}"),
+            }
+        }
+
+        assert_eq!(
+            results.len(), 2,
+            "Should have received exactly two empty batches"
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to