comphead commented on code in PR #16616: URL: https://github.com/apache/datafusion/pull/16616#discussion_r2175983510
########## datafusion/core/tests/execution/coop.rs: ########## @@ -250,6 +252,61 @@ async fn agg_grouped_topk_yields( query_yields(aggr, session_ctx.task_ctx()).await } +#[rstest] +#[tokio::test] +// A test that mocks the behavior of `SpillManager::read_spill_as_stream` without file access +// to verify that a cooperative stream would properly yields in a spill file read scenario +async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> { + use arrow::compute::concat_batches; + use datafusion_physical_plan::common::spawn_buffered; + + // A mock stream that always returns `Poll::Ready(Some(...))` immediately + let always_ready = + make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?; + + // this function makes a consumer stream that resembles how read_stream from spill file is constructed + let stream = make_cooperative(always_ready); + + // Set large buffer so that buffer always has free space for the producer/sender + let buffer_capacity = 100_000; + let mut mock_stream = spawn_buffered(stream, buffer_capacity); + let schema = mock_stream.schema(); + + let consumer_stream = futures::stream::poll_fn(move |cx| { + let mut collected = vec![]; Review Comment: I would think of having some cap on iterations or a time limit instead of an infinite loop? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org