Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]
alamb closed issue #16482: Add tests for yielding / cancelling in SpillManager URL: https://github.com/apache/datafusion/issues/16482 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
ding-young commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3007233676
@pepijnve Thanks for the clarification and simplifying it :) I’ll go ahead and open a PR based on that.
pepijnve commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3005212491
@ding-young I think the gist of it is indeed correct. I tinkered a bit
locally with `spill_reader_yield` and condensed it down to this, which does
the same thing but is a bit easier to grasp since it fits on one page.
```rust
// `make_lazy_exec`, `make_cooperative` and `stream_yields` are helpers
// assumed to be defined elsewhere in the same test module.
async fn spill_reader_yield() -> Result<(), Box<dyn std::error::Error>> {
    use std::task::Poll;

    use arrow::compute::concat_batches;
    use datafusion::prelude::SessionContext;
    use datafusion_execution::RecordBatchStream;
    use datafusion_physical_plan::common::spawn_buffered;
    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    use datafusion_physical_plan::ExecutionPlan;
    use futures::Stream;

    // A mock stream that always returns `Poll::Ready(Some(...))` immediately
    let always_ready =
        make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?;

    // This builds a stream that resembles how the read stream of a spill file
    // is constructed
    let stream = make_cooperative(always_ready);

    // Use a large buffer so that the producer/sender always finds free space
    let buffer_capacity = 100_000;
    let mut mock_stream = spawn_buffered(stream, buffer_capacity);
    let schema = mock_stream.schema();

    let consumer_stream = futures::stream::poll_fn(move |cx| {
        let mut collected = vec![];

        // To make sure the inner stream is polled multiple times, keep looping
        // for as long as the inner (producer) stream returns Ready
        loop {
            match mock_stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    println!("received batch from inner");
                    collected.push(batch);
                }
                Poll::Ready(Some(Err(e))) => {
                    println!("error from inner");
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    println!("inner stream ended");
                    break;
                }
                Poll::Pending => {
                    // Polling the inner stream can return Pending only once the
                    // budget is exhausted, since the producer stream was
                    // deliberately made to always return Ready
                    return Poll::Pending;
                }
            }
        }

        // Only reached if the inner stream ends; the test expects the stream
        // to be cancelled before that happens
        let combined = concat_batches(&mock_stream.schema(), &collected)
            .expect("Failed to concat batches");
        Poll::Ready(Some(Ok(combined)))
    });

    let consumer_record_batch_stream =
        Box::pin(RecordBatchStreamAdapter::new(schema, consumer_stream));

    stream_yields(consumer_record_batch_stream).await
}
```
ding-young commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3004872124
Hi, I just came across this issue and wanted to leave a comment after working on some test code.
I wrote a test in my local branch that tries to simulate the behavior of `read_spill_as_stream` as closely as possible, while manually controlling how the producer and consumer streams behave:
- The producer stream is designed to always return `Poll::Ready(Some(...))` immediately.
- The consumer stream loops tightly over the stream returned from `spawn_buffered`, polling repeatedly for as long as it returns `Ready`.
- The buffer capacity for `spawn_buffered` is intentionally set large to avoid triggering a yield.
The goal is to approximate a scenario where the cooperative yielding behavior must come from the stream wrapper (i.e., `CooperativeStream`) and not from channel backpressure or natural I/O waits. Could you confirm whether this matches the intention?
Here's some code in my [local branch](https://github.com/apache/datafusion/compare/main...ding-young:datafusion:test-cancel-spill#diff-eba449dc026a6c39c12232a28b4a9b24380fc81ebbcb0fe279bd5b631c50badbR30)
I may not have fully understood it, so please feel free to correct me if I got something wrong.
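A std-only model of this setup may help show why the tight consumer loop can only stop because of the wrapper. It is a minimal sketch, not DataFusion's `CooperativeStream`: the `PollNext` trait, `AlwaysReady`, `Cooperative` and `ready_items_before_pending` are hypothetical names, and the budget value is arbitrary. A real Tokio runtime tracks the budget per task rather than per stream.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::Stream`, so the sketch needs only `std`.
trait PollNext {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical producer that is always ready, like the mock stream in the test.
struct AlwaysReady {
    next: u64,
}

impl PollNext for AlwaysReady {
    type Item = u64;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        let value = self.next;
        self.next += 1;
        Poll::Ready(Some(value))
    }
}

/// Hypothetical cooperative wrapper: after `budget` ready items it schedules a
/// wake-up and returns `Pending` once, giving the executor a chance to run
/// other work or act on cancellation.
struct Cooperative<S> {
    inner: S,
    budget: u32,
    remaining: u32,
}

impl<S> Cooperative<S> {
    fn new(inner: S, budget: u32) -> Self {
        Self { inner, budget, remaining: budget }
    }
}

impl<S: PollNext + Unpin> PollNext for Cooperative<S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = &mut *self;
        if this.remaining == 0 {
            // Budget exhausted: refill it, ask to be polled again, and yield.
            this.remaining = this.budget;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let item = Pin::new(&mut this.inner).poll_next(cx);
        if item.is_ready() {
            this.remaining -= 1;
        }
        item
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `stream` in a tight loop, like the test's consumer, and returns how
/// many items arrived before the first `Pending` (or end of stream).
fn ready_items_before_pending<S: PollNext + Unpin>(stream: &mut S) -> usize {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut received = 0;
    loop {
        match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(Some(_)) => received += 1,
            Poll::Ready(None) | Poll::Pending => return received,
        }
    }
}
```

For example, `ready_items_before_pending(&mut Cooperative::new(AlwaysReady { next: 0 }, 128))` returns after 128 items. Calling it directly on a bare `AlwaysReady` would never return, which is the situation the test is meant to detect.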
alamb commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-2992155680
Another potential approach might be to use a `Barrier` or something similar to precisely control when the streams are waiting. Maybe the extra coverage isn't worth the overhead.
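The `Barrier` idea can be sketched with `std::sync::Barrier` and plain threads; an async test would presumably use `tokio::sync::Barrier` instead. In this minimal sketch, `drain_after_barrier` is a hypothetical helper: the producer fills a bounded channel and then waits at the barrier, so the consumer only starts reading once every send is known to have completed.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Barrier};
use std::thread;

/// Hypothetical helper: returns how many items the consumer sees when it
/// starts draining only after the producer has reached the barrier.
fn drain_after_barrier(items: usize) -> usize {
    // Capacity equals `items`, so the producer never blocks on a full buffer.
    let (tx, rx) = sync_channel::<usize>(items);
    let barrier = Arc::new(Barrier::new(2));

    let producer_barrier = Arc::clone(&barrier);
    let producer = thread::spawn(move || {
        for i in 0..items {
            tx.send(i).expect("receiver is alive");
        }
        // Every send has completed before this point is reached.
        producer_barrier.wait();
        // `tx` is dropped when the closure returns, disconnecting the channel.
    });

    // The consumer blocks here until the producer has filled the buffer.
    barrier.wait();
    let mut received = 0;
    while rx.try_recv().is_ok() {
        received += 1;
    }
    producer.join().expect("producer thread panicked");
    received
}
```

Because all sends happen before the producer reaches the barrier, the consumer sees exactly `items` buffered values regardless of how the threads are scheduled.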
pepijnve commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-2991955349
I had a look at this already while working on #16398 but got stuck on figuring out how to actually test this. There's a lot going on in the combination of SpillManager and SPM that makes it difficult to be sure you're testing the actual task budget consumption aspect.
`SpillManager::read_spill_as_stream` is the function where the cooperative decorator was added in #16398. This stream immediately gets wrapped in a `RecordBatchReceiverStream` created by `spawn_buffered`. The consequence is that, from the point of view of the consumer of the result of `SpillManager::read_spill_as_stream`, the stream is already cooperative, since `RecordBatchReceiverStream` uses a tokio mpsc channel. So what we're really trying to test is that the inner spawned task in `spawn_buffered`, which sends to the channel, yields every now and then. Since the channel's buffer is bounded in size, this is likely to happen naturally already.
So long story short... I think we might be able to test this by
- Writing a test focused on `spawn_buffered` with a sufficiently large buffer size
- Setting up a very fast consumer of the `RecordBatchReceiverStream` so that the buffer always has room
- Using an artificially slow (i.e. doing a blocking sleep or something like that in `poll_next`), always-ready producer stream
With that combination I think you could in theory have another case of a stuck task at https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/common.rs#L108
That would manifest itself as the runtime not being able to shut down cleanly because a deeply hidden inner spawned task refuses to abort.
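The backpressure argument can be illustrated with a bounded channel. This sketch uses `std::sync::mpsc::sync_channel` as a stand-in for the Tokio mpsc channel inside `spawn_buffered`, and `sends_before_backpressure` is a hypothetical helper: it counts how many items a producer can push before a full buffer would force it to wait.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: how many of `items` sends succeed into a channel of
/// the given capacity while nobody is receiving.
fn sends_before_backpressure(capacity: usize, items: usize) -> usize {
    // Keep the receiver alive (but idle) so sends do not fail as disconnected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    for i in 0..items {
        match tx.try_send(i) {
            Ok(()) => {}
            // The buffer is full: a blocking sender would wait here (an async
            // sender would return Pending), which is a natural yield point.
            Err(TrySendError::Full(_)) => return i,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
    // Every send found free space: the channel never forced the producer to wait.
    items
}
```

With a capacity of 2 the producer hits a full buffer after two items; with a capacity of 100_000 and only 1_000 items it never does, so any yielding has to come from somewhere other than channel backpressure.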
