This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ee95caea8d feat: allow to `spawn`/`spawn_blocking` on a provided
runtime in `RecordBatchReceiverStreamBuilder` (#17239)
ee95caea8d is described below
commit ee95caea8d8ac8ea07e2ba5c36e787f7391d6c4d
Author: Raz Luvaton <[email protected]>
AuthorDate: Sat Aug 23 12:50:46 2025 +0300
feat: allow to `spawn`/`spawn_blocking` on a provided runtime in
`RecordBatchReceiverStreamBuilder` (#17239)
---
datafusion/physical-plan/src/stream.rs | 100 +++++++++++++++++++++++++++++++++
1 file changed, 100 insertions(+)
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index 773a098669..a7e2904ad4 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -38,6 +38,7 @@ use futures::stream::BoxStream;
use futures::{Future, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;
+use tokio::runtime::Handle;
use tokio::sync::mpsc::{Receiver, Sender};
/// Creates a stream from a collection of producing tasks, routing panics to the stream.
@@ -84,6 +85,15 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
self.join_set.spawn(task);
}
+    /// Spawn `task` on the runtime identified by `handle`.
+    ///
+    /// Same as [`Self::spawn`], but the task is added to this builder's join
+    /// set through `spawn_on`, so it is spawned on the provided runtime.
+    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
+    where
+        F: Future<Output = Result<()>> + Send + 'static,
+    {
+        // Only registering the task matters here; the value returned by the
+        // join set is deliberately discarded.
+        let _ = self.join_set.spawn_on(task, handle);
+    }
+
/// Spawn a blocking task that will be aborted if this builder (or the stream
/// built from it) are dropped.
///
@@ -97,6 +107,15 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
self.join_set.spawn_blocking(f);
}
+    /// Spawn the blocking closure `f` on the runtime identified by `handle`.
+    ///
+    /// Same as [`Self::spawn_blocking`], but the closure is added to this
+    /// builder's join set through `spawn_blocking_on`, so it is spawned on
+    /// the provided runtime.
+    pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle)
+    where
+        F: FnOnce() -> Result<()> + Send + 'static,
+    {
+        // Only registering the task matters here; the value returned by the
+        // join set is deliberately discarded.
+        let _ = self.join_set.spawn_blocking_on(f, handle);
+    }
+
/// Create a stream of all data written to `tx`
pub fn build(self) -> BoxStream<'static, Result<O>> {
let Self {
@@ -248,6 +267,15 @@ impl RecordBatchReceiverStreamBuilder {
self.inner.spawn(task)
}
+    /// Spawn `task` on the runtime identified by `handle`.
+    ///
+    /// Same as [`Self::spawn`], but it spawns the task on the provided
+    /// runtime. The call is forwarded unchanged to the inner builder.
+    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
+    where
+        F: Future<Output = Result<()>> + Send + 'static,
+    {
+        self.inner.spawn_on(task, handle);
+    }
+
/// Spawn a blocking task tied to the builder and stream.
///
/// # Drop / Cancel Behavior
@@ -275,6 +303,15 @@ impl RecordBatchReceiverStreamBuilder {
self.inner.spawn_blocking(f)
}
+    /// Spawn the blocking closure `f` on the runtime identified by `handle`.
+    ///
+    /// Same as [`Self::spawn_blocking`], but it spawns the blocking task on
+    /// the provided runtime. The call is forwarded unchanged to the inner
+    /// builder.
+    pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle)
+    where
+        F: FnOnce() -> Result<()> + Send + 'static,
+    {
+        self.inner.spawn_blocking_on(f, handle);
+    }
+
/// Runs the `partition` of the `input` ExecutionPlan on the
/// tokio thread pool and writes its outputs to this stream
///
@@ -822,4 +859,67 @@ mod test {
);
}
}
+
+    /// Checks that `spawn_on` and `spawn_blocking_on` run their tasks on an
+    /// explicitly provided runtime and that both produced batches reach the
+    /// built stream.
+    #[test]
+    fn record_batch_receiver_stream_builder_spawn_on_runtime() {
+        // A plain `#[test]`: the only runtime in play is the one built here,
+        // and it is handed to the builder exclusively through its handle.
+        let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let mut builder =
+            RecordBatchReceiverStreamBuilder::new(Arc::new(Schema::empty()), 10);
+
+        // Producer 1: an async task sending one empty batch.
+        let tx1 = builder.tx();
+        builder.spawn_on(
+            async move {
+                tx1.send(Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))))
+                    .await
+                    .unwrap();
+
+                Ok(())
+            },
+            tokio_runtime.handle(),
+        );
+
+        // Producer 2: a blocking task sending one empty batch.
+        let tx2 = builder.tx();
+        builder.spawn_blocking_on(
+            move || {
+                tx2.blocking_send(Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))))
+                    .unwrap();
+
+                Ok(())
+            },
+            tokio_runtime.handle(),
+        );
+
+        let mut stream = builder.build();
+
+        let mut number_of_batches = 0;
+
+        // Poll the stream by hand with a no-op waker until it is exhausted.
+        loop {
+            let poll = stream.poll_next_unpin(&mut Context::from_waker(
+                futures::task::noop_waker_ref(),
+            ));
+
+            match poll {
+                Poll::Ready(None) => break,
+                Poll::Ready(Some(Ok(batch))) => {
+                    number_of_batches += 1;
+                    assert_eq!(batch.num_rows(), 0);
+                }
+                Poll::Ready(Some(Err(e))) => panic!("Unexpected error: {e}"),
+                // The no-op waker never notifies us, so we must poll again;
+                // yield first so this loop does not hot-spin against the
+                // runtime's worker threads.
+                Poll::Pending => std::thread::yield_now(),
+            }
+        }
+
+        // One batch from each of the two producers.
+        assert_eq!(
+            number_of_batches, 2,
+            "Should have received exactly two empty batches"
+        );
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]