Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-20 Thread via GitHub


alamb closed issue #16353: [Epic] Pipeline breaking cancellation support and 
improvement
URL: https://github.com/apache/datafusion/issues/16353


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2970454499

   @ozankabak the draft PR is in good enough shape to review, I think. The 
general idea is still the same as what was there before.
   
   The last thing I'm still working on is the ExecutionPlan API, where I would 
like to replace `with_cooperative_yielding` with a plan property, trying to 
make things more declarative.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


ozankabak commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2970156909

   > @alamb @ozankabak wdyt? Maybe this is what you were going for all along 
and I'm just slowly catching up :smile:
   
   > The change of heart comes from the realization that Tokio itself also 
takes a 'consume at the leaves' strategy and having a task-wide budget ensures 
that tasks cannot silently ignore the yield request. Once one resource depletes 
the budget, it's no longer possible to make progress anywhere else provided all 
resources participate in the budgeting system.
   
   A draft PR would be good to have, I think I can make better comments then. 
However, solving this at the stream level in a way transparent to the operator 
builder would be great in general. That was my original intention, but we 
weren't able to materialize that solution in a reasonable amount of time. Hence 
the current solution, which is basically a fallback that has some 
characteristics of the ideal solution (e.g. "transparency", focusing on leaves 
etc.), but requires support from the planner via to-be-designed APIs. The 
current approach can evolve into a decent one with such APIs, but it would 
always be worse than a proper lower-level solution. It would be good if we can 
build that, but in the meantime, I am glad that we have some solution that 
works for many cases.
   
   If it turns out that we can arrive at a proper stream-based solution 
quickly, we can retire this one quickly. Otherwise, we can incrementally 
improve what we have today as alternatives go through design/experimentation 
etc.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2970116925

   > This is true in theory -- but I think we also take pains to try and avoid 
"over scheduling" tasks in tokio -- for example, we purposely only have N input 
partitions (and hence N streams) per scan, even if there are 100+ files -- the 
goal is to keep all the cores busy, but not oversubscribed.
   
   What I was trying to say is that from a scheduling/yielding pov you can 
reason about each box in isolation. Whether you actually try to make 100s of 
concurrent (not parallel) tasks or not is a rabbit hole for another thread 😄





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2970055623

   > > So it means this sub-task corner case can be resolved?
   > 
   > Yes, that's correct.
   > 
   > > I am curious what the budget count is, since we can't configure it from 
datafusion; will it affect performance or other things? It seems not, because 
we already use RecordBatchReceiverStream for the budget?
   > 
   > [@zhuqi-lucas](https://github.com/zhuqi-lucas) that's correct, you can't 
configure it at the moment. That's the case for `RecordBatchReceiverStream` 
today as well indeed. Tokio hardcodes the magic number `128` (see 
https://github.com/tokio-rs/tokio/blob/master/tokio/src/task/coop/mod.rs#L116).
   > 
   > > If all leaf nodes have to share one budget, will a leaf node that 
consumes budget very aggressively affect the overall fairness or performance?
   > 
   > The budget is per spawned task. Every time the tokio scheduler lets a task 
run it gives it a budget of 128 which the task can then deplete until it hits 
zero. Then the task is coaxed towards yielding by making all budget aware Tokio 
resources return `Pending`. From the perspective of DataFusion code I don't 
think this really changes all that much. It's the exact same behavior you have 
today already when the source streams are `RecordBatchReceiverStream`. So the 
moment you have a repartition/coalesce you're getting exactly this with the 
current code.
   
   Got it, it makes sense to me @pepijnve! Thanks!





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2970025341

   > I am curious what the budget count is, since we can't configure it from 
datafusion; will it affect performance or other things? It seems not, because 
we already use RecordBatchReceiverStream for the budget?
   
   @zhuqi-lucas that's correct, you can't configure it at the moment. That's 
the case for `RecordBatchReceiverStream` today as well indeed. Tokio hardcodes 
the magic number `128` (see 
https://github.com/tokio-rs/tokio/blob/master/tokio/src/task/coop/mod.rs#L116).
   
   > If all leaf nodes have to share one budget, will a leaf node that consumes 
budget very aggressively affect the overall fairness or performance?
   
   The budget is per spawned task. Every time the tokio scheduler lets a task 
run it gives it a budget of 128 which the task can then deplete until it hits 
zero. Then the task is coaxed towards yielding by making all budget aware Tokio 
resources return `Pending`.
   From the perspective of DataFusion code I don't think this really changes 
all that much. It's the exact same behavior you have today already when the 
source streams are `RecordBatchReceiverStream`. So the moment you have a 
repartition/coalesce you're getting exactly this with the current code.
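
   The behavior described above can be modeled in a few lines of plain Rust. 
This is a toy sketch, not Tokio's actual API: one task-wide budget cell that 
all budget-aware resources in the task draw from, which only resets when the 
task genuinely yields back to the scheduler.

   ```rust
   use std::cell::Cell;

   // Toy model (NOT Tokio's real API): a single task-wide budget shared by
   // all budget-aware resources running inside one task.
   struct TaskBudget(Cell<u32>);

   impl TaskBudget {
       fn new(n: u32) -> Self {
           TaskBudget(Cell::new(n))
       }

       // Returns true if the caller may proceed; false means "return Pending".
       fn poll_proceed(&self) -> bool {
           let left = self.0.get();
           if left == 0 {
               return false;
           }
           self.0.set(left - 1);
           true
       }

       // In Tokio the reset happens when the task actually yields back to
       // the scheduler, not before.
       fn reset(&self, n: u32) {
           self.0.set(n);
       }
   }

   fn main() {
       // Tokio hardcodes 128; use 4 here to keep the trace short.
       let budget = TaskBudget::new(4);

       // Any mix of sources drawing from the same budget gets exactly 4
       // "Ready" results in total...
       let mut polls = 0;
       while budget.poll_proceed() {
           polls += 1;
       }
       assert_eq!(polls, 4);

       // ...after which *every* budget-aware resource reports Pending, so
       // the task cannot dodge the yield by polling a different source.
       assert!(!budget.poll_proceed());

       // Once the task yields, the budget resets and work resumes.
       budget.reset(4);
       assert!(budget.poll_proceed());

       println!("ok");
   }
   ```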





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2969886007

   > > fixes any bugs / adds features over the current one,
   > > Is "just" cleaner way to implement the same thing (this is also a fine 
thing to contribute as well).
   > 
   > There are a couple of benefits.
   > 
   > It removes the edge case seen in the interleave operator (or any `select!` 
style code in general). With the current per stream counter, one stream might 
want to yield, but the parent stream may decide to poll another stream in 
response which happens to be ready. The end result is that two cooperating 
streams may turn into a non-cooperating when they are merged. To fix this, you 
would need to adjust the merging operator as well and we're basically back 
where we started. If all cooperating streams use the same budget, then this 
problem goes away. Once the yield point has been hit, all cooperating streams 
will yield.
   
   So it means this sub-task corner case can be resolved?
   
   Fix the corner case provided in this link: 
https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
   
   > Using the task budget also avoids the 'redundant yield' problem in the 
current version. If you now do a simple `SELECT * FROM ...` query, by default 
you'll get a `Pending` after every 64 `Ready(RecordBatch)`. With the task 
budget you will only actually inject the `Pending` when it's actually 
necessary. The system automatically does the right thing.
   
   I am curious what the budget count is, since we can't configure it from 
datafusion; will it affect performance or other things? It seems not, because 
we already use RecordBatchReceiverStream for the budget?
   
   Another question:
   
   If all leaf nodes have to share one budget, will a leaf node that consumes 
budget very aggressively affect the overall fairness or performance?




Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2969854461

   > fixes any bugs / adds features over the current one,
   > Is "just" cleaner way to implement the same thing (this is also a fine 
thing to contribute as well).
   
   There are a couple of benefits.
   
   It removes the edge case seen in the interleave operator (or any `select!` 
style code in general). With the current per-stream counter, one stream might 
want to yield, but the parent stream may decide to poll another stream in 
response, which happens to be ready. The end result is that two cooperating 
streams may turn into a non-cooperating one when they are merged. To fix this, 
you would need to adjust the merging operator as well, and we're basically back 
where we started.
   If all cooperating streams use the same budget, then this problem goes away. 
Once the yield point has been hit, all cooperating streams will yield.
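
   This edge case can be simulated in plain Rust (the types below are mine, 
not DataFusion's): each input dutifully reports Pending on its own schedule, 
but a `select!`-style merge just polls the other input instead, so the 
combined stream never actually yields.

   ```rust
   // Toy model of the per-stream-counter edge case. Not DataFusion's real
   // types; a minimal simulation of the polling behavior described above.
   enum Poll {
       Ready,
       Pending,
   }

   struct YieldStream {
       left: u32,
   }

   impl YieldStream {
       // Ask to yield after every 2 items, then self-reset the counter
       // (this is what a per-stream budget does).
       fn poll(&mut self) -> Poll {
           if self.left == 0 {
               self.left = 2;
               return Poll::Pending;
           }
           self.left -= 1;
           Poll::Ready
       }
   }

   fn main() {
       let mut a = YieldStream { left: 2 };
       let mut b = YieldStream { left: 2 };
       let mut produced = 0;
       let mut yielded = 0;

       for _ in 0..8 {
           // Merge strategy: try `a`; if it wants to yield, try `b` instead.
           match a.poll() {
               Poll::Ready => produced += 1,
               Poll::Pending => match b.poll() {
                   Poll::Ready => produced += 1, // cooperation defeated
                   Poll::Pending => yielded += 1, // only if BOTH are out
               },
           }
       }

       // The merge produced an item on every iteration and never yielded,
       // even though each input asked to yield every 2 items.
       assert_eq!(produced, 8);
       assert_eq!(yielded, 0);
       println!("produced={produced} yielded={yielded}");
   }
   ```

   With one shared budget instead of two counters, the `b.poll()` branch would 
also see an exhausted budget and the merge would be forced to yield.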
   
   Using the task budget also avoids the 'redundant yield' problem in the 
current version. If you now do a simple `SELECT * FROM ...` query, by default 
you'll get a `Pending` after every 64 `Ready(RecordBatch)`. With the task 
budget you will only actually inject the `Pending` when it's actually 
necessary. The system automatically does the right thing.
   
   Finally, it aligns the cooperative yielding strategy across the library. 
`RecordBatchReceiverStream` is implicitly already using this strategy in a way 
you cannot opt out of. It's better to have one consistent way of solving this 
cancellation problem once and for all.
   
   I have a patch almost ready. I'll open a draft PR so this all becomes a bit 
more tangible.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


alamb commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2969824040

   > As we’ve discussed above the channel receiver is already doing that for 
us. For some reason file IO was not. I’m not sure I understand why that’s the 
case and will try to figure out why tomorrow.
   
   This is consistent with our observations at InfluxData:  we saw 
uncancellable queries when feeding our plan from an in memory cache (not a file 
/ memory)
   
   > 
https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md
   
   This is a really nice writeup: it matches my understanding / mental model. 
It would also make the start of a great blog post for the DataFusion blog  FWIW 
and I filed a ticket to track that idea 🎣 :
   - https://github.com/apache/datafusion/issues/16396
   
   > The more I read the more I can see DataFusion is basically a modern day 
Volcano.
   
   I think this is an accurate assessment, though I would probably phrase it as 
"DataFusion uses Volcano-style parallelism where operators are single threaded 
and Exchange (`RepartitionExec`) operators handle parallelism". The other 
prevalent style is called "Morsel Driven Parallelism" popularized by DuckDB and 
TUM/Umbra [in this paper](https://db.in.tum.de/~leis/papers/morsels.pdf) which 
uses operators that are explicitly multi-threaded.
   
   > Each of the colored blocks is an independently executing sub portion of 
the query. Translated to Tokio each of these colored blocks is a separate 
concurrent task. Each of those tasks needs to be cooperatively scheduled to 
guarantee all of them get a fair share of time to run.
   
   This is true in theory -- but I think we also take pains to try and avoid 
"over scheduling" tasks in tokio -- for example, we purposely only have `N` 
input partitions (and hence N streams) per scan, even if there are 100+ files 
-- the goal is to keep all the cores busy, but not oversubscribed.
   
   > So what does all this mean in terms of implementation:
   
   This also sounds fine to me, and I would be happy to review PRs, etc. 
However, it is not 100% clear if your proposed design:
   1. fixes any bugs / adds features over the current one, or
   2. is "just" a cleaner way to implement the same thing (this is also a fine 
thing to contribute as well).
   
   For example, I wonder if there are additional tests / cases that would be 
improved with the proposed implementation 🤔 
   
   > The one thing that we still cannot solve automatically then is dynamic 
query planning. Operators that create streams dynamically still have to make 
sure they set things up correctly themselves.
   
   In my opinion this is fine -- if operators are making dynamic streams, that 
is an advanced usecase that today must still handle canceling / yielding. I 
think it is ok if we can't find a way to automatically provide yielding 
behavior to them (they are no worse off then today)
   
   > One possible downside to this approach is that the cooperative scheduling 
budget is implementation specific to the Tokio runtime. DataFusion becomes more 
tied to Tokio rather than less. Not sure if that's an issue or not.
   
   I personally don't think this is an issue as I don't see any movement and 
have not heard any desire to move away from tokio.
   





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-13 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2969344331

   > pipeline mode and pipeline breaking
   
   I'm starting to realize we might have been placing too much emphasis on this 
aspect. I've been doing my homework by reading the Volcano paper. I had never 
read that paper in depth (never had a need to); I just knew that people used 
the term to kind of refer to 'the iterator approach'. The more I read, the more 
I can see DataFusion is basically a modern-day Volcano.
   
   One thing DataFusion does not have explicitly, as far as I know, is an 
exchange operator. I say not explicitly, because the essential 
demand/data-driven switch part of exchange is present in a couple of operators 
like Repartition and Coalesce. Perhaps these dataflow change points are a 
better way of looking at the problem.
   
   I really miss having a whiteboard, but here's an approximation :smile: This 
is from the Volcano paper (with annotations by me, of course).
   
   (Annotated figure: 
https://github.com/user-attachments/assets/76518a8f-f2fc-4def-b206-8a3fe00b3dff)
   
   Each of the colored blocks is an independently executing sub portion of the 
query. Translated to Tokio each of these colored blocks is a separate 
concurrent task. Each of those tasks needs to be cooperatively scheduled to 
guarantee all of them get a fair share of time to run.
   
   As we've concluded earlier the output side of the exchange-like operators is 
already handling this for us implicitly because they consume tokio task budget. 
The table sources (Scan in the image) do not.
   
   Perhaps this reframing of the problem is the path to a general purpose 
solution. To verify correct scheduling behavior, you can first subdivide the 
plan into subplans using the exchange-like operators as cut points. Per 
sub-plan you can then look at all the leaf nodes. Each leaf node that 'inserts' 
work into the task needs to consume from the same task-wide tokio budget, not a 
per operator budget as we're doing today.
   
   So what does all this mean in terms of implementation:
   
   - Replace the per operator counters with consuming the Tokio task budget. 
DataFusion is already doing this today so there's precedent for it, and it 
resolves a bunch of side effects. I've opened a PR in tokio to allow us to use 
the necessary API for this https://github.com/tokio-rs/tokio/pull/7405. I think 
we can approximate `poll_proceed` with a combination of 'has budget' and 
'consume budget' in the meantime.
   - Remove the configuration option
   - Consider renaming YieldStream to CooperativeStream.
   - I think I would prefer a declarative property on `ExecutionPlan` that 
communicates whether an operator consumes the task budget (not sure what the 
best name for this would be) instead of `with_cooperative_yielding`. It's not 
really something you want to opt in to, after all, and the exchange-like 
operators have no way of opting out.
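
   A sketch of what such a declarative property might look like. All names 
here are hypothetical, mine rather than DataFusion's actual `ExecutionPlan` 
API; the point is that operators declare whether their streams already draw 
from the task budget, and the planner only wraps the ones that don't.

   ```rust
   // Hypothetical trait standing in for the proposed ExecutionPlan property.
   trait CooperationProps {
       // True if this operator's streams already consume task budget
       // (e.g. channel-backed exchange-like operators); such operators
       // cannot opt out, which is why this is a property rather than a
       // builder call like `with_cooperative_yielding`.
       fn consumes_task_budget(&self) -> bool;
   }

   struct RepartitionLike; // stands in for Repartition/Coalesce
   impl CooperationProps for RepartitionLike {
       fn consumes_task_budget(&self) -> bool {
           true // the tokio mpsc receiver it polls is budget-aware
       }
   }

   struct FileScanLike; // stands in for a std::fs::File-backed scan
   impl CooperationProps for FileScanLike {
       fn consumes_task_budget(&self) -> bool {
           false // plain blocking file IO consumes no budget
       }
   }

   // The planner wraps only leaves that don't already cooperate.
   fn needs_cooperative_wrapper(plan: &dyn CooperationProps) -> bool {
       !plan.consumes_task_budget()
   }

   fn main() {
       assert!(!needs_cooperative_wrapper(&RepartitionLike));
       assert!(needs_cooperative_wrapper(&FileScanLike));
       println!("ok");
   }
   ```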
   
   The one thing that we still cannot solve automatically then is dynamic query 
planning. Operators that create streams dynamically still have to make sure 
they set things up correctly themselves.
   
   One possible downside to this approach is that the cooperative scheduling 
budget is implementation specific to the Tokio runtime. DataFusion becomes more 
tied to Tokio rather than less. Not sure if that's an issue or not.
   
   @alamb @ozankabak wdyt? Maybe this is what you were going for all along and 
I'm just slowly catching up :smile:
   
   The change of heart comes from the realization that Tokio itself also takes 
a 'consume at the leaves' strategy, and having a task-wide budget ensures that 
tasks cannot silently ignore the yield request. Once one resource depletes the 
budget, it's no longer possible to make progress anywhere else, provided all 
resources participate in the budgeting system.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-12 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2968831295

   > Made some progress on the problem statement already. I gave the AI the 
facts, it turned it into something I would actually enjoy reading. I'm going to 
work on the way things work today next. Feedback already welcome. 
https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md
   
   This is a good start, thanks. Maybe we can also add a statement about 
pipeline mode and pipeline breaking.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-12 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2967172675

   Made some progress on the problem statement already. I gave the AI the 
facts, it turned it into something I would actually enjoy reading. I'm going to 
work on the way things work today next. Feedback already welcome.
   
https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-12 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2965878805

   > [@zhuqi-lucas](https://github.com/zhuqi-lucas) with all the various 
sprawling discussion threads I think we may have gotten to a point where it's 
no longer easy to have an overview of what the problems are and what the final 
goal may look like. To make matters worse there are multiple options. I was 
thinking it might be useful to try and put together some kind of design 
document / overview that clearly describes the cancellation/abort problem, its 
root cause(s), the existing mitigations and possible future mitigations. Having 
a bit of a mea culpa moment, so I would like to take a shot at making a first 
draft. Is that ok for you? Any preference wrt tooling? I usually write my 
technical docs in asciidoc+plantuml for ease of version control and diffing, 
but willing to use whatever people prefer.
   
   
   Thank you @pepijnve, I am OK with it; a design doc will make it clear.
   
   Either asciidoc+plantuml or plain Markdown is fine.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-12 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2965825518

   @zhuqi-lucas with all the various sprawling discussion threads I think we 
may have gotten to a point where it's no longer easy to have an overview of 
what the problems are and what the final goal may look like. To make matters 
worse there are multiple options. I was thinking it might be useful to try and 
put together some kind of design document / overview that clearly describes the 
cancellation/abort problem, its root cause(s), the existing mitigations and 
possible future mitigations. Having a bit of a mea culpa moment, so I would 
like to take a shot at making a first draft. Is that ok for you?
   Any preference wrt tooling? I usually write my technical docs in 
asciidoc+plantuml for ease of version control and diffing, but willing to use 
whatever people prefer.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2965191039

   > Figured it out. File access is done via object store and object store uses 
`std::fs::File`, not `tokio::fs::File`. Even if it would, from browsing the 
code it doesn't look to me like Tokio's file stuff consumes budget.
   > 
   > I tried reworking YieldStream to use the Tokio budget, but I don't see a 
way to. I rephrased the issue I opened at 
[tokio-rs/tokio#7403](https://github.com/tokio-rs/tokio/issues/7403).
   
   Interesting finding. I checked the spilling code, and it is indeed true that 
we are using std::fs::File. So until the solution using the Tokio budget lands, 
we should add YieldStream to SpillReaderStream.
   
   ```rust
   use std::fs::File;
   
   impl SpillReaderStream {
       fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
           Self {
               schema,
               state: SpillReaderStreamState::Uninitialized(spill_file),
           }
       }
   
       fn poll_next_inner(
           &mut self,
           cx: &mut Context<'_>,
       ) -> Poll<Option<Result<RecordBatch>>> {
           match &mut self.state {
               SpillReaderStreamState::Uninitialized(_) => {
                   // Temporarily replace with `Done` to be able to pass the
                   // file to the task.
                   let SpillReaderStreamState::Uninitialized(spill_file) =
                       std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                   else {
                       unreachable!()
                   };
   
                   let task = SpawnedTask::spawn_blocking(move || {
                       let file = BufReader::new(File::open(spill_file.path())?);
                       // SAFETY: DataFusion's spill writer strictly follows
                       // Arrow IPC specifications with validated schemas and
                       // buffers. Skip redundant validation during read to
                       // speed up the read operation. This is safe for
                       // DataFusion as input is guaranteed to be correct when
                       // written.
                       let mut reader = unsafe {
                           StreamReader::try_new(file, None)?.with_skip_validation(true)
                       };
   
                       let next_batch = reader.next().transpose()?;
   
                       Ok((reader, next_batch))
                   });
   
                   self.state = SpillReaderStreamState::ReadInProgress(task);
   
                   // Poll again immediately so the inner task is polled and
                   // the waker is registered.
                   self.poll_next_inner(cx)
               }
   
               SpillReaderStreamState::ReadInProgress(task) => {
                   let result = futures::ready!(task.poll_unpin(cx))
                       .unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));
   
                   match result {
                       Ok((reader, batch)) => match batch {
                           Some(batch) => {
                               self.state = SpillReaderStreamState::Waiting(reader);
   
                               Poll::Ready(Some(Ok(batch)))
                           }
                           None => {
                               // Stream is done
                               self.state = SpillReaderStreamState::Done;
   
                               Poll::Ready(None)
                           }
                       },
                       Err(err) => {
                           self.state = SpillReaderStreamState::Done;
   
                           Poll::Ready(Some(Err(err)))
                       }
                   }
               }
   
               SpillReaderStreamState::Waiting(_) => {
                   // Temporarily replace with `Done` to be able to pass the
                   // reader to the task.
                   let SpillReaderStreamState::Waiting(mut reader) =
                       std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                   else {
                       unreachable!()
                   };
   
                   let task = SpawnedTask::spawn_blocking(move || {
                       let next_batch = reader.next().transpose()?;
   
                       Ok((reader, next_batch))
                   });
   
                   self.state = SpillReaderStreamState::ReadInProgress(task);
   
                   // Poll again immediately so the inner task is polled and
                   // the waker is registered.
                   self.poll_next_inner(cx)
               }
   
               SpillReaderStreamState::Done => Poll::Ready(None),
           }
       }
   }
   ```



Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2964427447

   Figured it out. File access is done via object store and object store uses 
`std::fs::File`, not `tokio::fs::File`. Even if it would, from browsing the 
code it doesn't look to me like Tokio's file stuff consumes budget.
   
   I tried reworking YieldStream to use the Tokio budget, but I don't see a way 
to. I rephrased the issue I opened at 
https://github.com/tokio-rs/tokio/issues/7403.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2964042212

   Some more information to share and time to eat some humble pie for me. 
Google led me to a [withoutboats 
post](https://internals.rust-lang.org/t/runtime-agnostic-cooperative-task-scheduling-budget/18796)
 which led me to a [Tokio blog post](https://tokio.rs/blog/2020-04-preemption) 
which led to an aha moment. It’s worth reading the entire post but the key 
quote was
   
   >  As long as the task has budget remaining, the resource operates as it did 
previously. Each asynchronous operation (actions that users must .await on) 
decrements the task's budget. Once the task is out of budget, all Tokio 
resources will perpetually return "not ready" until the task yields back to the 
scheduler. At that point, the budget is reset, and future .awaits on Tokio 
resources will again function normally.
   
   So this is the “consume at the source” idea we have now, but with a 
task-wide latch per tick rather than per resource. And the latch only resets 
when you actually yield to the runtime. This removes all the edge cases as long 
as we ensure all sources are using the same task budget.
   
   As we’ve discussed above the channel receiver is already doing that for us. 
For some reason file IO was not. I’m not sure I understand why that’s the case 
and will try to figure out why tomorrow.
   Perhaps we can have the “it works automagically” cake and eat it after all.
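
   The task-wide latch described in the quoted blog post can be sketched with 
plain std types (illustrative only: `BudgetedSource` and the hand-rolled 
`Poll` enum are hypothetical stand-ins, not Tokio or DataFusion API; Tokio's 
real budget is managed inside the runtime and reset on yield):

   ```rust
use std::cell::Cell;
use std::rc::Rc;

// Hand-rolled stand-in for std::task::Poll, to keep the sketch std-only.
#[derive(Debug, PartialEq)]
enum Poll<T> {
    Ready(T),
    Pending,
}

// A source that charges a *shared, task-wide* budget on every poll,
// mimicking how Tokio resources decrement the task budget.
struct BudgetedSource {
    budget: Rc<Cell<u32>>, // shared counter, one per "task"
    next: u32,
}

impl BudgetedSource {
    fn poll_next(&mut self) -> Poll<u32> {
        let remaining = self.budget.get();
        if remaining == 0 {
            // Budget depleted: every source sharing this counter now
            // returns Pending until the "task" yields and it is reset.
            return Poll::Pending;
        }
        self.budget.set(remaining - 1);
        let v = self.next;
        self.next += 1;
        Poll::Ready(v)
    }
}

fn main() {
    let budget = Rc::new(Cell::new(3));
    let mut a = BudgetedSource { budget: Rc::clone(&budget), next: 0 };
    let mut b = BudgetedSource { budget: Rc::clone(&budget), next: 100 };

    assert_eq!(a.poll_next(), Poll::Ready(0));
    assert_eq!(b.poll_next(), Poll::Ready(100));
    assert_eq!(a.poll_next(), Poll::Ready(1));

    // Budget exhausted: *both* sources yield, not just the busy one.
    assert_eq!(a.poll_next(), Poll::Pending);
    assert_eq!(b.poll_next(), Poll::Pending);

    // Yielding back to the scheduler resets the budget.
    budget.set(3);
    assert_eq!(b.poll_next(), Poll::Ready(101));
    println!("ok");
}
```

   This is the difference from a per-operator counter: with one shared budget 
there are no edge cases where one stream yields while a sibling keeps running.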
   
   





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


alamb commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963580078

   > Sorry, I usually do this around a white board. 
   
   Yeah I agree doing this kind of thing with a whiteboard is easier. I am 
happy to set up a video call with the relevant parties if that might be more 
efficient (and it would be nice to meet you all "face to face")
   
   > First and foremost I'm trying to communicate what I see as intrinsic 
problems with the external optimizer rule approach. This was countered with a 
belief that it can be solved, but I don't see it yet myself. 
   
   Indeed -- it is my perspective that
   1.  https://github.com/apache/datafusion/pull/16196 handles most, but not 
all, cases where an `Stream` implementation may not yield directly.
   2. For some of the more advanced cases you describe (like switching a stream 
mid-execution) the general optimizer approach will not permit cancelling and 
thus the individual stream implementations need to be made yield aware.
   
   
   > stream in a YieldStream". 
[#16301](https://github.com/apache/datafusion/pull/16301) does not make use of 
Tokio's coop budget yet however. Instead it's the per-operator counter variant. 
The diff per operator does illustrate the scope and nature of the change 
`Stream` implementers would need to make.
   
   This makes sense (and I am sure we could encapsulate the changes from 16301 
more too if we go with that approach)
   
   I wonder if your main concern is that the optimizer approach we merged does 
not handle all of the cases, so it potentially leaves a trap for future Stream 
implementors (who write more advanced streams)?
   
   
   
   





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


alamb commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963478363

   > I see this as a Stream implementation problem. 
   
   I see the wisdom of this view. 
   
   In my mind I think the DataFusion philosophy is "keep the barrier to entry 
as low as possible, and provide APIs that people can use to customize / 
optimize when needed / necessary"
   
   Do you think the current implementation will work "most of the time" and can 
be customized when needed for advanced usecases? Or is there something we can't 
do with the current APIs?
   
   From my perspective, as long as the overhead **required** of people 
implementing custom operators is low but they can customize when necessary it 
is good
   
   
   





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963507764

   @alamb our messages crossed paths in the ether.
   
   > I have somewhat lost track of what exactly you are proposing.
   
   Sorry, I usually do this around a white board. First and foremost I'm trying 
to communicate what I see as intrinsic problems with the external optimizer 
rule approach. This was countered with a belief that it can be solved, but I 
don't see it yet myself. Happy to be proven wrong of course.
   I've explained the problem to 3 or 4 colleagues in the meantime as unbiased 
as I possibly could and they basically all tell me the same.
   
   >Is it the approach in one of these PRs:
   > https://github.com/apache/datafusion/pull/16301
   > https://github.com/apache/datafusion/pull/16319
   
   A bit of both :)
   
   The rationale behind #16319 is twofold.
   - By using a JoinHandle you naturally solve the cancellation problem. A 
JoinHandle is pending once, and wakes when the result is ready. The caller is 
unblocked immediately and can `select!` on a timeout. The subtle effect of it 
is that you shift from a cancellation problem for the caller to an abort 
problem for the spawned task. Aborting an AbortHandle still requires 
cooperation from the task in order for it to actually stop.
   - My hunch was that by splitting deep call stacks into multiple chained 
shallow call stacks the cost of yielding to the runtime could be reduced. That 
was just a first principles reasoning thing that I wanted to try out. Still 
working on measuring if it has a performance benefit.
   
   #16301 is the variant of yielding where we would punt on the idea of 
implementing yielding via an optimizer rule or some other automatic mechanism. 
Instead you state that every `Stream` is responsible for its own behavior 
(which the guidelines kind of already do). And then we fix the base library 
implementations.
   I think this fixes cancellation for everyone except downstream users with 
custom pipeline breaking operators. For those people I would propose providing 
utilities in the library to make doing the right thing as low-effort as 
possible and provide best practice examples. Something akin to "If your code 
has looping patterns call 'consume_budget' every so often" or "wrap your input 
stream in a YieldStream".
   #16301 does not make use of Tokio's coop budget yet however. Instead it's 
the per-operator counter variant. The diff per operator does illustrate the 
scope and nature of the change `Stream` implementers would need to make.
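
   A minimal std-only sketch of the "wrap your input stream in a YieldStream" 
pattern mentioned above (illustrative only: `YieldIter` and `Step` are 
hypothetical names, and a real implementation would be a `futures::Stream` 
returning `Poll::Pending` and waking itself, not an iterator):

   ```rust
// Force a yield point every YIELD_PERIOD polls, even when the inner
// source is always ready. This is the per-operator counter variant.
const YIELD_PERIOD: usize = 64;

enum Step<T> {
    Ready(T),
    Yield, // stand-in for returning Poll::Pending + cx.waker().wake_by_ref()
}

struct YieldIter<I> {
    inner: I,
    polled: usize,
}

impl<I: Iterator> YieldIter<I> {
    fn new(inner: I) -> Self {
        Self { inner, polled: 0 }
    }

    fn next_step(&mut self) -> Option<Step<I::Item>> {
        self.polled += 1;
        if self.polled % YIELD_PERIOD == 0 {
            // Yield point: give the scheduler a chance to cancel us.
            return Some(Step::Yield);
        }
        self.inner.next().map(Step::Ready)
    }
}

fn main() {
    // An "always ready" infinite source, like a tight aggregation input.
    let mut s = YieldIter::new(0u32..);
    let (mut yields, mut items) = (0, 0);
    for _ in 0..200 {
        match s.next_step() {
            Some(Step::Yield) => yields += 1,
            Some(Step::Ready(_)) => items += 1,
            None => break,
        }
    }
    // 200 polls with a period of 64 produce forced yields at 64, 128, 192.
    assert_eq!(yields, 3);
    assert_eq!(items, 197);
    println!("yields={yields} items={items}");
}
```

   The per-operator counter is what makes this "crude": each wrapper yields on 
its own schedule instead of against one shared task budget.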





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


alamb commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963459115

   > At the risk of making myself unpopular, I feel it's relevant to share my 
findings with you guys.
   
   Not at all --- this is great stuff -- thank you @pepijnve for continuing to 
push to get better. 
   
   In my mind, given the tests that @zhuqi-lucas  added in 
https://github.com/apache/datafusion/pull/16196, we are now in a great position 
to revisit the design. If we can come up with something simpler that still 
achieves the same aim that is a great outcome
   
   > The benefits I see of trying to leverage the same mechanism elsewhere in 
DataFusion are:
   > 
   > * There is only one cooperative yielding mechanism at play. This is easier 
to reason about than multiple interacting ones.
   > * There is no need for additional API. DataFusion is already using this in 
the current released version.
   > * There are fewer corner cases. Once the budget is depleted, any point in 
the code checking the budget will yield since all those points are checking the 
same shared counter.
   
   A single framework that is general purpose and uses a mechanism in tokio 
certainly sounds compelling to me as well. 
   
   I have somewhat lost track of what exactly you are proposing. Is it the 
approach in one of these PRs:
   - https://github.com/apache/datafusion/pull/16301
   - https://github.com/apache/datafusion/pull/16319
   
   Or is it something new based on some other research?
   
   What I think is important is that the solution
   1. is well documented and clear
   2. works with both built in operators as well as user defined ones
   3. is minimally invasive (e.g. people implementing operators don't have to 
know too much about streams / polling, etc)
   





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963459881

   @zhuqi-lucas it would be useful to have some additional voices in this 
discussion. I can share my opinion, but it's only one opinion. I feel like I'm 
just going to keep repeating the same critique over and over again otherwise.
   
   In short, I'm not convinced this is a physical plan optimization problem. It 
was a good idea, but I don't think it can be refined into something 
sufficiently precise. By trying to make a generic/universal solution you end up 
with something unnecessarily complicated.
   
   I see this as a `Stream` implementation problem. The place to fix this is in 
the Rust code of the stream implementations because the decision to consume 
budget or not is tightly coupled to the implementation logic. You're trying to 
count how many times a Future or Stream was able to make progress towards its 
goal. A YieldStream is always going to be a crude approximation of that. 
Implementations may opt to use YieldStream as a convenience of course.
   
   You could try to describe the need for a YieldStream around a child 
declaratively, but the ability to switch streams dynamically is at odds with a 
one-shot approach like an optimizer rule. Additionally, but this is arguably 
just aesthetic, I don't think you ever want to see YieldExec showing up in 
explain plans.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2962336330

   > At the risk of making myself unpopular, I feel it's relevant to share my 
findings with you guys.
   > 
   > Working on [#16322](https://github.com/apache/datafusion/pull/16322) led 
me into the tokio implementation, in particular it led me to this line in the 
[Chan 
implementation](https://github.com/tokio-rs/tokio/blob/master/tokio/src/sync/mpsc/chan.rs#L295).
 This is the code that handles RecordBatch passing in RecordBatchReceiverStream.
   > 
   > I was immediately reminded of the cancellation discussions. Without 
realizing it DataFusion is actually already using Tokio's coop mechanism. This 
strengthens my belief that the PR that was merged is going about things the 
wrong way. It introduces API which overlaps 100% with something that already 
exists and is already being used. I don't think it's a good idea to have 
multiple mechanisms for the same thing. Pipeline-blocking operators exactly 
match the pattern described in [the Tokio cooperative scheduling 
documentation](https://docs.rs/tokio/latest/tokio/task/coop/index.html#cooperative-scheduling)
 so why would you not use the solution the runtime provides which you're 
already using in quite a few places already (everywhere 
RecordBatchReceiverStream is used)?
   
   Thank you @pepijnve , do you mean we can replace YieldStream with Tokio's 
coop? Or change the rule for adding Yield also? 
   
   I am still not looking into the Tokio's coop, maybe we can also add a 
sub-task for it, and list the benefit for it:
   
   Such as 
   1. The performance will be better after using Tokio's coop with benchmark 
result?
   2. Or we can handle more corner cases, and automatically handling 
user-defined exec? 
   3. Or we will have more clear and easy API?
   4. ETC
   
   Thanks!





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963178882

   > > But most other operators (Projection, Filter, HashAggregate, HashJoin, 
WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
   > > That means, we still need to insert explicit yield points 
YieldStream/PollBudget to avoid starving the thread.
   > 
   > You're indeed 100% dependent on your child streams which is what makes the 
current solution somewhat brittle. If that happens to use a Receiver (or some 
other implementation that consumes budget) it will work. If it's some other 
stream that does not you may have issues again. Because the sources are user 
definable, I think it's wise to take a defensive stance in the implementation 
of operators and assume you don't know what they will or will not do. The 
current implementation attempts to fix this by ensuring the sources have yield 
points. That breaks when streams are swapped dynamically because you no longer 
have a way to ensure they contain the necessary yield points. This is a point 
the DataFusion library cannot currently intercept. The current implementation 
with the non-task-wide budget also breaks when an intermediate operator uses 
`select!` (or something similar where you read from whatever stream happens to 
be ready) since this can obscure the Pending result from a stream.
  There's no way to guarantee that Pending bubbles all the way up.
   > 
   > > I believe no major difference for it? Please correct me if i am wrong.
   > 
   > The point of contention was where you put these yield points. Do you 
instrument all leaf nodes, or do you instrument consumers that may refuse to 
yield. To make the system robust I really think you need to do this in the 
consumers. It's also beneficial for locality of reasoning. You can look at the 
implementation of an operator and assess that it's correct from a cooperative 
scheduling point of view without having to look at any other code. The 
objection was that there are many, many operators out there in the wild 
downstream of DataFusion. That's one that I do not have an answer for. How many 
people are building custom pipeline blocking operators?
   > 
   > It's important to note that you would only need to take action in 
operators where you can see from the implementation that it may not return 
_any_ value, either Ready or Pending, relatively quickly. That's basically 
anything that loops over input streams an unbounded number of times.
   > 
   > * Project (or any other simple transformation operator) doesn't need to do 
anything since it takes one record batch in and immediately emits another one.
   > * Table scans shouldn't either. They'll yield naturally if their input is 
not ready, and otherwise they'll return a RecordBatch.
   > * Filter in theory should not do anything, the exception being dropping 
lots of batches entirely.
   > * Joins depends. A build/probe style implementation probably should 
consume during build, not during probe. But it depends on the implementation.
   > * Aggregation and sorting do need to consume since those can block for an 
extended period of time.
   
   Thank you, I think I got your point. I was thinking of optimizing the rule; 
is it a similar point?
   
   ```rust
   // traverse all nodes, not just leaves
   plan.transform_down(|plan| {
       // wrap if leaf OR long-running
       if plan.children().is_empty() || is_long_running(plan.as_ref()) {
           // use existing cooperative variant if available
           let wrapped = plan
               .clone()
               .with_cooperative_yields()
               .unwrap_or_else(|| Arc::new(YieldStreamExec::new(Arc::clone(&plan), yield_period)));
           Ok(Transformed::new(wrapped, true, TreeNodeRecursion::Jump))
       } else {
           Ok(Transformed::no(plan))
       }
   })
   .map(|t| t.data)
   ```
   
   1. Leaf-only wrapping can be bypassed if someone plugs in a custom Stream or 
uses select!‑style combinators.
   2. By also wrapping every consumer that does heavy looping—aggregations, 
sorts, joins, window funcs—you guarantee that no matter how the streams are 
composed, there’s always an explicit YieldStreamExec (or the built‑in 
cooperative variant) in the path. (This can be optimized to PollBudget if 
possible)
   3. We still avoid unnecessary overhead on “simple” operators like Projection 
or basic TableScan, because they’re neither leaves with no loops nor in your 
“long‑running” list.
   
   
   Thanks!
   
   
   
   
   
   
   



Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2963029503

   > But most other operators (Projection, Filter, HashAggregate, HashJoin, 
WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
   > That means, we still need to insert explicit yield points 
YieldStream/PollBudget to avoid starving the thread.
   
   You're indeed 100% dependent on your child streams which is what makes the 
current solution somewhat brittle. If that happens to use a Receiver (or some 
other implementation that consumes budget) it will work. If it's some other 
stream that does not you may have issues again. Because the sources are user 
definable, I think it's wise to take a defensive stance in the implementation 
of operators and assume you don't know what they will or will not do.
   The current implementation attempts to fix this by ensuring the sources have 
yield points. That breaks when streams are swapped dynamically because you no 
longer have a way to ensure they contain the necessary yield points. This is a 
point the DataFusion library cannot currently intercept.
   The current implementation with the non-task wise budge also breaks when an 
intermediate operator uses `select!` (or something similar where you read from 
whatever stream happens to be ready) since this can obscure the Pending result 
from a stream. There's no way to guarantee that Pending bubbles all the way up.
   
   > I believe no major difference for it? Please correct me if i am wrong.
   
   The point of contention was where you put these yield points. Do you 
instrument all leaf nodes, or do you instrument consumers that may refuse to 
yield. To make the system robust I really think you need to do this in the 
consumers. It's also beneficial for locality of reasoning. You can look at the 
implementation of an operator and assess that it's correct from a cooperative 
scheduling point of view without having to look at any other code.
   The objection was that there are many, many operators out there in the wild 
downstream of DataFusion. That's one that I do not have an answer for. How many 
people are building custom pipeline blocking operators?
   
   It's important to note that you would only need to take action in operators 
where you can see from the implementation that it may not return _any_ value, 
either Ready or Pending, relatively quickly. That's basically anything that 
loops over input streams an unbounded number of times.
   - Project (or any other simple transformation operator) doesn't need to do 
anything since it takes one record batch in and immediately emits another one.
   - Table scans shouldn't either. They'll yield naturally if their input is 
not ready, and otherwise they'll return a RecordBatch.
   - Filter in theory should not do anything, the exception being dropping lots 
of batches entirely.
   - Joins depends. A build/probe style implementation probably should consume 
during build, not during probe. But it depends on the implementation.
   - Aggregation and sorting do need to consume since those can block for an 
extended period of time.
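
   The build/probe and aggregation bullets can be sketched as follows 
(std-only and illustrative: `Budget` stands in for Tokio's task budget, and 
returning `Err(i)` stands in for saving state and returning `Poll::Pending`; 
none of these names are DataFusion API):

   ```rust
// A tiny stand-in for the task budget: a counter that refuses work at zero.
struct Budget(u32);

impl Budget {
    // Returns false when the budget is depleted and the operator should
    // yield back to the runtime instead of continuing its build loop.
    fn consume(&mut self) -> bool {
        if self.0 == 0 {
            return false;
        }
        self.0 -= 1;
        true
    }
}

// Build phase of an aggregation-style operator: the yield point sits once
// per input batch, inside the unbounded loop, not in the cheap emit phase.
fn build_aggregate(batches: &[Vec<i64>], budget: &mut Budget) -> Result<i64, usize> {
    let mut sum = 0;
    for (i, batch) in batches.iter().enumerate() {
        if !budget.consume() {
            // In real code: stash `sum` and `i`, return Poll::Pending,
            // and resume from batch `i` on the next poll.
            return Err(i);
        }
        sum += batch.iter().sum::<i64>();
    }
    Ok(sum)
}

fn main() {
    let batches = vec![vec![1, 2], vec![3], vec![4, 5]];

    // Enough budget: the whole build completes in one "poll".
    assert_eq!(build_aggregate(&batches, &mut Budget(8)), Ok(15));

    // Depleted budget: the loop stops at batch 2 instead of starving the
    // thread until the aggregation finishes.
    assert_eq!(build_aggregate(&batches, &mut Budget(2)), Err(2));
    println!("ok");
}
```

   Placing the check per batch keeps the overhead negligible while bounding 
how long the operator can run without offering a cancellation point.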





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2962673909

   > Tokio's cooperative budget is essentially a counter per task that can be 
decremented at any point in the task. When the counter hits zero you'll return 
Pending from the function trying to consume the budget. That's basically what 
YieldStream is doing but with a local counter rather than a task wide one.
   > 
   > DataFusion's `ReceiverStreamBuilder` makes use of 
`tokio::sync::mpsc::Receiver`. Whenever `Receiver::recv` is being called, that 
counter is being decremented, and you'll get a Pending result when the budget 
is depleted. This is the same thing as what YieldStream is trying to do.
   > 
   > The benefits I see of trying to leverage the same mechanism elsewhere in 
DataFusion are:
   > 
   > * There is only one cooperative yielding mechanism at play. This is easier 
to reason about than multiple interacting ones.
   > * There is no need for additional API. DataFusion is already using this in 
the current released version.
   > * There are fewer corner cases. Once the budget is depleted, any point in 
the code checking the budget will yield since all those points are checking the 
same shared counter.
   > 
   > The downsides remain:
   > 
   > * Code that loops may still need to have yield points added to it in order 
to not yield unnecessarily.
   > * It's not yet 100% clear to me how you can use this in manually written 
Futures and Streams. The required bits for that seem to only be crate visible 
in the current Tokio release. I've raised the question here [Example of using 
cooperative scheduling budget in manual Future/Stream implementations 
tokio-rs/tokio#7403](https://github.com/tokio-rs/tokio/issues/7403)
   > * I have not made a performance analysis of this yet, but since it's used 
quite extensively already it's likely to be ok. Needs to be evaluated.
   
   
   You’re right—that comment overstates things. In DataFusion, only operators 
that fan out work into multiple spawned tasks and then re-aggregate via a Tokio 
MPSC channel actually consume the cooperative budget automatically (because 
each Receiver::recv().await call decrements it). Examples include:
   ```rust
   CoalescePartitionsExec 
   
   SortPreservingMergeExec
   ```
   
   All of those use RecordBatchReceiverStreamBuilder::run_input, whose 
.next().await is really rx.recv().await under the hood—and that is what charges 
the Tokio coop budget.
   
   But most other operators (Projection, Filter, HashAggregate, HashJoin, 
WindowAgg, simple TableScans, etc.) do not use an MPSC channel. They execute 
pull-based within a single Stream implementation, and never call recv(), so 
they don’t automatically consume any cooperative budget.
   
   That means, we still need to insert explicit yield points 
YieldStream/PollBudget to avoid starving the thread. 
   
   I believe there's no major difference here?





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2962400698

   Tokio's cooperative budget is essentially a counter per task that can be 
decremented by tasks. When the counter hits zero you'll return Pending. That's 
basically what YieldStream is doing but with a local counter rather than a task 
wide one.
   
   DataFusion's `ReceiverStreamBuilder` makes use of 
`tokio::sync::mpsc::Receiver`. Whenever `Receiver::recv` is being called, that 
counter is being decremented, and you'll get a Pending result when the budget 
is depleted.
   This is the same thing as what YieldStream is trying to do.
   
   The benefits I see of trying to leverage the same mechanism elsewhere in 
DataFusion are:
   - There is only one cooperative yielding mechanism at play. This is easier 
to reason about than multiple interacting ones.
   - There is no need for additional API. DataFusion is already using this in 
the current released version.
   - There are fewer corner cases. Once the budget is depleted, any point in 
the code checking the budget will yield since all those points are checking the 
same shared counter.
   
   The downsides remain:
   - Code that loops may still need to have yield points added to it in order 
to not yield unnecessarily.
   - It's not yet 100% clear to me how you can use this in manually written 
Futures and Streams. The required bits for that seem to only be crate visible 
in the current Tokio release. I've raised the question here 
https://github.com/tokio-rs/tokio/issues/7403
   - I have not made a performance analysis of this yet, but since it's used 
quite extensively already it's likely to be ok. Needs to be evaluated.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2961917602

   A colleague of mine advised me to refer to #16301 one more time for people 
who were not involved with the cancellation work that was already done. I was 
reluctant to do so myself.





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2961873505

   At the risk of making myself unpopular, I feel it's relevant to share my 
findings with you guys.
   
   Working on #16322 led me into the tokio implementation, in particular it led 
me to this line in the [Chan 
implementation](https://github.com/tokio-rs/tokio/blob/master/tokio/src/sync/mpsc/chan.rs#L295).
 This is the code that handles RecordBatch passing in RecordBatchReceiverStream.
   
   I was immediately reminded of the cancellation discussions. Without 
realizing it DataFusion is actually already using Tokio's coop mechanism. This 
strengthens my belief that the PR that was merged is going about things the 
wrong way. It introduces API which overlaps 100% with something that already 
exists and is already being used. I don't think it's a good idea to have 
multiple mechanisms for the same thing. Pipeline-blocking operators exactly 
match the pattern described in [the Tokio cooperative scheduling 
documentation](https://docs.rs/tokio/latest/tokio/task/coop/index.html#cooperative-scheduling)
 so why would you not use the solution the runtime provides which you're 
already using in quite a few places anyway?





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-10 Thread via GitHub


zhuqi-lucas commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2961255260

   Thank you @alamb !





Re: [I] [Epic] Pipeline breaking cancellation support and improvement [datafusion]

2025-06-10 Thread via GitHub


alamb commented on issue #16353:
URL: https://github.com/apache/datafusion/issues/16353#issuecomment-2960837514

   Thank you @zhuqi-lucas  -- I also added this as a wishlist item for 
https://github.com/apache/datafusion/issues/16235

