Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]

2025-08-01 Thread via GitHub


alamb closed issue #16482: Add tests for yielding / cancelling in SpillManager
URL: https://github.com/apache/datafusion/issues/16482


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]

2025-06-25 Thread via GitHub


ding-young commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3007233676

   @pepijnve Thanks for the clarification and for simplifying it :) I'll go ahead and open a PR based on that.





Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]

2025-06-25 Thread via GitHub


pepijnve commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3005212491

   @ding-young I think the gist of it is correct indeed. I tinkered a bit locally with `spill_reader_yield` and condensed it down to the following, which does the same thing but is easier to grasp since it fits on one page.
   
   ```rust
   async fn spill_reader_yield() -> Result<(), Box<dyn std::error::Error>> {
       use datafusion_execution::RecordBatchStream;
       use datafusion_physical_plan::common::spawn_buffered;
       use futures::Stream;

       // A mock stream that always returns `Poll::Ready(Some(...))` immediately
       let always_ready =
           make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?;

       // Build a consumer stream that resembles how the read stream from a
       // spill file is constructed
       let stream = make_cooperative(always_ready);

       // Set a large buffer so that the buffer always has free space for the
       // producer/sender
       let buffer_capacity = 100_000;
       let mut mock_stream = spawn_buffered(stream, buffer_capacity);
       let schema = mock_stream.schema();

       let consumer_stream = futures::stream::poll_fn(move |cx| {
           use arrow::compute::concat_batches;
           let mut collected = vec![];
           // To make sure the inner stream is polled multiple times, loop
           // forever while the inner (producer) stream returns Ready
           loop {
               match mock_stream.as_mut().poll_next(cx) {
                   Poll::Ready(Some(Ok(batch))) => {
                       println!("received batch from inner");
                       collected.push(batch);
                   }
                   Poll::Ready(Some(Err(e))) => {
                       println!("error from inner");
                       return Poll::Ready(Some(Err(e)));
                   }
                   Poll::Ready(None) => {
                       println!("inner stream ended");
                       break;
                   }
                   Poll::Pending => {
                       // Polling the inner stream may return Pending only when
                       // it exhausts its budget, since we intentionally made the
                       // producer stream always return Ready
                       return Poll::Pending;
                   }
               }
           }

           // This should be unreachable since the stream is canceled
           let combined = concat_batches(&mock_stream.schema(), &collected)
               .expect("Failed to concat batches");

           Poll::Ready(Some(Ok(combined)))
       });

       let consumer_record_batch_stream = Box::pin(RecordBatchStreamAdapter::new(
           schema,
           consumer_stream,
       ));

       stream_yields(consumer_record_batch_stream).await
   }
   ```





Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]

2025-06-25 Thread via GitHub


ding-young commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3004872124

   Hi, I just came across this issue and wanted to leave a comment after working on writing some test code.
   
   I wrote a test in my local branch where I tried to simulate the behavior of 
`read_spill_as_stream` as closely as possible, while manually controlling the 
producer and consumer stream behaviors.
   
- The producer stream is designed to always return Poll::Ready(Some(...)) 
immediately.
- The consumer stream loops tightly over the stream returned from 
`spawn_buffered`, polling repeatedly as long as it returns Ready.
- The buffer capacity for `spawn_buffered` is intentionally set large to 
avoid triggering a yield.
   
   The goal is to approximate a scenario where the cooperative yielding 
behavior must come from the stream wrapper (i.e., CooperativeStream) and not 
from channel backpressure or natural I/O waits. 
   
   Could you confirm whether this matches the intention? Here's some code in my 
[local 
branch](https://github.com/apache/datafusion/compare/main...ding-young:datafusion:test-cancel-spill#diff-eba449dc026a6c39c12232a28b4a9b24380fc81ebbcb0fe279bd5b631c50badbR30)
 
   
   I may not have fully understood it, so please feel free to correct me if I 
got something wrong. 





Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]

2025-06-20 Thread via GitHub


alamb commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-2992155680

   Another potential method might be to use a `Barrier` or something similar to precisely control when streams are waiting, etc.
   
   Maybe the extra coverage isn't worth the overhead. 
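
   A minimal sketch of the `Barrier` idea, using `std::sync::Barrier` with plain threads rather than any DataFusion or tokio API (purely illustrative; the names are made up):

   ```rust
   use std::sync::{Arc, Barrier};
   use std::thread;

   fn main() {
       // Two parties rendezvous at the barrier, so the test controls exactly
       // when the "producer" side is allowed to proceed, rather than relying
       // on timing or sleeps.
       let barrier = Arc::new(Barrier::new(2));

       let b = Arc::clone(&barrier);
       let producer = thread::spawn(move || {
           println!("producer: batch ready, waiting at barrier");
           // The producer cannot run past this point until the test arrives.
           b.wait();
           println!("producer: released");
       });

       println!("test: producer is parked until we say so");
       barrier.wait(); // release both sides at a precisely chosen moment
       producer.join().unwrap();
       println!("test: done");
   }
   ```

   The same pattern works with `tokio::sync::Barrier` inside an async test when the streams themselves need to be held at a known point.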





Re: [I] Add tests for yielding / cancelling in SpillManager [datafusion]

2025-06-20 Thread via GitHub


pepijnve commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-2991955349

   I had a look at this already while working on #16398 but got stuck on figuring out how to actually test this. There's a lot going on in the combination of SpillManager and SPM that makes it difficult to be sure you're testing the actual task-budget consumption aspect.
   
   `SpillManager::read_spill_as_stream` is the function where the cooperative decorator was added in #16398. This stream immediately gets wrapped in the `RecordBatchReceiverStream` created by `spawn_buffered`. The consequence is that, from the point of view of the consumer of the result of `SpillManager::read_spill_as_stream`, the stream is already cooperative, since `RecordBatchReceiverStream` uses a tokio mpsc channel.
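
   As a small illustration of why a channel-backed stream is already cooperative on the consumer side (sketched here with the `futures` crate's mpsc channel standing in for tokio's): polling an empty receiver returns `Pending`, which hands control back to the runtime.

   ```rust
   use std::task::{Context, Poll};

   use futures::channel::mpsc;
   use futures::task::noop_waker;
   use futures::StreamExt;

   fn main() {
       // A bounded channel standing in for the one inside
       // `RecordBatchReceiverStream` (illustrative only).
       let (_tx, mut rx) = mpsc::channel::<u64>(8);

       // Poll the receiver while the channel is empty: the consumer side
       // observes `Pending` and therefore yields to the runtime on its own,
       // with no cooperative wrapper needed on this side.
       let waker = noop_waker();
       let mut cx = Context::from_waker(&waker);
       assert!(matches!(rx.poll_next_unpin(&mut cx), Poll::Pending));
       println!("empty receiver returned Pending");
   }
   ```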
   
   So what we're really trying to test is that the inner spawned task in `spawn_buffered` that sends to the channel yields every now and then. Since the buffer of the channel is bounded in size, this is likely to happen naturally already.
   
   So long story short... I think we might be able to test this by:
   - Writing a test focused on `spawn_buffered` with a sufficiently large buffer size
   - Setting up a very fast consumer of the `RecordBatchReceiverStream` so that the buffer always has room
   - Using an artificially slow (i.e. doing a blocking sleep or something like that in `poll_next`), always-ready producer stream
   
   With that combination I think you could in theory have another case of a stuck task at https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/common.rs#L108
   That would manifest itself as the runtime not being able to shut down cleanly because a deeply hidden inner spawned task refuses to abort.
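
   The "artificially slow, always ready" producer from the list above could be sketched like this (hypothetical; it yields plain `u64`s rather than `RecordBatch`es and uses the `futures` crate directly, with a made-up type name):

   ```rust
   use std::pin::Pin;
   use std::task::{Context, Poll};
   use std::time::Duration;

   use futures::Stream;

   /// Hypothetical producer: every poll returns `Poll::Ready`, but first does
   /// a blocking sleep, so the task burns wall-clock time without ever
   /// returning `Pending` on its own. Any yielding must then come from a
   /// cooperative wrapper, not from the producer itself.
   struct SlowAlwaysReady {
       remaining: usize,
   }

   impl Stream for SlowAlwaysReady {
       type Item = u64;

       fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
           if self.remaining == 0 {
               return Poll::Ready(None);
           }
           // Blocking sleep in poll_next simulates slow spill-file I/O that
           // never awaits.
           std::thread::sleep(Duration::from_millis(1));
           self.remaining -= 1;
           Poll::Ready(Some(self.remaining as u64))
       }
   }

   fn main() {
       use futures::executor::block_on;
       use futures::StreamExt;

       // Drive the stream to completion on a trivial executor.
       let items = block_on(SlowAlwaysReady { remaining: 3 }.collect::<Vec<_>>());
       assert_eq!(items, vec![2, 1, 0]);
       println!("{items:?}");
   }
   ```

   Feeding something shaped like this into `spawn_buffered` with a large buffer and a fast consumer is the combination the bullets above describe.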

