comphead commented on code in PR #16616:
URL: https://github.com/apache/datafusion/pull/16616#discussion_r2175983510


##########
datafusion/core/tests/execution/coop.rs:
##########
@@ -250,6 +252,61 @@ async fn agg_grouped_topk_yields(
     query_yields(aggr, session_ctx.task_ctx()).await
 }
 
+#[rstest]
+#[tokio::test]
+// A test that mocks the behavior of `SpillManager::read_spill_as_stream` without file access
+// to verify that a cooperative stream would properly yields in a spill file read scenario
+async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> {
+    use arrow::compute::concat_batches;
+    use datafusion_physical_plan::common::spawn_buffered;
+
+    // A mock stream that always returns `Poll::Ready(Some(...))` immediately
+    let always_ready =
+        make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?;
+
+    // this function makes a consumer stream that resembles how read_stream from spill file is constructed
+    let stream = make_cooperative(always_ready);
+
+    // Set large buffer so that buffer always has free space for the producer/sender
+    let buffer_capacity = 100_000;
+    let mut mock_stream = spawn_buffered(stream, buffer_capacity);
+    let schema = mock_stream.schema();
+
+    let consumer_stream = futures::stream::poll_fn(move |cx| {
+        let mut collected = vec![];

Review Comment:
   I would think about adding a cap on the number of iterations, or a time limit,
   instead of an infinite loop?
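
   A rough, std-only sketch of what such a bound could look like. It is not
   code from the PR: `drain_bounded`, `StopReason`, and both limits are
   hypothetical names chosen for illustration, and the closure stands in for
   polling the buffered mock stream.

```rust
use std::task::Poll;
use std::time::{Duration, Instant};

/// Why the bounded loop stopped (hypothetical helper type, not part of DataFusion).
#[derive(Debug, PartialEq)]
enum StopReason {
    /// The source returned `Poll::Ready(None)`.
    Exhausted,
    /// The source returned `Poll::Pending`, i.e. it yielded.
    Pending,
    /// `max_iterations` polls were made without the source yielding or ending.
    IterationCap,
    /// `time_limit` elapsed before the loop finished.
    TimeLimit,
}

/// Polls `poll_next` repeatedly, but never more than `max_iterations` times
/// and never for longer than `time_limit`, instead of looping forever.
fn drain_bounded<T>(
    mut poll_next: impl FnMut() -> Poll<Option<T>>,
    max_iterations: usize,
    time_limit: Duration,
) -> (Vec<T>, StopReason) {
    let start = Instant::now();
    let mut collected = Vec::new();

    for _ in 0..max_iterations {
        // Check the wall-clock bound before each poll.
        if start.elapsed() >= time_limit {
            return (collected, StopReason::TimeLimit);
        }
        match poll_next() {
            Poll::Ready(Some(item)) => collected.push(item),
            Poll::Ready(None) => return (collected, StopReason::Exhausted),
            Poll::Pending => return (collected, StopReason::Pending),
        }
    }

    // The cap was hit: the source was ready on every poll and never yielded.
    (collected, StopReason::IterationCap)
}
```

   With a bound like this, a source that is always ready ends the loop with
   `StopReason::IterationCap`, so a regression in yielding would show up as a
   failed assertion rather than a hung test.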



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

