Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-13 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2970160627

   > Look, I see that you are trying to help and we do want to take it. I 
suspect we might be facing a "culture" challenge here: Typically, DF community 
attacks large problems by solving them bit by bit and refining a solution 
iteratively. This is unlike some other projects which front-load the effort by 
going through a more comprehensive design process. We also do that for some 
tasks where this iterative approach is not applicable, but it is not very 
common.
   
   BTW I liked this description so much I ported it to a proposed addition in 
the docs:
   - https://github.com/apache/datafusion/pull/16397


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957658585

   > @zhuqi-lucas, talking about TODO items, in addition to the 4 things I 
noted in [my comment 
above](https://github.com/apache/datafusion/pull/16196#issuecomment-2955853539),
 I suggest the following: As we study interleave-related cases in more detail, 
I think we should add a test case with an interleave in a plan that doesn't 
have `RepartitionExec`. This can happen when data comes in already partitioned. 
I think in such cases we can still trigger non-cancellability with 
`InterleaveExec`'s current code/logic.
   
   
   Sure, thank you @ozankabak , let me create a follow-up ticket to track this!





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957652941

   > So maybe we need a follow on PR to fix the cancel test from @pepijnve
   
   Thank you @alamb. @pepijnve, can you add a test case where this PR does not 
succeed? I recall adding all the cases to the tests; if I missed any, feel 
free to add a reproducer and I will keep fixing it, thanks!





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957688586

   > @ozankabak indeed, that was what my original test was simulating. The 
coalesce batches and repartition end up erasing the scenario I was trying to 
demonstrate. I fully agree that the test setup is contrived, but the point of 
the test was not to demonstrate that there was a problem with interleave. It 
was intended to demonstrate brittleness in the approach because certain 
legitimate combinations of execution plans still end up not being cancelable in 
practice.
   > 
   > Hacky patch wrt current main that gets you back to the failing situation 
https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
   
   Sorry, just saw this testing case, thank you @pepijnve !





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957679295

   FYI
   Thank you @ozankabak @alamb @pepijnve, I created an EPIC ticket; it 
currently has 5 sub-tasks. Feel free to add more tasks; we can iterate over all 
possible cases and make it perfect. Thanks!
   
   https://github.com/apache/datafusion/issues/16353
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957302218

   > Reflecting all this onto the task at hand, this PR (1) solves many cases 
already and (2) introduces some machinery that will be useful as we iterate on 
the full solution. I don't think it is brittle, I think it is imperfect and 
requires refinement.
   
   We're not going to see eye to eye on this one and that's ok; people don't 
always have to agree.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956513724

   Great, thanks for the patch. We should use it as one of the new test cases 
in the follow-on PRs.
   
   Look, I see that you are trying to help and we do want to take it. I suspect 
we might be facing a "culture" challenge here: Typically, DF community attacks 
large problems by solving them bit by bit and refining a solution iteratively. 
This is unlike some other projects which front-load the effort by going through 
a more comprehensive design process. We also do that for some tasks where this 
iterative approach is not applicable, but it is not very common.
   
   This "bit by bit approach" doesn't always succeed, every now and then it 
happens that we get stuck or go down the wrong path for a while, and then 
change tacks. However, we still typically prefer to "advance the front" and 
make progress in tangible ways as much as we can (if we see a way). This 
necessarily results in imperfect solutions being the "state of the code" in 
some cases, and they survive in the codebase for a while, but we are good at 
driving things to completion in the long run.
   
   Reflecting all this onto the task at hand, this PR (1) solves many cases 
already and (2) introduces some machinery that will be useful as we iterate on 
the full solution. I don't think it is brittle, I think it is imperfect and 
requires refinement. I am optimistic that we will eventually converge on a good 
approach that requires minimal operator cooperation (but we won't be able to 
reduce that to a strict zero) and is close to being optimal in terms of 
yielding overhead. Where we are at is not where we will ultimately be, this is 
just a step in a long process. I hope that helps. Thanks.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956355078

   @ozankabak indeed, that was what my original test was simulating. The 
coalesce batches and repartition end up erasing the scenario I was trying to 
demonstrate. I fully agree that the test setup is contrived, but the point of 
the test was not to demonstrate that there was a problem with interleave. It 
was intended to demonstrate brittleness in the approach because certain 
legitimate combinations of execution plans still end up not being cancelable in 
practice.
   
   Hacky patch wrt current main that gets you back to the failing situation 
https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956234465

   @zhuqi-lucas, talking about TODO items, in addition to the 4 things I noted 
in [my comment 
above](https://github.com/apache/datafusion/pull/16196#issuecomment-2955853539),
 I suggest the following: As we study interleave-related cases in more detail, 
I think we should add a test case with an interleave in a plan that doesn't 
have `RepartitionExec`. This can happen when data comes in already partitioned. 
I think in such cases we can still trigger non-cancellability with 
`InterleaveExec`'s current code/logic.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956129436

   > However, I do like a bias of action, and if this PR fixes a real problem, 
I don't think we should bikeshed it indefinitely
   
   @alamb sorry if it came across as bike shedding; I felt there were 
legitimate concerns with the design that were being ignored. I wrote up all my 
notes in #16301.
   
   > I was thinking of YieldStream as such a combinator 🤔
   
   It's a close approximation of what you need, but not as precise as you would 
want. Other PR explains why so I won't repeat it again here.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955938360

   Is that the interleave test? Sorry the link was not clear





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955983864

   BTW if you wanted to check whether this PR covers interleave-related cases 
(which is the test case analyzed in that repo), this PR has two tests for it 
(`test_infinite_interleave_cancel` and `test_infinite_interleave_agg_cancel`). 
We can probably add even more in the future to further our understanding of the 
problem as we keep working on this, it is a good test case





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955961098

   His repo seems to be creating a plan manually and applying some old version 
of the rule (which is in that repo, not in DF proper). What are we trying to do 
here? Am I missing something?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955945948

   > Is that the interleave test? Sorry the link was not clear
   
   Sorry -- it was this reproducer 
https://github.com/pepijnve/datafusion_cancel_test
   
   
   I pointed it at a local checkout of datafusion after this PR was merged





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955925839

   So maybe we need a follow on PR to fix the cancel test from @pepijnve 





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955922212

   So I just re-ran the reproducer from @pepijnve  in 
https://github.com/apache/datafusion/pull/16196#issuecomment-2921143644 against 
main and this PR doesn't cancel it:
   
   ```
Running `target/debug/datafusion_test`
   Running query; will time out after 5 seconds
   InfiniteStream::poll_next 1 times
   InfiniteStream::poll_next 2 times
   InfiniteStream::poll_next 3 times
   InfiniteStream::poll_next 4 times
   InfiniteStream::poll_next 5 times
   InfiniteStream::poll_next 6 times
   InfiniteStream::poll_next 7 times
   InfiniteStream::poll_next 8 times
   InfiniteStream::poll_next 9 times
   InfiniteStream::poll_next 10 times
   InfiniteStream::poll_next 11 times
   InfiniteStream::poll_next 12 times
   InfiniteStream::poll_next 13 times
   InfiniteStream::poll_next 14 times
   InfiniteStream::poll_next 15 times
   InfiniteStream::poll_next 16 times
   InfiniteStream::poll_next 17 times
   InfiniteStream::poll_next 18 times
   InfiniteStream::poll_next 19 times
   InfiniteStream::poll_next 20 times
   InfiniteStream::poll_next 21 times
   InfiniteStream::poll_next 22 times
   InfiniteStream::poll_next 23 times
   InfiniteStream::poll_next 24 times
   InfiniteStream::poll_next 25 times
   InfiniteStream::poll_next 26 times
   InfiniteStream::poll_next 27 times
   InfiniteStream::poll_next 28 times
   InfiniteStream::poll_next 29 times
   InfiniteStream::poll_next 30 times
   InfiniteStream::poll_next 31 times
   InfiniteStream::poll_next 32 times
   InfiniteStream::poll_next 33 times
   ...
   ```
   
   I did check that aggregating values using range does work, so that is good:
   
   ```
   DataFusion CLI v48.0.0
   > SELECT SUM(value) FROM range(1, 500);
   ^C^C
   ```





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955853539

   Thanks @alamb for the final review. @zhuqi-lucas, I think it would be great 
to create some tickets to track:
   1. Improving `InsertYieldExec` rule by means of an API that exposes input 
*and* output pipelining behaviors of operators effectively
   2. Investigating whether any already-existing manual yielding (for example, 
like the one in `RepartitionExec`) can now be removed
   
   Also feel free to open any other issues if I'm missing something here





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


ozankabak merged PR #16196:
URL: https://github.com/apache/datafusion/pull/16196





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135631720


##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,15 @@ config_namespace! {
 /// then the output will be coerced to a non-view.
 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to 
`LargeBinary`.
 pub expand_views_at_output: bool, default = false
+
+/// When DataFusion detects that a plan might not be promply 
cancellable
+/// due to the presence of tight-looping operators, it will attempt to
+/// mitigate this by inserting explicit yielding (in as few places as
+/// possible to avoid performance degradation). This value represents 
the
+/// yielding period (in batches) at such explicit yielding points. The
+/// default value is 64. If set to 0, no DataFusion will not perform

Review Comment:
   I like that this has an "escape valve" too -- if this mechanism isn't 
working we can disable the new yields via config
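
The yielding period described in the config comment above can be sketched as follows. This is a minimal illustration that assumes a simplified, hypothetical `BatchSource` trait rather than DataFusion's actual stream types; `AlwaysReady`, `YieldEvery`, `NoopWake`, and `drain_counting_yields` are names invented for this example and are not part of the PR. It shows a wrapper that returns `Poll::Pending` once after every `period` batches, and treats a period of 0 as "no extra yields".

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a record batch stream (not DataFusion's trait).
trait BatchSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// Always ready, like the reproducer's `InfiniteStream`, but finite so the
/// example terminates.
struct AlwaysReady {
    remaining: usize,
}

impl BatchSource for AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(1))
    }
}

/// Returns `Pending` once after every `period` batches from `inner`.
/// A period of 0 disables the extra yields.
struct YieldEvery<S> {
    inner: S,
    period: usize,
    since_yield: usize,
}

impl<S: BatchSource> BatchSource for YieldEvery<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.period > 0 && self.since_yield >= self.period {
            self.since_yield = 0;
            // Request another poll, then hand control back to the caller.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let item = self.inner.poll_next(cx);
        if let Poll::Ready(Some(_)) = item {
            self.since_yield += 1;
        }
        item
    }
}

/// Waker that does nothing; enough to build a `Context` for manual polling.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Sums `batches` batches in a tight loop and returns the sum together with
/// the number of times the source handed control back via `Pending`.
pub fn drain_counting_yields(batches: usize, period: usize) -> (u64, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut source = YieldEvery {
        inner: AlwaysReady { remaining: batches },
        period,
        since_yield: 0,
    };
    let (mut sum, mut yields) = (0u64, 0usize);
    loop {
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(v)) => sum += v,
            Poll::Ready(None) => return (sum, yields),
            // In a real runtime, a timeout or Ctrl-C could take effect here.
            Poll::Pending => yields += 1,
        }
    }
}
```

With 200 batches and a period of 64, the loop sees three `Pending` results; with a period of 0 it sees none, which corresponds to the "escape valve" behavior mentioned in the review comment.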






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135629318


##
datafusion/sqllogictest/test_files/explain.slt:
##
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: 
file_groups={1 group: [[W
 physical_plan after LimitAggregation SAME TEXT AS ABOVE

Review Comment:
   Maybe we can add some comments to this effect / create a plan that shows the 
YieldStream being inserted to make it clearer (maybe as a follow on PR)






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135460181


##
datafusion/core/tests/execution/infinite_cancel.rs:
##
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{Array, Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, 
TaskContext};
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::aggregates::{
+AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::{
+DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+};
+use datafusion::prelude::SessionContext;
+use datafusion::{common, physical_plan};
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_optimizer::wrap_leaves_cancellation::WrapLeaves;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, 
SortMergeJoinExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+
+use futures::{Stream, StreamExt};
+use rstest::rstest;
+use tokio::select;
+
+struct InfiniteStream {
+    batch: RecordBatch,
+    poll_count: usize,
+}
+
+impl RecordBatchStream for InfiniteStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+impl Stream for InfiniteStream {
+    type Item = common::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+#[derive(Debug)]
+struct InfiniteExec {

Review Comment:
   Thank you @alamb, good suggestion! Addressed in the latest PR. It seems 
MemoryExec has been removed, so I used LazyMemoryExec to address this comment. 
Thanks!






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135459186


##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
 // are not present, the load of executors such as join or union 
will be
 // reduced by narrowing their input tables.
 Arc::new(ProjectionPushdown::new()),
+Arc::new(WrapLeaves::new()),

Review Comment:
   Addressed in latest PR.






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135356171


##
datafusion/sqllogictest/test_files/explain.slt:
##
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: 
file_groups={1 group: [[W
 physical_plan after LimitAggregation SAME TEXT AS ABOVE

Review Comment:
   Because the built-in leaf nodes now have a built-in YieldStream, the rule 
ignores those cases. Almost all cases currently have a built-in YieldStream, so 
the rule does not add an extra YieldStreamExec.
   
   But if a user supplies a custom leaf operator node, the rule automatically 
adds the YieldStreamExec; this is expected.
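
The behavior described above (skip leaves that already yield, wrap the ones that don't) can be sketched as a small tree rewrite. This is an illustration only: the `Plan` enum, the `yields_natively` flag, and the functions `wrap_leaves` and `count_yields` are hypothetical and do not mirror DataFusion's `ExecutionPlan` API or the actual optimizer rule in the PR.

```rust
/// Hypothetical, simplified plan tree (not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// A leaf operator; `yields_natively` marks leaves with built-in yielding.
    Leaf { name: String, yields_natively: bool },
    /// A wrapper that adds explicit yielding around its child.
    Yield(Box<Plan>),
    /// Any operator with children.
    Node { name: String, children: Vec<Plan> },
}

/// Wraps every leaf that does not yield on its own in a `Yield` node and
/// leaves everything else untouched.
pub fn wrap_leaves(plan: Plan) -> Plan {
    match plan {
        Plan::Leaf { name, yields_natively } => {
            let leaf = Plan::Leaf { name, yields_natively };
            if yields_natively {
                leaf
            } else {
                Plan::Yield(Box::new(leaf))
            }
        }
        // Already wrapped: keep as is so the rewrite is idempotent.
        Plan::Yield(inner) => Plan::Yield(inner),
        Plan::Node { name, children } => Plan::Node {
            name,
            children: children.into_iter().map(wrap_leaves).collect(),
        },
    }
}

/// Counts the `Yield` wrappers in a plan.
pub fn count_yields(plan: &Plan) -> usize {
    match plan {
        Plan::Leaf { .. } => 0,
        Plan::Yield(inner) => 1 + count_yields(inner),
        Plan::Node { children, .. } => children.iter().map(count_yields).sum(),
    }
}
```

For a plan with one natively yielding leaf and one custom leaf, only the custom leaf gets wrapped, which would explain why the `explain.slt` output in the hunk above shows no extra node for built-in sources.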
   
   
   



##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
 // are not present, the load of executors such as join or union 
will be
 // reduced by narrowing their input tables.
 Arc::new(ProjectionPushdown::new()),
+Arc::new(WrapLeaves::new()),

Review Comment:
   Good suggestion @alamb , i will address this, thanks!






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-09 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955153699

   > Thank you @zhuqi-lucas and @ozankabak and @pepijnve -- I think this PR is 
really nicely commented and structured. It is easy to read and review.
   > 
   > However, I am sorry but I am a bit confused by the implications of this PR 
now.
   > 
   > From what I can tell, it doesn't insert YieldExec or add Yielding for 
AggregateExec, which is the operator we have real evidence doesn't yield. 
Instead it seems to add yielding for DataSource exec, which already will yield 
when reading Parquet from a remote store, for example 🤔
   
   Thank you @alamb for the review, I will try to address the comments and 
answer the question.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954860574

   Great, we will incorporate your feedback, answer your questions and write a 
list of follow-on work to serve as a basis for the tickets. I will only merge 
after these are done so we have a clear path to make progress on next week and 
beyond.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954400147

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and issue_16193
   
   Benchmark cancellation.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
   ┃ Query        ┃     HEAD ┃ issue_16193 ┃    Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
   │ QCancellati… │ 27.20 ms │    27.81 ms │ no change │
   └──────────────┴──────────┴─────────────┴───────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
   ┃ Benchmark Summary          ┃         ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
   │ Total Time (HEAD)          │ 27.20ms │
   │ Total Time (issue_16193)   │ 27.81ms │
   │ Average Time (HEAD)        │ 27.20ms │
   │ Average Time (issue_16193) │ 27.81ms │
   │ Queries Faster             │       0 │
   │ Queries Slower             │       0 │
   │ Queries with No Change     │       1 │
   │ Queries with Failure       │       0 │
   └────────────────────────────┴─────────┘
   ```
   
   
   
   
   
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954399910

   🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
 Running
   Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr  2 16:34:16 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing issue_16193 (56361a43d889b5003da9fe0b3c476f68f0d1f88e) to 
1daa5ed5cc51546904d45e23cc148601d973942a 
[diff](https://github.com/apache/datafusion/compare/1daa5ed5cc51546904d45e23cc148601d973942a..56361a43d889b5003da9fe0b3c476f68f0d1f88e)
   Benchmarks: cancellation
   Results will be posted here when complete
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954399882

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and issue_16193
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
   ┃ Query    ┃        HEAD ┃ issue_16193 ┃       Change ┃
   ┡━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
   │ QQuery 0 │  1774.52 ms │  1915.09 ms │ 1.08x slower │
   │ QQuery 1 │   702.21 ms │   689.79 ms │    no change │
   │ QQuery 2 │  1431.65 ms │  1393.68 ms │    no change │
   │ QQuery 3 │   699.08 ms │   674.34 ms │    no change │
   │ QQuery 4 │  1423.03 ms │  1483.81 ms │    no change │
   │ QQuery 5 │ 15706.11 ms │ 16249.84 ms │    no change │
   │ QQuery 6 │  2019.19 ms │  2017.67 ms │    no change │
   │ QQuery 7 │  2010.38 ms │  2123.45 ms │ 1.06x slower │
   │ QQuery 8 │   828.39 ms │   858.33 ms │    no change │
   └──────────┴─────────────┴─────────────┴──────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 26594.57ms │
   │ Total Time (issue_16193)   │ 27405.98ms │
   │ Average Time (HEAD)        │  2954.95ms │
   │ Average Time (issue_16193) │  3045.11ms │
   │ Queries Faster             │          0 │
   │ Queries Slower             │          2 │
   │ Queries with No Change     │          7 │
   │ Queries with Failure       │          0 │
   └────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query     ┃        HEAD ┃ issue_16193 ┃        Change ┃
   ┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0  │    15.00 ms │    15.22 ms │     no change │
   │ QQuery 1  │    32.82 ms │    33.10 ms │     no change │
   │ QQuery 2  │    80.12 ms │    80.05 ms │     no change │
   │ QQuery 3  │    97.94 ms │    98.43 ms │     no change │
   │ QQuery 4  │   578.12 ms │   585.68 ms │     no change │
   │ QQuery 5  │   820.56 ms │   836.55 ms │     no change │
   │ QQuery 6  │    23.39 ms │    23.11 ms │     no change │
   │ QQuery 7  │    37.30 ms │    36.90 ms │     no change │
   │ QQuery 8  │   891.45 ms │   902.35 ms │     no change │
   │ QQuery 9  │  1166.10 ms │  1193.38 ms │     no change │
   │ QQuery 10 │   264.85 ms │   263.41 ms │     no change │
   │ QQuery 11 │   294.82 ms │   293.40 ms │     no change │
   │ QQuery 12 │   896.36 ms │   900.32 ms │     no change │
   │ QQuery 13 │  1199.00 ms │  1333.54 ms │  1.11x slower │
   │ QQuery 14 │   828.80 ms │   832.24 ms │     no change │
   │ QQuery 15 │   818.26 ms │   850.21 ms │     no change │
   │ QQuery 16 │  1726.19 ms │  1726.91 ms │     no change │
   │ QQuery 17 │  1621.59 ms │  1594.69 ms │     no change │
   │ QQuery 18 │  3055.91 ms │  3032.30 ms │     no change │
   │ QQuery 19 │    84.56 ms │    84.16 ms │     no change │
   │ QQuery 20 │  1131.97 ms │  1126.00 ms │     no change │
   │ QQuery 21 │  1316.89 ms │  1286.64 ms │     no change │
   │ QQuery 22 │  2185.01 ms │  2140.56 ms │     no change │
   │ QQuery 23 │  8040.54 ms │  7888.40 ms │     no change │
   │ QQuery 24 │   465.48 ms │   464.48 ms │     no change │
   │ QQuery 25 │   393.23 ms │   390.35 ms │     no change │
   │ QQuery 26 │   533.73 ms │   525.41 ms │     no change │
   │ QQuery 27 │  1596.30 ms │  1560.51 ms │     no change │
   │ QQuery 28 │ 13568.05 ms │ 12429.02 ms │ +1.09x faster │
   │ QQuery 29 │   531.82 ms │   517.36 ms │     no change │
   │ QQuery 30 │   798.55 ms │   799.26 ms │     no change │
   │ QQuery 31 │   857.27 ms │   884.07 ms │     no change │
   │ QQuery 32 │  2623.40 ms │  2664.08 ms │     no change │
   │ QQuery 33 │  3354.76 ms │  3339.54 ms │     no change │
   │ QQuery 34 │  3338.46 ms │  3400.82 ms │     no change │
   │ QQuery 35 │  1275.73 ms │  1308.99 ms │     no change │
   │ QQuery 36 │   124.96 ms │   125.55 ms │     no change │
   │ QQuery 37 │    54.57 ms │    57.31 ms │  1.05x slower │
   │ QQuery 38 │   126.65 ms │   120.73 ms │     no change │
   │ QQuery 39 │   193.09 ms │   189.66 ms │     no change │
   │ QQuery 40 │    49.83 ms │    45.90 ms │ +1.09x faster │
   │ QQuery 41 │    43.40 ms │    45.26 ms │     no change │
   │ QQuery 42 │    38.46 ms │    38.37 ms │     no change │
   └───────────┴─────────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary          ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)          │ 57175.29ms │
   │ Total Time (issue_16193)   │ 56064.2
   ```

Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954372429

   🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
 Running
   Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr  2 16:34:16 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing issue_16193 (56361a43d889b5003da9fe0b3c476f68f0d1f88e) to 
1daa5ed5cc51546904d45e23cc148601d973942a 
[diff](https://github.com/apache/datafusion/compare/1daa5ed5cc51546904d45e23cc148601d973942a..56361a43d889b5003da9fe0b3c476f68f0d1f88e)
   Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954329852

   I am happy to wait for a bit more testing on this PR -- we have now about a 
month before the next release so there is no pressure from there. 
   
   However, I do like a bias for action, and if this PR fixes a real problem, I 
don't think we should bikeshed it indefinitely.
   
   > Unfortunately you do need to do this kind of thing at the operator 
implementation level. I do think there are implementation patterns here that 
could serve as building blocks for operators. 'Build a stream async and then 
emit from it' for instance seems to be pretty common. Rather than having a 
bespoke implementation in each operator it would be useful to have a combinator 
that operators can use. Perhaps there's a similar zero cost solution to the 
'drains input before first emit' pattern as well?
   
   I was thinking of `YieldStream` as such a combinator 🤔 
   
   > @alamb FYI I plan to merge this soon. It is OK if you don't have the 
bandwidth to take a look, it is the first step towards the design we discussed 
before.
   
   @ozankabak  -- what are the next steps? I may have lost track -- if this PR 
needs some follow on I think we should file tickets to explain what they are 
before merging it (I can help to file such tickets)
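For readers following the `YieldStream` discussion, here is a minimal sketch of what such a combinator could look like. It is not the PR's implementation: `PollSource`, `Counter`, `YieldEvery`, `NoopWake`, and `drain` are hypothetical names, and a hand-rolled trait stands in for `futures::Stream` so the example builds with the standard library only. The wrapper returns `Pending` once after every `period` ready items and wakes itself so that it gets polled again.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a record-batch stream: a poll-based source.
trait PollSource {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A source that is always ready (like the `InfiniteStream` test helper),
/// but bounded so that the example terminates.
struct Counter {
    next: u64,
    end: u64,
}

impl PollSource for Counter {
    type Item = u64;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.next == self.end {
            return Poll::Ready(None);
        }
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

/// Wrapper that returns `Pending` once after every `period` ready items.
/// A period of 0 disables yielding (an assumption of this sketch).
struct YieldEvery<S> {
    inner: S,
    period: usize,
    ready_since_yield: usize,
}

impl<S: PollSource> PollSource for YieldEvery<S> {
    type Item = S::Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.period > 0 && self.ready_since_yield >= self.period {
            self.ready_since_yield = 0;
            // Ask to be polled again right away, then hand control back.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = self.inner.poll_next(cx);
        if let Poll::Ready(Some(_)) = &polled {
            self.ready_since_yield += 1;
        }
        polled
    }
}

/// Waker that does nothing; enough for a hand-rolled polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains a source and returns (items seen, number of `Pending` results).
fn drain<S: PollSource>(mut source: S) -> (usize, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut items, mut yields) = (0, 0);
    loop {
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => items += 1,
            Poll::Ready(None) => return (items, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

With `period: 4` and ten items, `drain` observes two `Pending` results; draining the bare `Counter` observes none. In a real executor, each of those `Pending` results is a point where a cancelled task can actually be dropped.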
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2134852899


##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
 // are not present, the load of executors such as join or union 
will be
 // reduced by narrowing their input tables.
 Arc::new(ProjectionPushdown::new()),
+Arc::new(WrapLeaves::new()),

Review Comment:
   Can we please call this pass something related to Cancel or Yield? Like 
`InsertYieldExec` to make it clearer what it is doing?



##
datafusion/core/tests/execution/infinite_cancel.rs:
##
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{Array, Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, 
TaskContext};
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::aggregates::{
+AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::{
+DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+};
+use datafusion::prelude::SessionContext;
+use datafusion::{common, physical_plan};
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_optimizer::wrap_leaves_cancellation::WrapLeaves;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, 
SortMergeJoinExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+
+use futures::{Stream, StreamExt};
+use rstest::rstest;
+use tokio::select;
+
+struct InfiniteStream {
+    batch: RecordBatch,
+    poll_count: usize,
+}
+
+impl RecordBatchStream for InfiniteStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+impl Stream for InfiniteStream {
+    type Item = common::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+#[derive(Debug)]
+struct InfiniteExec {

Review Comment:
   Instead of a new exec, perhaps we could use `MemoryExec` (with like 1000 
`batch.clone()` for example) to show it is configured correctly



##
datafusion/sqllogictest/test_files/explain.slt:
##
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: 
file_groups={1 group: [[W
 physical_plan after LimitAggregation SAME TEXT AS ABOVE

Review Comment:
   I am confused about why YieldStreamExec does not appear in more of the 
explain `slt` plans



##
datafusion/physical-plan/src/yield_stream.rs:
##
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the 

Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954218719

   > As time permits, we can explore alternate, more universal strategies for 
cancellation
   > 100% agree with not merging this until we are in agreement
   
   I can't help but feel that this is needlessly being rushed. Committing to a 
new public API on an extension point before it's clearly proven feels like a 
bad idea to me. If it was purely an implementation detail it would be less of 
an issue. What's the hurry?
   
   The more I've been digging into the code over the past few days the clearer 
it is that getting yielding just right while avoiding wasteful work is 
something you need to be careful about. See for instance #16196.
   We started out with concerns that yielding needlessly would introduce 
performance overhead, but now this PR does so even when it's not necessary at 
all. Admittedly it's nowhere near as extreme as the above issue, but still 
waste is waste.
   
   Wouldn't it be prudent to give this some more time to mature and maybe see 
if there are better strategies? It's not a universal solution, but just as an 
example what I found in #16319 is that restructuring the operator code a little 
bit makes them behave much nicer from the caller's perspective. Unfortunately 
you do need to do this kind of thing at the operator implementation level. I do 
think there are implementation patterns here that could serve as building 
blocks for operators. 'Build a stream async and then emit from it' for instance 
seems to be pretty common. Rather than having a bespoke implementation in each 
operator it would be useful to have a combinator that operators can use. 
Perhaps there's a similar zero cost solution to the 'drains input before first 
emit' pattern as well?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-08 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954144239

   @alamb FYI I plan to merge this soon. It is OK if you don't have the 
bandwidth to take a look, it is the first step towards the design we discussed 
before.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-07 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952375579

   > @zhuqi-lucas, I wanted to make a few final finishing touches while we leave 
time in case @alamb wants to take a final look. I changed the config 
terminology from "frequency" to "period" because the former was kind of a 
misnomer. I also did some refactoring to remove some code repetition. Can you 
please double check to make sure all looks good on your end? Thanks.
   
   Thank you @ozankabak, the final refactor and name change look good to me. 
Yes, let's wait for @alamb to take a final look. Thanks! 





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-07 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952376120

   > > I will investigate whether we can remove some internal yield logic, such 
as in repartition, etc.
   > 
   > Good idea, I'm curious to see if you can. `RepartitionExec` is a little 
bit of an outlier because it also breaks the volcano flow by actively draining 
its input and pushing it into channels. As you experiment with it, let's make 
sure to test cases with nested repartition operators so that we don't miss any 
corner cases.
   
   Thank you, that makes sense. I will experiment with it as a follow-up.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-07 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952147349

   @zhuqi-lucas, I wanted to make a few final finishing touches while we leave 
time in case @alamb wants to take a final look. I changed the config 
terminology from "frequency" to "period" because the former was kind of a 
misnomer. I also did some refactoring to remove some code repetition. Can you 
please double check to make sure all looks good on your end? Thanks.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-07 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952017708

   I merged the latest from main, this is good to go





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2951967426

   > I will investigate whether we can remove some internal yield logic, such 
as in repartition, etc.
   
   Good idea, I'm curious to see if you can. `RepartitionExec` is a little bit 
of an outlier because it breaks the volcano flow by actively draining its input 
and pushing it into channels. As you experiment with it, let's make sure to test 
cases with nested repartition operators so that we don't miss any corner cases.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2951559003

   > I found some time to work on this tonight and it looks good to me now.
   > 
   > To summarize where we are:
   > 
   > * We add yields to all leaf nodes, but no yields to any intermediate node.
   > * We added a bunch of tests to cover some corner cases and all of them 
pass.
   > * There is a single new `with_cooperative_yields` API, which returns a 
cooperatively yielding version of a plan object (if it exists). If it doesn't 
exist for a leaf node, we add an auxiliary operator to handle yielding.
   > 
   > Future work:
   > 
   > * We will study input-side pipelining behaviors and improve the pipelining 
API, so that we only trigger explicit yielding when it is necessary. Given the 
small number of leaf nodes, we are not that far off from optimality even as is, 
which is great. We have some ideas on what to try here, but the current state 
seems quite good -- so we can merge it to fix downstream issues as we make 
further progress.
   > * We will think about supporting cases involving non-volcano (i.e. spill) 
data flow.
   > 
   > @zhuqi-lucas and @alamb, PTAL
   
   Thank you, I agree that we are in a good state, because this PR helps both 
built-in DataFusion operators and custom operators automatically.
   
   I will also help investigate the following case, maybe as a follow-up ticket, 
thanks!
   
   > We will think about supporting cases involving non-volcano (i.e. spill) 
data flow.
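The quoted summary (yields at every leaf, none at intermediate nodes, and an auxiliary operator as a fallback) can be sketched roughly as follows. This is a toy model, not DataFusion's `ExecutionPlan` or the PR's optimizer rule: the `Plan` enum, its fields, and `insert_yields` are hypothetical, and only the method name `with_cooperative_yields` is taken from the summary.

```rust
/// Hypothetical, simplified plan tree; not DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A source. `can_self_yield` models whether a cooperatively
    /// yielding version of this leaf exists.
    Leaf {
        name: &'static str,
        can_self_yield: bool,
        cooperative: bool,
    },
    /// Auxiliary operator that yields on behalf of its child.
    Yield(Box<Plan>),
    /// Any intermediate operator (filter, join, aggregate, ...).
    Node {
        name: &'static str,
        children: Vec<Plan>,
    },
}

impl Plan {
    /// Returns a cooperatively yielding version of this plan, if one exists.
    fn with_cooperative_yields(&self) -> Option<Plan> {
        match self {
            Plan::Leaf { name, can_self_yield: true, .. } => Some(Plan::Leaf {
                name: *name,
                can_self_yield: true,
                cooperative: true,
            }),
            _ => None,
        }
    }
}

/// Rewrites the tree so that every leaf yields: either through its own
/// cooperative version or through a wrapping `Yield` operator.
/// Intermediate nodes are rebuilt but never wrapped.
fn insert_yields(plan: Plan) -> Plan {
    match plan {
        Plan::Node { name, children } => Plan::Node {
            name,
            children: children.into_iter().map(insert_yields).collect(),
        },
        // Already wrapped: leave as is, so the pass is idempotent.
        wrapped @ Plan::Yield(_) => wrapped,
        leaf @ Plan::Leaf { .. } => match leaf.with_cooperative_yields() {
            Some(cooperative_leaf) => cooperative_leaf,
            None => Plan::Yield(Box::new(leaf)),
        },
    }
}
```

In this model a join over a built-in source and a custom source ends up with the built-in leaf switched to its cooperative mode and the custom leaf wrapped in `Yield`, while the join itself is untouched.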
   
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2132866332


##
datafusion/datasource/src/source.rs:
##
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
 /// the [`FileSource`] trait.
 ///
 /// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for 
cooperative scheduling.
 #[derive(Clone, Debug)]
 pub struct DataSourceExec {
 /// The source of the data -- for example, `FileScanConfig` or 
`MemorySourceConfig`
 data_source: Arc<dyn DataSource>,
 /// Cached plan properties such as sort order
 cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,

Review Comment:
   Agreed, this is good for now






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2950129956

   One performance aspect I've been looking at is the cost of yielding. There's 
no magic as far as I can tell. Returning a Pending simply leads to a full 
unwind of the call stack by virtue of the return bubbling up all the way up to 
the tokio executor and then a full descent back to the point where you left off 
using function calls.
   That would suggest it's most interesting to do a cooperative yield from as 
shallow a point as possible in the call tree rather than from the deepest 
possible point so that you keep the roundtrip to the executor and back as short 
as possible.
   
   Running with target_partitions = 1 shows that for queries like the deeply 
nested window/sort query you linked to, @ozankabak, the call stack can get 
pretty deep. It's essentially proportional to the depth of the plan tree.
   
   To mitigate this, would it make sense for pipeline breaking operators to run 
their pipeline breaking portion in a SpawnedTask instead of as a child? I'm 
thinking of the sort phase of sort, the build phase of join, etc. Regardless of 
where you inject Pending, it seems beneficial to keep the call stack that 
needs to be unwound shallow.
   
   Note that this same argument does suggest it could be more interesting to do 
the cooperative yield where the looping is happening rather than where the data 
is produced. The loop is the shallowest point, especially if you spawn a task 
since that gets you a new root, while the producer is the deepest point.
   
   Cutting the call stack using spawned tasks may also mitigate the deeply 
nested query concern regarding checking for yielding at multiple levels. The 
yield is never going to go beyond the scope of a single task.
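The round-trip cost described in this comment can be illustrated with a small, standard-library-only sketch. It is a toy model rather than DataFusion code: `PollSource`, `YieldingLeaf`, `PassThrough`, `NoopWake`, and `frames_entered` are hypothetical names. Each `Pending` returned by the leaf unwinds through every operator above it, and the next poll has to descend through all of them again, so the number of operator entries grows with plan depth.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a poll-based operator.
trait PollSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// Leaf that returns `Pending` once before each item (a yield period of 1),
/// which is the worst case for the round trip to the executor.
struct YieldingLeaf {
    remaining: u64,
    yield_next: bool,
}

impl PollSource for YieldingLeaf {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        if self.yield_next {
            self.yield_next = false;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.yield_next = true;
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining))
    }
}

/// Pass-through operator that counts how often it is entered.
struct PassThrough {
    child: Box<dyn PollSource>,
    entered: Rc<Cell<usize>>,
}

impl PollSource for PassThrough {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        self.entered.set(self.entered.get() + 1);
        self.child.poll_next(cx)
    }
}

/// Waker that does nothing; enough for a hand-rolled polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Builds a chain of `depth` pass-through operators over a yielding leaf,
/// drains it, and returns the total number of operator entries.
fn frames_entered(depth: usize, items: u64) -> usize {
    let entered = Rc::new(Cell::new(0));
    let mut plan: Box<dyn PollSource> = Box::new(YieldingLeaf {
        remaining: items,
        yield_next: true,
    });
    for _ in 0..depth {
        plan = Box::new(PassThrough {
            child: plan,
            entered: Rc::clone(&entered),
        });
    }
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(None) = plan.poll_next(&mut cx) {
            break;
        }
    }
    entered.get()
}
```

For 10 items the root is polled 21 times (10 `Pending`, 10 items, 1 end-of-stream), so a chain of depth 1 records 21 operator entries and a chain of depth 8 records 168. Spawning a task at a pipeline breaker would, in this picture, cap the depth that any single `Pending` has to travel.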





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949658301

   Great! 🚀





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949615347

   Good result: I removed all the ignores, and all corner cases pass:
   
   ```text
   running 18 tests
   test execution::infinite_cancel::test_infinite_join_agg_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_agg_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_agg_cancel::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_join_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_filter_reject_all_batches_cancel::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_filter_reject_all_batches_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_join_cancel::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_join_agg_cancel::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_hash_join_without_repartition_and_no_agg::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_hash_join_without_repartition_and_no_agg::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_interleave_agg_cancel::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_interleave_cancel::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_interleave_agg_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_interleave_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_sort_merge_join_without_repartition_and_no_agg::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_sort_merge_join_without_repartition_and_no_agg::pretend_finite_2_true ... ok
   test execution::infinite_cancel::test_infinite_sort_cancel::pretend_finite_1_false ... ok
   test execution::infinite_cancel::test_infinite_sort_cancel::pretend_finite_2_true ... ok
   ```





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949490754

   > @zhuqi-lucas it may make sense at this point to add built-in yielding to 
the remaining two sources so that you don't have to deal with the diffs
   
   I agree @ozankabak , good point.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949486347

   @zhuqi-lucas it may make sense at this point to add built-in yielding to the 
remaining two sources so that you don't have to deal with the diffs





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949371256

   > I am wondering about an easy solution (maybe): we just remove the final emission type check in the rule and add yield-based leaf nodes; it seems this can solve all our problems:
   
   @zhuqi-lucas I think this is a good idea, let's see where we are at when we 
unconditionally do this at leaf nodes. This would be easy and quick to do. 
Then, we can think about the right API to use instead of emission type to 
arrive at the optimal solution as we collect more tests.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949387541

   > > I am wondering about an easy solution (maybe): we just remove the final emission type check in the rule and add yield-based leaf nodes; it seems this can solve all our problems:
   > 
   > @zhuqi-lucas I think this is a good idea, let's see where we are at when 
we unconditionally do this at leaf nodes. This would be easy and quick to do. 
Then, we can think about the right API to use instead of emission type to 
arrive at the optimal solution as we collect more tests.
   
   Thank you @ozankabak, I have now added all 4 reproducer cases, marked as ignored for now.
   
   Let me change the rule to see whether it fixes all the corner cases; it would be great if it does.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949117788

   > Tests pass even if add in the "pretending" (because the join code seems to 
yield naturally)
   
   The hash join test I have does fail so I dug into this. It's passing for you 
for two reasons:
   - There's an aggregation in that test, so the plan has emission type final 
and the yield wrapper gets injected
   - If you remove the aggregation there's still a RepartitionExec which itself 
has an ad hoc version of the cooperative yield logic in place at 
https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L980.
 It also decouples producer and consumer via the distribution channels. If the 
consumer drains faster than the producer fills you'll get a natural pending 
that way.
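
A minimal sketch of the cooperative yield idea discussed above, assuming a simplified stand-in for a batch stream. The `BatchStream` trait, `AlwaysReady`, `YieldEvery` and `count_yields` are hypothetical names for illustration only; they are not DataFusion's `RepartitionExec` code or the PR's yield wrapper. The wrapper counts consecutive ready results and returns `Pending` once per `frequency` batches so the runtime gets a chance to observe cancellation.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a stream of batches (not DataFusion's trait).
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// A source that is always ready, like an infinite in-memory input.
struct AlwaysReady {
    next: u64,
}

impl BatchStream for AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        self.next += 1;
        Poll::Ready(Some(self.next))
    }
}

/// Wrapper that returns `Pending` after `frequency` consecutive ready batches.
struct YieldEvery<S> {
    inner: S,
    frequency: usize,
    ready_in_a_row: usize,
}

impl<S: BatchStream> BatchStream for YieldEvery<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.ready_in_a_row >= self.frequency {
            self.ready_in_a_row = 0;
            // Ask to be polled again right away, then hand control back.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = self.inner.poll_next(cx);
        match &polled {
            Poll::Ready(Some(_)) => self.ready_in_a_row += 1,
            _ => self.ready_in_a_row = 0,
        }
        polled
    }
}

/// Waker that does nothing; enough to drive the sketch by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the wrapped source `polls` times and counts how often it yielded.
fn count_yields(frequency: usize, polls: usize) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = YieldEvery {
        inner: AlwaysReady { next: 0 },
        frequency,
        ready_in_a_row: 0,
    };
    (0..polls)
        .filter(|_| stream.poll_next(&mut cx).is_pending())
        .count()
}
```

With `frequency = 2`, nine polls see the pattern ready, ready, pending three times over. A channel between producer and consumer gives a similar effect for free whenever the consumer finds the channel empty.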





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949226146

   > @zhuqi-lucas this sort-merge test might also be useful to integrate: https://github.com/pepijnve/datafusion/blob/cancel_safety/datafusion/core/tests/execution/yielding.rs#L373
   > I can adapt this into a PR on your branch if you like. The test cases are no longer drop-in, I'm afraid. I changed things to make use of an additional tokio runtime so that the tests do not hang in case of failure.
   
   I will try to add it as well, thanks!





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949221841

   > There are two versions of the join test, one with and one without aggregation (`test_infinite_join_cancel`). So the reason it is passing is something else (maybe the presence of `RepartitionExec`). @zhuqi-lucas if you add a third version without repartitioning and it fails, we can use it as a useful test vehicle next week. Thanks.
   
   I see it now, thanks!





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949211366

   Thank you @ozankabak @pepijnve.
   I am wondering about an easy solution (maybe): we just remove the final emission type check in the rule and add yield-based leaf nodes. It seems this can solve all our problems:
   1. The corner cases will also be resolved, since we have yields in the leaf nodes.
   2. Users who don't add custom leaf execs will not be exposed to the new YieldStreamExec, because we will add built-in yielding to the existing sources, which covers all those cases.
   3. When users add a custom exec, we can handle that too: the rule will automatically add a YieldStreamExec, which is expected.
   4. Performance will not be affected, based on our previous testing.
   
   Any ideas? Thanks!
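
A minimal sketch of the "wrap every leaf unconditionally" proposal, using a toy plan tree. The `Plan` enum, `wrap_leaves` and `count_yield_nodes` are hypothetical illustrations; they are not DataFusion's `ExecutionPlan` API or the PR's actual `WrapLeaves` rule. The point is only that the transform no longer inspects emission types: every source gets a yield node, whatever operators sit above it.

```rust
/// Toy plan node; hypothetical, not DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Source(&'static str),
    Yield(Box<Plan>),
    Node(&'static str, Vec<Plan>),
}

/// Wraps every leaf source in a `Yield` node, regardless of its ancestors.
fn wrap_leaves(plan: Plan) -> Plan {
    match plan {
        Plan::Source(name) => Plan::Yield(Box::new(Plan::Source(name))),
        // An existing yield node is kept as is, so re-running the rule
        // on an already wrapped plan does not stack wrappers.
        Plan::Yield(inner) => Plan::Yield(inner),
        Plan::Node(name, children) => {
            Plan::Node(name, children.into_iter().map(wrap_leaves).collect())
        }
    }
}

/// Counts the yield nodes in a plan.
fn count_yield_nodes(plan: &Plan) -> usize {
    match plan {
        Plan::Source(_) => 0,
        Plan::Yield(inner) => 1 + count_yield_nodes(inner),
        Plan::Node(_, children) => children.iter().map(count_yield_nodes).sum(),
    }
}
```

For a join over a filtered left source and a plain right source, both leaves end up wrapped even though nothing in the plan has emission type final.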





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949146871

   @zhuqi-lucas this sort-merge test might also be useful to integrate: https://github.com/pepijnve/datafusion/blob/cancel_safety/datafusion/core/tests/execution/yielding.rs#L373
   I can adapt this into a PR on your branch if you like. The test cases are no longer drop-in, I'm afraid. I changed things to make use of an additional tokio runtime so that the tests do not hang in case of failure.
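
A std-only analogue of the "don't let a failing test hang" idea, as a sketch. This is not the tokio-runtime approach used in the linked tests; it swaps in a plain thread plus `recv_timeout`, and `run_with_timeout` is a hypothetical helper name. The work runs off the calling thread, so a non-yielding computation cannot block the code that enforces the deadline.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on a separate thread; returns `None` if it does not finish in time.
fn run_with_timeout<T, F>(work: F, timeout: Duration) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone after a timeout; ignore that error.
        let _ = tx.send(work());
    });
    rx.recv_timeout(timeout).ok()
}
```

Note that a worker that never finishes is simply left behind here; the sketch only guarantees that the caller gets control back.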





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949140262

   There are two versions of the join test, one with and one without aggregation (`test_infinite_join_cancel`). So the reason it is passing is something else (maybe the presence of `RepartitionExec`). @zhuqi-lucas if you add a third version without repartitioning and it fails, we can use it as a useful test vehicle next week. Thanks.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2132112459


##
datafusion/physical-optimizer/src/wrap_leaves_cancellation.rs:
##
@@ -76,7 +77,8 @@ impl WrapLeaves {
 plan: Arc<dyn ExecutionPlan>,
 yield_frequency: usize,
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
-let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
+// todo this is a bit of a hack, we should probably have a more explicit way to handle
+let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final || plan.as_any().is::<FilterExec>();

Review Comment:
   I have removed this code; we need to find a better solution for the filter and join corner cases.






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949124590

   > @zhuqi-lucas, I took the liberty of parametrizing the tests and hardening 
them. We now have two failing (ignored) tests (the filter test, which is 
probably trivial to fix and more related to filter itself, and a pure 
interleave test without the aggregation).
   > 
   > The test harness is now good for a first cut. I hope to see whether our 
approach bears fruit next week, or learn whether we face some fundamental issue.
   
   Thank you @ozankabak, that makes sense to me. I will also think about a solution for the failing test cases.
   
   
   
   > > Tests pass even if add in the "pretending" (because the join code seems 
to yield naturally)
   > 
   > The hash join test I have does fail so I dug into this. It's passing for 
you for two reasons:
   > 
   > * There's an aggregation in that test, so the plan has emission type final 
and the yield wrapper gets injected
   > * If you remove the aggregation there's still a RepartitionExec which 
itself has an ad hoc version of the cooperative yield logic in place at 
https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L980.
 It also decouples producer and consumer via the distribution channels. If the 
consumer drains faster than the producer fills you'll get a natural pending 
that way.
   
   Thank you @pepijnve, I agree. I haven't added the join corner case yet; I think it will fail in a similar way to the filter case, and the solution may be similar too. I will remove the aggregation for the join corner case.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949112371

   @zhuqi-lucas, I took the liberty of parametrizing the tests and hardening 
them. We now have two failing tests (the filter test, which is probably trivial 
to fix and more related to filter itself, and a pure interleave test without 
the aggregation).
   
   The test harness is now good for a first cut. I hope to see whether our 
approach bears fruit next week, or learn whether we face some fundamental issue.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948780089

   @zhuqi-lucas, I didn't get a chance to take a deep look as I'm traveling, 
but a cursory look suggests the only open issue is with the filter test. Is 
that right?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


pepijnve commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2131883855


##
datafusion/physical-optimizer/src/wrap_leaves_cancellation.rs:
##
@@ -76,7 +77,8 @@ impl WrapLeaves {
 plan: Arc<dyn ExecutionPlan>,
 yield_frequency: usize,
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
-let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
+// todo this is a bit of a hack, we should probably have a more explicit way to handle
+let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final || plan.as_any().is::<FilterExec>();

Review Comment:
   I was curious to see how this case could be handled. The consequence is that 
in many situations the yield logic will be injected unnecessarily causing the 
root stream to emit pending even though ready's are arriving at a steady pace. 
This is a bit at odds with the desire to keep performance overhead minimal.
   I don't think there's a way to solve this external to `FilterExecStream` 
since you're trying to count the number of times it consecutively discarded 
full batches, not the number of times it polled its input.
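
A minimal sketch of what counting inside the filter stream could look like, under the assumption of a toy always-ready input. `Repeat`, `FilterStream`, `max_discarded` and `first_poll_is_pending` are hypothetical names, not DataFusion's `FilterExecStream`. The counter tracks batches that were consecutively discarded in full, which is something a wrapper around the input cannot see because it only observes input polls.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Infinite, always-ready input that repeats one batch (hypothetical stand-in).
struct Repeat(Vec<i64>);

impl Repeat {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Vec<i64>>> {
        Poll::Ready(Some(self.0.clone()))
    }
}

/// Toy filter stream that yields after too many fully discarded batches.
struct FilterStream<P> {
    input: Repeat,
    predicate: P,
    max_discarded: usize,
    discarded_in_a_row: usize,
}

impl<P: Fn(&i64) -> bool> FilterStream<P> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<i64>>> {
        loop {
            if self.discarded_in_a_row >= self.max_discarded {
                // Too many empty results in a row: hand control back.
                self.discarded_in_a_row = 0;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            match self.input.poll_next(cx) {
                Poll::Ready(Some(batch)) => {
                    let kept: Vec<i64> = batch
                        .into_iter()
                        .filter(|v| (self.predicate)(v))
                        .collect();
                    if kept.is_empty() {
                        self.discarded_in_a_row += 1;
                        continue;
                    }
                    // A non-empty batch reaches the consumer; reset the counter.
                    self.discarded_in_a_row = 0;
                    return Poll::Ready(Some(kept));
                }
                other => return other,
            }
        }
    }
}

/// Waker that does nothing; enough to drive the sketch by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls once with predicate `value > threshold` over the batch [1, 2, 3].
fn first_poll_is_pending(threshold: i64) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = FilterStream {
        input: Repeat(vec![1, 2, 3]),
        predicate: move |v: &i64| *v > threshold,
        max_discarded: 4,
        discarded_in_a_row: 0,
    };
    stream.poll_next(&mut cx).is_pending()
}
```

Without the counter, an always-false predicate over an infinite input would keep the loop spinning inside a single poll. When rows pass, the stream never returns `Pending` on its own, so no extra yields are injected in the steady-output case.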






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948651054

   > > most CPU/IO cost still comes from predicate evaluation or hash‐join 
builds, etc. The variability you’re seeing in PR 16262’s benchmarks is 
therefore probably just noise rather than a real performance regression.
   > 
   > That is my expectation as well. I wanted to evaluate the query @ozankabak 
referred to with many nested pipeline blockers. It's kind of hard to assess the 
impact with a noisy measuring device.
   > 
   > @zhuqi-lucas I would still like to pursue the alternative approach 
(basically an extended version of what you originally proposed), but I'm reaching 
the limits of my current Rust skills and would like to solicit some input from 
others. I don't want to do that here though to keep this PR focussed on the 
optimizer rule approach. Would you be offended if I made a secondary draft PR 
that I can point people to where I clearly state it's a potential alternative 
for this one but not intended to replace it? I would like to write up the 
various design tradeoffs I've been looking at, but this comment thread is not 
the right place. Not sure where else would be appropriate besides another PR.
   
   @pepijnve Of course, feel free to open another draft PR. If the discussion shows that the other direction is better, we can go that way too.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948608948

   > most CPU/IO cost still comes from predicate evaluation or hash‐join 
builds, etc. The variability you’re seeing in PR 16262’s benchmarks is 
therefore probably just noise rather than a real performance regression.
   
   That is my expectation as well. I wanted to evaluate the query @ozankabak 
referred to with many nested pipeline blockers. It's kind of hard to assess the 
impact with a noisy measuring device.
   
   @zhuqi-lucas I would still like to pursue the alternative approach 
(basically an extended version of what you originally proposed), but I'm reaching 
the limits of my current Rust skills and would like to solicit some input from 
others. I don't want to do that here though to keep this PR focussed on the 
optimizer rule approach. Would you be offended if I made a secondary draft PR 
that I can point people to where I clearly state it's a potential alternative 
for this one but not intended to replace it? I would like to write up the 
various design tradeoffs I've been looking at, but this comment thread is not 
the right place. Not sure where else would be appropriate besides another PR.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948434379

   I have now updated the reproducer case in the latest PR and commented it out, because neither our rule nor the built-in yield fixes it yet.
   
   ```rust
   #[tokio::test]
   async fn test_filter_reject_all_batches_cancel() -> Result<(), Box<dyn Error>> {
       // 1) Create a Session, Schema, and an 8K-row RecordBatch
       let session_ctx = SessionContext::new();
       let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
           "value",
           DataType::Int64,
           false,
       )]));

       // Build a batch with values 0 through 8191
       let mut builder = Int64Array::builder(8_192);
       for v in 0..8_192 {
           builder.append_value(v);
       }
       let batch = Arc::new(RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(builder.finish())],
       )?);

       // 2a) Wrap this batch in an InfiniteExec
       let infinite = Arc::new(InfiniteExec::new(&batch));

       // 2b) Construct a FilterExec that is always false: "value > 10000" (no rows pass)
       let false_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
           Arc::new(Column::new_with_schema("value", &schema)?),
           Gt,
           Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))),
       ));
       let filtered: Arc<dyn ExecutionPlan> =
           Arc::new(FilterExec::try_new(false_predicate, infinite.clone())?);

       // 2c) Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
       let coalesced: Arc<dyn ExecutionPlan> =
           Arc::new(CoalesceBatchesExec::new(filtered.clone(), 8_192));

       // 2d) Hash-repartition into 1 partition (so that a later global aggregation
       //     would run on a single partition)
       let exprs: Vec<Arc<dyn PhysicalExpr>> =
           vec![Arc::new(Column::new_with_schema("value", &schema)?)];
       let part = Partitioning::Hash(exprs.clone(), 1);
       let hashed: Arc<dyn ExecutionPlan> =
           Arc::new(RepartitionExec::try_new(coalesced.clone(), part.clone())?);

       // 4) WrapLeaves to insert YieldExec, so that the InfiniteExec yields control between batches
       let config = ConfigOptions::new();
       let optimized = WrapLeaves::new().optimize(hashed, &config)?;

       // 5) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time
       //    without ever producing output, no batch will arrive within 1 second. And since
       //    emission type is not Final, we never see an end-of-stream marker.
       let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
       const TIMEOUT: u64 = 1;
       let result = select! {
           batch_opt = stream.next() => batch_opt,
           _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
               None
           }
       };
       assert!(
           result.is_none(),
           "Expected no output for infinite + filter(all-false), but got a batch"
       );
       Ok(())
   }
   
   ```





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-06 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948429253

   @pepijnve I don't expect either placement of YieldExec to materially affect throughput, since most CPU/IO cost still comes from predicate evaluation or hash-join builds. The variability you're seeing in PR 16262's benchmarks is therefore probably just noise (e.g. OS scheduling) rather than a real performance regression.
   
   > @zhuqi-lucas @alamb I wanted to work on measuring the performance impact 
of this PR today, but looking at [#16262 
(review)](https://github.com/apache/datafusion/pull/16262#pullrequestreview-2903139531)
 I'm a bit surprised to see similar variability being reported as I'm seeing in 
the benchmark results of my own experimental branch. For PR 16262 no changes 
were made to the production code but you still see performance deltas. This 
begs the question how the benchmarks can/should be used to evaluate code 
changes. How are you guys making use of these results? Should I be using 
something else for coarse grained performance impact assessment?
   
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948268910

   @zhuqi-lucas @alamb I wanted to work on measuring the performance impact of 
this PR today, but looking at 
https://github.com/apache/datafusion/pull/16262#pullrequestreview-2903139531 
I'm a bit surprised to see similar variability being reported as I'm seeing in 
the benchmark results of my own experimental branch. For PR 16262 no changes 
were made to the production code but you still see performance deltas. This 
begs the question how the benchmarks can/should be used to evaluate code 
changes. How are you guys making use of these results? Should I be using 
something else for coarse grained performance impact assessment?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2945161909

   I will be traveling tomorrow, but @berkaysynnada and I will help drive this to completion early next week. I made some progress on sketching out a good API and will circle back next week. In the meantime, let's collect all the test cases we can. Thanks!





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2945139509

   Changing hats to DataFusion user mode where I need to make sure that the end 
users of our system can press 'cancel' at any time and that works as expected.
   
   From that perspective here's a possibly useful test case (and maybe an illustration of a more general problem): suppose you have a query like `select sum(size) as sum from t group by name order by sum` that produces a large number of distinct groups. The query plan for this today is:
   
   ```
   SortPreservingMergeExec: [sum@0 ASC NULLS LAST]
     SortExec: expr=[sum@0 ASC NULLS LAST], preserve_partitioning=[true]
       ProjectionExec: expr=[sum(t.size)@1 as sum]
         AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[sum(t.size)]
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([name@0], 10), input_partitions=10
               AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[sum(t.size)]
                 RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                   DataSourceExec: file_groups={1 group: [[]]}, projection=[name, size], file_type=...
   ```
   
   If I'm reading the code correctly, once the `FinalPartitioned` aggregation 
has drained the original input it may switch over to reading back spill files. 
At that point the original input (and yield exec wrapping it) are taken out of 
the picture. Unless I'm mistaken, once the query hits that phase it may not be 
interruptible again unless some yield guarantee is injected again.
   I don't have a good idea for how to write a practical test case for this though. You would have to drive a sufficiently large query all the way to this point to be able to observe the behavior.
   
   I wonder if this illustrates that only analyzing the static picture of the 
query at planning time is insufficient because it does not (and probably 
cannot) take the dynamic behavior of the query into account. The actual tree of 
streams and the points where you might need yield wrappers can change as the 
query is executing.
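
A toy model of the concern above, as a sketch only. `YieldingInput`, `SpillingAgg` and `run` are hypothetical; this is not how DataFusion's aggregation or spilling is implemented, and wakers are left out for brevity. It shows that yield points placed on the input stop helping once the operator switches to a second phase that no longer polls that input.

```rust
use std::task::Poll;

/// Toy input with a built-in yield point every `frequency` batches.
/// Wakers are omitted; the driver below simply polls again.
struct YieldingInput {
    remaining: usize,
    frequency: usize,
    since_yield: usize,
}

impl YieldingInput {
    fn poll(&mut self) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        if self.since_yield == self.frequency {
            self.since_yield = 0;
            return Poll::Pending;
        }
        self.since_yield += 1;
        self.remaining -= 1;
        Poll::Ready(Some(1))
    }
}

/// Toy aggregation: buffers ("spills") every input batch, then merges them back.
struct SpillingAgg {
    input: YieldingInput,
    spilled: usize,
    merge_steps_without_yield: usize,
}

impl SpillingAgg {
    fn poll(&mut self) -> Poll<u32> {
        // Phase 1: drain the input; its yield points propagate upwards.
        loop {
            match self.input.poll() {
                Poll::Ready(Some(_)) => self.spilled += 1,
                Poll::Ready(None) => break,
                Poll::Pending => return Poll::Pending,
            }
        }
        // Phase 2: read back the spilled data. The input is out of the picture,
        // so nothing in this loop ever returns `Pending`.
        let mut total = 0;
        while self.spilled > 0 {
            self.spilled -= 1;
            self.merge_steps_without_yield += 1;
            total += 1;
        }
        Poll::Ready(total)
    }
}

/// Drives the operator to completion; returns (yields seen, merge steps in one poll).
fn run(batches: usize, frequency: usize) -> (usize, usize) {
    let mut agg = SpillingAgg {
        input: YieldingInput { remaining: batches, frequency, since_yield: 0 },
        spilled: 0,
        merge_steps_without_yield: 0,
    };
    let mut yields = 0;
    while agg.poll().is_pending() {
        yields += 1;
    }
    (yields, agg.merge_steps_without_yield)
}
```

In this model the whole merge phase runs inside the final poll, however often the input yielded before it.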





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2944834501

   @ozankabak @pepijnve 
   Interesting: I have now added an interleave corner test case, **test_infinite_interleave_agg_cancel**, which tries to reproduce the corner case, but it works fine with our current PR and I can't reproduce the problem.
   
   The test is an extreme case with more than 30 infinite sources. In theory it should reproduce the issue easily, but it works well, so I am wondering whether we need to do anything for interleave at this point?
   
   
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2944505070

   > I feel like we are getting close to a point where we start having 
not-so-fruitful discussions. I think I have made a good effort to make my 
arguments and reasoning clear.
   
   @ozankabak My apologies. I didn't mean to derail your efforts here and I'll 
refrain from adding any more noise to the thread (beyond this, sorry). I 
appreciate the fact that you guys have much much more experience working in 
this codebase. I'm really trying to make a good faith contribution here where 
we compare the pros/cons of both approaches via measurements (API impact, 
performance impact, etc.), but I'll back off.
   
   FWIW, I've added some more test cases in the meantime that you guys can use 
or ignore however you see fit. I also have some benchmark results from a first 
run at https://gist.github.com/pepijnve/21fbd480ae3e60f780446ace974d3ef5. It's 
a very mixed bag. I'm going to run the suite again a couple of times to see if 
this is consistent or not before I dig deeper.
   
   During the runs I'm seeing so much variability in runtime on both branches 
that I have my doubts how meaningful these results are. Would it be useful to 
let the benchmark perform more runs and adapt the tool a bit to report on mean 
and standard deviation rather than just average?
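
For illustration, the summary statistic proposed above could look like the sketch below. It assumes the benchmark tool can hand over per-run wall-clock times in milliseconds; `RunStats` and `run_stats` are hypothetical names, not part of the existing benchmark tooling.

```rust
/// Summary statistics for a set of benchmark run times (milliseconds).
/// Hypothetical type, used only for this illustration.
#[derive(Debug, PartialEq)]
pub struct RunStats {
    pub mean: f64,
    pub std_dev: f64,
}

/// Returns the mean and the sample standard deviation (n - 1 denominator),
/// or `None` when fewer than two runs are available.
pub fn run_stats(times_ms: &[f64]) -> Option<RunStats> {
    let n = times_ms.len();
    if n < 2 {
        return None;
    }
    let mean = times_ms.iter().sum::<f64>() / n as f64;
    let variance = times_ms
        .iter()
        .map(|t| (t - mean).powi(2))
        .sum::<f64>()
        / (n - 1) as f64;
    Some(RunStats {
        mean,
        std_dev: variance.sqrt(),
    })
}
```

Reporting the standard deviation next to the mean makes it easier to judge whether a difference between two branches is larger than the run-to-run noise.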





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128845315


##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,19 @@ config_namespace! {
 /// then the output will be coerced to a non-view.
 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to 
`LargeBinary`.
 pub expand_views_at_output: bool, default = false
+
+/// When true, the optimizer will insert a Yield operator at the leaf 
nodes of any pipeline
+/// that contains a pipeline-breaking operator, allowing the Tokio 
scheduler to switch to
+/// other tasks while waiting.
+/// Default: true (enabled).
+pub enable_add_yield_for_pipeline_break: bool, default = true
+
+/// Yield frequency in batches, it represents how many batches to 
process before yielding
+/// to the Tokio scheduler. The default value is 64, which means that 
after processing
+/// 64 batches, the execution will yield control back to the Tokio 
scheduler.
+/// This setting is only effective when 
`enable_add_yield_for_pipeline_break` is set to true.
+/// This value should be greater than 0.
+pub yield_frequency_for_pipeline_break: usize, default = 64

Review Comment:
   Addressed in the latest PR, thanks!



##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
 /// [`execute`]: ExecutionPlan::execute
 /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
 /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static {

Review Comment:
   Addressed in the latest PR, thanks!






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2944338401

   Great, let's add tests for all cases we currently fail to cover (interleave, 
join, filter). We will then use them as litmus tests as we iterate on the API 
and the rule.
   
   🚀





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128845907


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
 /// [`execute`]: ExecutionPlan::execute
 /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
 /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.

Review Comment:
   Addressed in the latest PR, thanks!



##
datafusion/physical-plan/src/memory.rs:
##
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + 
fmt::Display {
 ///
 /// This plan generates output batches lazily, it doesn't have to buffer all 
batches
 /// in memory up front (compared to `MemorySourceConfig`), thus consuming 
constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for 
cooperative scheduling.

Review Comment:
   Addressed in the latest PR, thanks!






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128846243


##
datafusion/sqllogictest/test_files/group_by.slt:
##
@@ -4113,7 +4113,7 @@ EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
 
 logical_plan
 01)Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1
-02)--Cross Join:
+02)--Cross Join: 

Review Comment:
   Addressed in the latest PR, thanks!






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128830214


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -546,6 +558,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 child_pushdown_result,
 ))
 }
+
+/// Whether this operator supports cooperative yielding. Default is false.
+fn yields_cooperatively(&self) -> bool {
+false

Review Comment:
   Thank you @ozankabak for this good point.
   
   I tried to add emission_type-related behaviour here, but currently we only 
use emission_type Final to choose the leaf nodes that get the yield stream. 
   
   So returning false is the clearest and simplest option for now. If we change 
our design later, for example by adding the yield behaviour before the 
emission_type Final node instead of at the leaves, we may be able to derive this 
from emission_type. That is not easy either, because we would need to insert 
before the emission_type Final node rather than after it.






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128809976


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
 /// [`execute`]: ExecutionPlan::execute
 /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
 /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.

Review Comment:
   I agree, it's neat. Thank you!






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128809331


##
datafusion/datasource/src/source.rs:
##
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
 /// the [`FileSource`] trait.
 ///
 /// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for 
cooperative scheduling.
 #[derive(Clone, Debug)]
 pub struct DataSourceExec {
 /// The source of the data -- for example, `FileScanConfig` or 
`MemorySourceConfig`
 data_source: Arc<dyn DataSource>,
 /// Cached plan properties such as sort order
 cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,

Review Comment:
   Thank you @ozankabak for this question; here is my understanding.
   
   I tried adding this in data_source, but it doesn't seem like a good approach 
because:
   
   1. We would need to add it to every data_source implementation, such as 
FileScanConfig and MemorySourceConfig.
   2. If we add a new source based on data_source, we need to add it again.
   3. FileScanConfig also wraps different FileSource implementations 
(ParquetSource, CsvSource, etc.), which adds complexity when injecting this config.
   
   It's easier to do this at a higher level; DataSourceExec is the best place I 
have found so far.






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943759107

   > @ozankabak I still don't think you need all this API work since there's a 
zero API change way to deal with cancellation already. Tests all pass with no 
API changes in the 'all Stream implementations must be well behaved Tokio 
citizens' approach. I understand the performance concern, but maybe it's a bit 
premature to design APIs before knowing what the actual performance impact is? 
In terms of code changes I don't think the complexity argument holds since the 
required code changes were fairly trivial.
   
   > I've found a dedicated machine to run the benchmarks on in the meantime. 
It's 10 year old hardware (Xeon E5620) so the compile times take forever, but 
should be good enough for relative comparisons. Will post results when I get 
them.
   
   @pepijnve, I respect your opinion but we will need to agree to disagree. 
After spending a lot of time (and writing a lot of upstream *and* downstream DF 
code) over the last few years for leveraging the async runtime and its 
challenges/advantages, issues related to pipeline-breaking, performance 
implications of these things, the APIs `ExecutionPlan` objects should provide, 
responsibilities of the planner vs. operators and others, my intuition tells me 
that:
   1. There is some information the `ExecutionPlan` API doesn't expose yet 
about input pipelining behavior and propagation of pendings, and it should -- 
not just for this use case, but others too.
   2. There is a way to solve this problem universally with optimally minimal 
overhead and it is not that hard to figure out.
   3. This way will also help us reduce the responsibility of user-defined 
operators, and solve cancellability even without their strict cooperation.
   4. There are a lot of downstream users who define user-defined operators, and 
any "win" (in the above sense) for such use cases is important for our project 
goals.
   5. I suspect (and my confidence is somewhat lower on this point relative to 
others) we will always be able to construct some cases, however contrived, 
where an "everyone always yields" solution will suffer from performance 
problems.
   6. My intuition could be wrong, and if this thesis (1-2-3) indeed turns out 
to be wrong, we can take the learnings and fall back to another solution, where 
your proposal would be a good candidate.
   
   I feel like we are getting close to a point where we start having 
not-so-fruitful discussions. I think I have made a good effort to make my 
arguments and reasoning clear. We will see where this effort goes (and I'm very 
hopeful that we will succeed), and if you want to help, we will always take it 
with appreciation. I think you can probably relate to the position that I would 
like to focus my thinking on getting 1-2-3 done (if possible) in the 
short-term, instead of repeatedly spending time on justifying what we are 
doing. If this route doesn't work, I will gladly help with finding another 
solution (and maybe that will be your approach).





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128549342


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
 /// [`execute`]: ExecutionPlan::execute
 /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
 /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static {

Review Comment:
   Yeah, I misunderstood the logic here, thanks @ozankabak.






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128526165


##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,19 @@ config_namespace! {
 /// then the output will be coerced to a non-view.
 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to 
`LargeBinary`.
 pub expand_views_at_output: bool, default = false
+
+/// When true, the optimizer will insert a Yield operator at the leaf 
nodes of any pipeline
+/// that contains a pipeline-breaking operator, allowing the Tokio 
scheduler to switch to
+/// other tasks while waiting.
+/// Default: true (enabled).
+pub enable_add_yield_for_pipeline_break: bool, default = true
+
+/// Yield frequency in batches, it represents how many batches to 
process before yielding
+/// to the Tokio scheduler. The default value is 64, which means that 
after processing
+/// 64 batches, the execution will yield control back to the Tokio 
scheduler.
+/// This setting is only effective when 
`enable_add_yield_for_pipeline_break` is set to true.
+/// This value should be greater than 0.
+pub yield_frequency_for_pipeline_break: usize, default = 64

Review Comment:
   Great suggestion! @ozankabak  






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128523292


##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
 01)CrossJoinExec
 02)--DataSourceExec: partitions=1, partition_sizes=[0]
 03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec

Review Comment:
   Got it, let's focus on the major design part first.






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943654190

   > Left an initial round of reviews, let's address them and do another round
   
   
   Thank you @ozankabak for the review and great suggestions; I will try to 
address them soon. 
   
   
   
   
   > @zhuqi-lucas regarding corner cases, I've added two additional tests on my 
branch for filter and join.
   > 
   > Filter can refuse to cancel if the filter rejects many full batches. I've 
added a test case that filters out everything to emulate this.
   > 
   > Joins can block in their build phase. I've emulated this situation using 
the infinite source that pretends to be bounded rather than unbounded.
   > 
   > In both cases the emission type is not final for these plans.
   
   Thank you @pepijnve, this is really helpful; let me study these and add the 
more complex test cases.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943553646

   @zhuqi-lucas FYI, I've reworked the 'infinite stream' to 'range stream' in 
my tests. It simply emits a `Range` now. I've added an additional evil 
test case for sort-merge join where I'm inner joining `[i64::MIN, 0]` with `[0, 
i64::MAX]` with no further statistics. Again to emulate the join blocking in 
its data collection phase.
   
   @ozankabak I still don't think you need all this API work since there's a 
zero API change way to deal with cancellation already. Tests all pass with no 
API changes in the 'all Stream implementations must be well behaved Tokio 
citizens' approach. I understand the performance concern, but maybe it's a bit 
premature to design APIs before knowing what the actual performance impact is? 
In terms of code changes I don't think the complexity argument holds since the 
required code changes were fairly trivial.
   
   I've found a dedicated machine to run the benchmarks on in the meantime. 
It's 10 year old hardware (Xeon E5620) so the compile times take forever, but 
should be good enough for relative comparisons. Will post results when I get 
them.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943327837

   Hmm, I wonder if we should design the `yields_cooperatively` API to return 
the following `enum`:
   ```rust
   pub enum InputPipelineBehavior {
       Source(bool),
       Intermediate(Vec<bool>),
   }
   ```
   where the operator returns:
   - `Source(false)` when it is a source and doesn't cooperate,
   - `Source(true)` when it is a source and cooperates,
   - `Intermediate(Vec<bool>)` when it is an intermediate node with children. 
Each boolean in the vector indicates whether the operator cooperates while it 
consumes that child.
   
   The default implementation would return `Source(false)` for leaf nodes, and 
`Intermediate(vec![self.pipeline_behavior() != EmissionType::Final; 
self.children().len()])` for non-leaf nodes. Plans like joins can override this 
depending on whether they are in `CollectLeft` mode etc.
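
A minimal, standalone sketch of that default rule follows. It does not touch the real `ExecutionPlan` trait: `EmissionType` here is a local stand-in for DataFusion's enum of the same name, and `default_behavior` is a hypothetical free function that takes the emission type and child count as plain arguments.

```rust
/// Local stand-in for DataFusion's `EmissionType` (assumption: same variants).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EmissionType {
    Incremental,
    Final,
    Both,
}

/// The enum proposed in the comment above.
#[derive(Debug, Clone, PartialEq)]
pub enum InputPipelineBehavior {
    /// A leaf node; the flag says whether it cooperates with the runtime.
    Source(bool),
    /// A node with children; one flag per child.
    Intermediate(Vec<bool>),
}

/// Hypothetical default: leaves do not cooperate, and a non-leaf node
/// cooperates on every child unless it only emits at the very end.
pub fn default_behavior(emission: EmissionType, num_children: usize) -> InputPipelineBehavior {
    if num_children == 0 {
        InputPipelineBehavior::Source(false)
    } else {
        InputPipelineBehavior::Intermediate(vec![emission != EmissionType::Final; num_children])
    }
}
```

With this shape, a join in a collect-style mode could override the default and return different flags for its build and probe sides.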
   
   Thoughts?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


ozankabak commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128238867


##
datafusion/datasource/src/source.rs:
##
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
 /// the [`FileSource`] trait.
 ///
 /// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for 
cooperative scheduling.
 #[derive(Clone, Debug)]
 pub struct DataSourceExec {
 /// The source of the data -- for example, `FileScanConfig` or 
`MemorySourceConfig`
 data_source: Arc<dyn DataSource>,
 /// Cached plan properties such as sort order
 cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,

Review Comment:
   Should this flag come from the `data_source` object? That is what 
gives us the stream (which may or may not yield to the runtime).



##
datafusion/datasource/src/source.rs:
##
@@ -256,7 +262,39 @@ impl ExecutionPlan for DataSourceExec {
 partition: usize,
 context: Arc<TaskContext>,
 ) -> Result<SendableRecordBatchStream> {
-self.data_source.open(partition, context)
+// 1. Get the “base” stream exactly as before, without yielding.
+let stream = self.data_source.open(partition, Arc::clone(&context));
+
+// 2. If cooperative == false, return base_stream immediately.
+if !self.cooperative {
+return stream;
+}
+
+let frequency = context
+.session_config()
+.options()
+.optimizer
+.yield_frequency_for_pipeline_break;
+
+// 3. If cooperative == true, wrap the stream into a YieldStream.
+let yielding_stream = YieldStream::new(stream?, frequency);
+Ok(Box::pin(yielding_stream))
+}
+
+/// Override: this operator *does* support cooperative yielding when 
`cooperative == true`.
+fn yields_cooperatively(&self) -> bool {
+self.cooperative
+}
+
+/// If `cooperative == true`, return `Some(self.clone())` so the optimizer 
knows
+/// we can replace a plain DataSourceExec with this same node (it already 
yields).
+/// Otherwise, return None.
+fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
+if self.cooperative {
+Some(self)
+} else {
+None
+}

Review Comment:
   ```suggestion
   self.cooperative.then_some(self)
   ```
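
For reference, the suggestion relies on the standard library's `bool::then_some`, which returns `Some(value)` when the boolean is true and `None` otherwise. The sketch below compares the two forms on a hypothetical `Node` type standing in for `DataSourceExec`; it is not the PR's code.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a plan node with a `cooperative` flag.
#[derive(Debug)]
pub struct Node {
    pub cooperative: bool,
}

/// The explicit if/else form from the diff.
pub fn verbose(node: Arc<Node>) -> Option<Arc<Node>> {
    if node.cooperative {
        Some(node)
    } else {
        None
    }
}

/// The suggested one-liner; the flag is read before `node` is moved.
pub fn concise(node: Arc<Node>) -> Option<Arc<Node>> {
    node.cooperative.then_some(node)
}
```

Both functions return the same result for any input; the second simply states the intent in a single expression.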



##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
 01)CrossJoinExec
 02)--DataSourceExec: partitions=1, partition_sizes=[0]
 03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec

Review Comment:
   Maybe we can, but let's first finalize the design and implementation for 
others.



##
datafusion/physical-plan/src/memory.rs:
##
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + 
fmt::Display {
 ///
 /// This plan generates output batches lazily, it doesn't have to buffer all 
batches
 /// in memory up front (compared to `MemorySourceConfig`), thus consuming 
constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for 
cooperative scheduling.

Review Comment:
   Unnecessary



##
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##
@@ -3534,6 +3534,7 @@ async fn test_distribute_sort_memtable() -> Result<()> {
 let session_config = SessionConfig::new()
 .with_repartition_file_min_size(1000)
 .with_target_partitions(3);
+

Review Comment:
   ```suggestion
   ```



##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,19 @@ config_namespace! {
 /// then the output will be coerced to a non-view.
 /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to 
`LargeBinary`.
 pub expand_views_at_output: bool, default = false
+
+/// When true, the optimizer will insert a Yield operator at the leaf 
nodes of any pipeline
+/// that contains a pipeline-breaking operator, allowing the Tokio 
scheduler to switch to
+/// other tasks while waiting.
+/// Default: true (enabled).
+pub enable_add_yield_for_pipeline_break: bool, default = true
+
+/// Yield frequency in batches, it represents how many batches to 
process before yielding
+/// to the Tokio scheduler. The default value is 64, which means that 
after processing
+/// 64 batches, the execution will yield control back to the Tokio 
scheduler.
+/// This setting is only effective when 
`enable_add_yield_for_pipeline_break` is set to true.
+/// This value should be greater than 0.
+pub yiel

Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-05 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943221060

   @zhuqi-lucas regarding corner cases, I've added two additional tests on my 
branch for filter and join.
   
   Filter can refuse to cancel if the filter rejects many full batches. I've 
added a test case that filters out everything to emulate this.
   
   Joins can block in their build phase. I've emulated this situation using the 
infinite source that pretends to be bounded rather than unbounded.
   
   In both cases the emission type is not final for these plans.
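
As a rough illustration of how a per-batch budget addresses the filter case (every batch rejected, so nothing is ever emitted downstream), the sketch below counts consumed batches and signals a yield every N of them. `YieldBudget` is a made-up name and is not the PR's `YieldStream`; a real stream would also have to wake its waker before returning `Poll::Pending`, which is omitted here to keep the example free of an async runtime.

```rust
use std::task::Poll;

/// Hypothetical counter mirroring the "yield every N batches" idea.
pub struct YieldBudget {
    frequency: usize,
    processed: usize,
}

impl YieldBudget {
    /// `frequency` is the number of batches to process between yields.
    pub fn new(frequency: usize) -> Self {
        assert!(frequency > 0, "frequency must be greater than 0");
        Self {
            frequency,
            processed: 0,
        }
    }

    /// Call once per consumed input batch, even if the batch is filtered
    /// out entirely. Returns `Poll::Pending` when the budget is spent (the
    /// caller should hand control back to the scheduler) and resets the count.
    pub fn on_batch(&mut self) -> Poll<()> {
        self.processed += 1;
        if self.processed >= self.frequency {
            self.processed = 0;
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
```

Because the counter advances per input batch rather than per output batch, an operator that discards everything still reaches a yield point after `frequency` batches.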





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128032920


##
datafusion/physical-optimizer/src/wrap_leaves_cancellation.rs:
##
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::Result;
+use datafusion_physical_plan::execution_plan::EmissionType;
+use datafusion_physical_plan::yield_stream::YieldStreamExec;
+use datafusion_physical_plan::ExecutionPlan;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+/// WrapLeaves is a PhysicalOptimizerRule that finds every
+/// pipeline‐breaking node (emission_type == Final) and then
+/// wraps all of its leaf children in YieldStreamExec.
+pub struct WrapLeaves {}
+
+impl WrapLeaves {
+pub fn new() -> Self {
+Self {}
+}
+
+/// This function is called on every plan node during transform_down().
+/// If the node is a leaf (no children), we wrap it in a new YieldStreamExec
+/// and stop recursing further under that branch (TreeNodeRecursion::Jump).
+fn wrap_leaves(
+plan: Arc<dyn ExecutionPlan>,
+yield_frequency: usize,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+if plan.children().is_empty() {
+// If the leaf node already has a built-in yielding variant:

Review Comment:
   This is the logic that checks whether a leaf already has built-in yielding
   support, in which case we skip adding a new `YieldStreamExec`.
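The rule described in the diff above can be sketched on a toy plan tree. This is a simplified model under stated assumptions, not the code in this PR: `Plan`, `add_yields`, `leaf`, and the `built_in_yield` and `pipeline_breaking` fields are all hypothetical, and `pipeline_breaking` stands in for `emission_type == Final`.

```rust
// Toy model only: these types are illustrative and are not DataFusion's.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A leaf source; `built_in_yield` marks leaves that already yield on their own.
    Leaf { name: String, built_in_yield: bool },
    /// Stand-in for `YieldStreamExec`.
    Yield { frequency: usize, input: Box<Plan> },
    /// Any operator with children.
    Node { name: String, pipeline_breaking: bool, children: Vec<Plan> },
}

/// Wrap every leaf under `plan` in `Yield`, except leaves with built-in yielding.
fn wrap_leaves(plan: Plan, frequency: usize) -> Plan {
    match plan {
        Plan::Leaf { name, built_in_yield } => {
            let leaf = Plan::Leaf { name, built_in_yield };
            if built_in_yield {
                leaf
            } else {
                Plan::Yield { frequency, input: Box::new(leaf) }
            }
        }
        // Never wrap twice.
        already_wrapped @ Plan::Yield { .. } => already_wrapped,
        Plan::Node { name, pipeline_breaking, children } => Plan::Node {
            name,
            pipeline_breaking,
            children: children.into_iter().map(|c| wrap_leaves(c, frequency)).collect(),
        },
    }
}

/// Walk down the plan; once a pipeline-breaking node is found, wrap the leaves below it.
fn add_yields(plan: Plan, frequency: usize) -> Plan {
    match plan {
        Plan::Node { name, pipeline_breaking, children } => {
            let recurse: fn(Plan, usize) -> Plan =
                if pipeline_breaking { wrap_leaves } else { add_yields };
            Plan::Node {
                name,
                pipeline_breaking,
                children: children.into_iter().map(|c| recurse(c, frequency)).collect(),
            }
        }
        other => other,
    }
}

/// Convenience constructor for the examples.
fn leaf(name: &str, built_in_yield: bool) -> Plan {
    Plan::Leaf { name: name.to_string(), built_in_yield }
}
```

Under a pipeline-breaking node, a leaf without built-in yielding gets a `Yield` parent and a leaf with built-in yielding is left alone. A plan with no pipeline-breaking node comes back unchanged.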






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128017451


##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
 01)CrossJoinExec
 02)--DataSourceExec: partitions=1, partition_sizes=[0]
 03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec

Review Comment:
   I have set the flag to default to `true`. Across all the slt tests, only two
   leaf types now get a `YieldStreamExec` added: `PlaceholderRowExec` and
   `WorkTableExec`. Do we also need to give `PlaceholderRowExec` and
   `WorkTableExec` built-in yielding, or is it OK to add `YieldStreamExec` for
   these two cases?
   
   cc @ozankabak @alamb 






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128017451


##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
 01)CrossJoinExec
 02)--DataSourceExec: partitions=1, partition_sizes=[0]
 03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec

Review Comment:
   I have set the flag to default to `true`. Across all the slt tests, only one
   leaf type now gets a `YieldStreamExec` added: `PlaceholderRowExec`.
   
   Do we also need to give `PlaceholderRowExec` built-in yielding, or is it OK
   to add `YieldStreamExec` for this one case?
   
   cc @ozankabak @alamb 






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128014148


##
datafusion/physical-plan/src/memory.rs:
##
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
 ///
 /// This plan generates output batches lazily, it doesn't have to buffer all batches
 /// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
 pub struct LazyMemoryExec {
 /// Schema representing the data
 schema: SchemaRef,
 /// Functions to generate batches for each partition
 batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
 /// Plan properties cache storing equivalence properties, partitioning, and execution mode
 cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,

Review Comment:
   `LazyMemoryExec` also gets built-in yielding, because it is a common case.
   For example, the following query hits it:
   
   ```sql
   SET datafusion.execution.target_partitions = 1;
   
   SELECT SUM(value) FROM range(1, 500);
   ```
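
As a rough illustration of what periodic cooperative yielding means for a lazy generator like this one, here is a standard-library-only sketch. `BatchStream`, `RangeSource`, `YieldEvery`, `NoopWake`, and `drain` are hypothetical names, not the `LazyMemoryExec` implementation; each "batch" is a single `u64`, and the frequency of 64 matches the default mentioned in the config diff.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical poll-based stream of "batches" (plain integers here).
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// Lazily produces `next..end`, always ready, never yielding by itself.
struct RangeSource {
    next: u64,
    end: u64,
}

impl BatchStream for RangeSource {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.next >= self.end {
            return Poll::Ready(None);
        }
        let value = self.next;
        self.next += 1;
        Poll::Ready(Some(value))
    }
}

/// Returns `Pending` once after every `frequency` items.
/// `frequency` must be greater than 0, otherwise no item is ever produced.
struct YieldEvery<S> {
    input: S,
    frequency: usize,
    emitted_since_yield: usize,
}

impl<S: BatchStream> BatchStream for YieldEvery<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.emitted_since_yield >= self.frequency {
            self.emitted_since_yield = 0;
            // Ask to be polled again right away, then hand control back.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = self.input.poll_next(cx);
        if matches!(polled, Poll::Ready(Some(_))) {
            self.emitted_since_yield += 1;
        }
        polled
    }
}

/// Waker that does nothing, so the sketch can be driven without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drain a stream; returns (sum of all items, number of `Pending` results seen).
fn drain(stream: &mut impl BatchStream) -> (u64, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut sum: u64 = 0;
    let mut yields: usize = 0;
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(value)) => sum += value,
            Poll::Ready(None) => return (sum, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Draining `YieldEvery { input: RangeSource { next: 1, end: 500 }, frequency: 64, emitted_since_yield: 0 }` still produces every value, but the caller gets control back seven times along the way.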






Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2942862579

   > > I am onboard with the approach in this PR, and it looks good to me 
overall. Just needs some finishing touches:
   > > 
   > > * To avoid such a large diff (which is mostly plans in tests), I suggest 
gating this behind a configuration flag, which would default to `false` for 
now. We will turn it on in subsequent PRs. We just need to add a few tests with 
this flag turned on to get it exercised in CI.
   > > * In the next PR (or this PR if you want), before we turn the flag on, 
we can add an API to ask whether a leaf supports built-in yielding. If it does, 
the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add built-in 
yielding support to a few common leaf nodes, and turn the configuration flag on 
after doing so, plan changes will be minimal.
   > > * In another subsequent PR, we can fix `InterleaveExec` (@berkaysynnada 
and I started discussing how one can approach this problem).
   > > * I suggest making the number of batches to yield not a constant but a 
parameter of `YieldStreamExec`. Also, when we display it, showing its child is 
not informative (we see that from the print-out already). Displaying yielding 
frequency is probably a better idea.
   > > 
   > > Thanks @zhuqi-lucas for the great work. Love it.
   > 
   > Thank you @ozankabak , i have done in the latest PR:
   > 
   > 1. Gating this behind a configuration flag, which would default to `false` 
for now.
   > 2. Only enable it for a small testing case and the end to end testing.
   > 3. Making the number of batches to yield not a constant but a parameter of 
`YieldStreamExec`, also show the frequency for the display.
   > 
   > Not done until now:
   > 
   > 1. Add an API to ask whether a leaf supports built-in yielding. If it 
does, the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add 
built-in yielding support to a few common leaf nodes, and turn the 
configuration flag on after doing so, plan changes will be minimal.
   > 2. Fix `InterleaveExec` related corner cases.
   > 
   > What do you mean built-in yielding? If it means we add some leaf nodes to 
support built-in yielding support? Thanks!
   > 
   > If it means, we manually wrap some YieldStream to those execs? And after 
that, we optimize it again, so we need to exclude those operators?
   
   Update:
   
   I have done all the remaining subtasks:
   1. Added an API to ask whether a leaf supports built-in yielding. If it
   does, the rule doesn't add `YieldStreamExec` at all to that leaf. If we add
   built-in yielding support to a few common leaf nodes, and turn the
   configuration flag on after doing so, plan changes will be minimal.
   2. Gave `DataSourceExec` and `LazyMemoryExec` built-in yielding, because
   they are the most common cases.
   3. Set the flag to `true` by default. Only a small number of slt plans now
   need a `YieldStreamExec`, which means the built-in yielding in
   `DataSourceExec`/`LazyMemoryExec` covers almost all cases.
   
   Remaining things:
   1. Add tests that reproduce the `InterleaveExec` and similar corner cases.
   2. Make all the corner cases pass those tests? This could also be done by
   changing the optimizer rule to match them.
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2942579857

   > Oh boy, it's going to take some time to grok how that actually gets 
evaluated. It surprised me a bit that this gets planned as one linear chain. Is 
there an explanation somewhere about how that works? Or what would be a good 
way to try to understand?
   > 
   > Regardless of that, it's useful to benchmark. @zhuqi-lucas are you 
familiar with setting up benchmarks in the project? It's all new to me so it 
might take me some time to figure out.
   
   @pepijnve Here is the benchmark README for DataFusion:
   
   https://github.com/apache/datafusion/tree/main/benchmarks
   
   
   We could start with ClickBench. But if we want to mock up custom complex
   SQL, we may need to look at the table fields in the benchmark data.
   
   For example, to generate the TPCH data:
   
   ```shell
   # Create / download a specific dataset (TPCH)
   
   ./bench.sh data tpch
   ```
   
   





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2942575262

   > > What do you mean built-in yielding? If it means we add some leaf nodes 
to support built-in yielding support? Thanks!
   > 
   > Here is a rough sketch of what I meant, let's discuss and optimize.
   > 
   > 1. We add built-in yielding capabilities to `DataSourceExec` via a 
constructor argument; i.e. it yields internally every now and then if created 
with that constructor argument.
   > 2. We add two `ExecutionPlan` APIs:
   > 
   > * `yields_cooperatively` (we can choose the name later, this is a 
placeholder), which returns a boolean value. The default implementation simply 
checks `EmissionType` of the operator and returns accordingly.
   > * `with_cooperative_yields`, which _optionally_ returns a 
cooperatively-yielding version of the operator if it exists. The default 
implementation checks `yields_cooperatively` and returns self if true, and 
`None` otherwise.
   > 
   > 3. We override `with_cooperative_yields` for `DataSourceExec` and return 
the yielding version.
   > 4. If the rule decides that it needs a leaf to yield, it checks whether a 
yielding variant exists, and replaces it with a yielding variant if it does. If 
it doesn't, it adds a `YieldStreamExec`.
   > 
   > After our investigations, if it turns out that `InterleaveExec` or some 
other operator can always misbehave, we can change the return value of 
`yields_cooperatively` to signal that and tweak the rule to add intermediate 
`YieldStreamExec` nodes too.
   > 
   > @zhuqi-lucas, I may be missing some detail, but does it make sense from a 
general point of view?
   
   Good example, thank you @ozankabak !





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2941721735

   Oh boy, it's going to take some time to grok how that actually gets 
evaluated. It surprised me a bit that this gets planned as one linear chain. Is 
there an explanation somewhere about how that works? Or what would be a good 
way to try to understand?
   
   Regardless of that, it's useful to benchmark. @zhuqi-lucas are you 
familiar with setting up benchmarks in the project? It's all new to me so it 
might take me some time to figure out.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2941415843

   I think queries of the following form (not this particular one, as it works 
on a small dataset, but of similar form) could be interesting test vehicles for 
you:
   
   
https://github.com/apache/datafusion/blob/bf7859e5d9dbdc260674f5333a5cafa9c6e7bc12/datafusion/sqllogictest/test_files/window.slt#L3017C2-L3063C264





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940698205

   @ozankabak I patched the gh_compare script to run the benchmark suite 
locally and it's running now. I'll report back when it completes.
   Any examples of specific queries I could try beyond those?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940683168

   @pepijnve I would suggest testing with deep plans to see what happens. I am 
also curious to see the results.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940614830

   > What do you mean built-in yielding? If it means we add some leaf nodes to 
support built-in yielding support? Thanks!
   
   Here is a rough sketch of what I meant, let's discuss and optimize.
   1. We add built-in yielding capabilities to `DataSourceExec` via a
   constructor argument; i.e. it yields internally every now and then if
   created with that constructor argument.
   2. We add two `ExecutionPlan` APIs:
      - `yields_cooperatively` (we can choose the name later, this is a
        placeholder), which returns a boolean value. The default implementation
        simply checks `EmissionType` of the operator and returns accordingly.
      - `with_cooperative_yields`, which *optionally* returns a
        cooperatively-yielding version of the operator if it exists. The default
        implementation checks `yields_cooperatively` and returns self if true,
        and `None` otherwise.
   3. We override `with_cooperative_yields` for `DataSourceExec` and return the
   yielding version.
   4. If the rule decides that it needs a leaf to yield, it checks whether a
   yielding variant exists, and replaces the leaf with that variant if it does.
   If it doesn't, it adds a `YieldStreamExec`.
   
   After our investigations, if it turns out that `InterleaveExec` or some 
other operator can always misbehave, we can change the return value of 
`yields_cooperatively` to signal that and tweak the rule to add intermediate 
`YieldStreamExec` nodes too.
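
One possible shape for the proposal above, as a standalone sketch rather than the real `ExecutionPlan` trait. Everything here is an assumption made for illustration: the `Operator` trait, the `cooperative_variant` hook, the `make_leaf_yield` helper, and the toy `DataSource`, `PlaceholderRow`, and `YieldStream` structs. `yields_cooperatively` is a required method in this sketch because the comment does not spell out how the default would map `EmissionType` to a boolean, and `with_cooperative_yields` is a free function so it can return the same `Arc` it was given.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the relevant part of `ExecutionPlan`.
trait Operator {
    fn name(&self) -> String;

    /// Placeholder name from the discussion: does this operator yield on its own?
    fn yields_cooperatively(&self) -> bool;

    /// Hook for operators that can build a yielding version of themselves.
    fn cooperative_variant(&self) -> Option<Arc<dyn Operator>> {
        None
    }
}

/// Returns the operator itself if it already yields, otherwise its yielding
/// variant if one exists, otherwise `None`.
fn with_cooperative_yields(op: Arc<dyn Operator>) -> Option<Arc<dyn Operator>> {
    if op.yields_cooperatively() {
        Some(op)
    } else {
        op.cooperative_variant()
    }
}

/// Toy data source with built-in yielding selected by a constructor argument.
struct DataSource {
    cooperative: bool,
}

impl Operator for DataSource {
    fn name(&self) -> String {
        format!("DataSource(cooperative={})", self.cooperative)
    }
    fn yields_cooperatively(&self) -> bool {
        self.cooperative
    }
    fn cooperative_variant(&self) -> Option<Arc<dyn Operator>> {
        Some(Arc::new(DataSource { cooperative: true }))
    }
}

/// Toy leaf with no yielding variant.
struct PlaceholderRow;

impl Operator for PlaceholderRow {
    fn name(&self) -> String {
        "PlaceholderRow".to_string()
    }
    fn yields_cooperatively(&self) -> bool {
        false
    }
}

/// Toy wrapper playing the role of `YieldStreamExec`.
struct YieldStream {
    frequency: usize,
    input: Arc<dyn Operator>,
}

impl Operator for YieldStream {
    fn name(&self) -> String {
        format!("YieldStream(frequency={}, input={})", self.frequency, self.input.name())
    }
    fn yields_cooperatively(&self) -> bool {
        true
    }
}

/// Step 4 of the sketch: prefer a yielding variant, fall back to wrapping.
fn make_leaf_yield(leaf: Arc<dyn Operator>, frequency: usize) -> Arc<dyn Operator> {
    match with_cooperative_yields(Arc::clone(&leaf)) {
        Some(yielding) => yielding,
        None => Arc::new(YieldStream { frequency, input: leaf }),
    }
}
```

In this model a non-cooperative `DataSource` is swapped for its cooperative variant, while `PlaceholderRow`, which has no variant, ends up wrapped in `YieldStream`.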





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940587625

   Just FYI, I've completed (I think at least) the exercise of adapting the 
operators in https://github.com/pepijnve/datafusion/tree/cancel_safety. I 
wanted to finish that if only for the learning experience.
   
   Honestly, I don't think it's that dramatic a code change and intuitively at 
least I would expect overhead to be negligible. I would be interested to 
measure what the actual impact is on the benchmarks though and compare that to 
the variant here. @alamb is there any particular environment required to run 
that branch comparison script?





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940467339

   > I am onboard with the approach in this PR, and it looks good to me 
overall. Just needs some finishing touches:
   > 
   > * To avoid such a large diff (which is mostly plans in tests), I suggest 
gating this behind a configuration flag, which would default to `false` for 
now. We will turn it on in subsequent PRs. We just need to add a few tests with 
this flag turned on to get it exercised in CI.
   > * In the next PR (or this PR if you want), before we turn the flag on, we 
can add an API to ask whether a leaf supports built-in yielding. If it does, 
the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add built-in 
yielding support to a few common leaf nodes, and turn the configuration flag on 
after doing so, plan changes will be minimal.
   > * In another subsequent PR, we can fix `InterleaveExec` (@berkaysynnada 
and I started discussing how one can approach this problem).
   > * I suggest making the number of batches to yield not a constant but a 
parameter of `YieldStreamExec`. Also, when we display it, showing its child is 
not informative (we see that from the print-out already). Displaying yielding 
frequency is probably a better idea.
   > 
   > Thanks @zhuqi-lucas for the great work. Love it.
   
   Thank you @ozankabak, I have done the following in the latest version of
   the PR:
   
   1. Gated this behind a configuration flag, which defaults to `false` for now.
   2. Enabled it only for a small test case and the end-to-end tests.
   3. Made the number of batches between yields a parameter of
   `YieldStreamExec` instead of a constant; the display now shows the frequency.
   
   Not done yet:
   1. Add an API to ask whether a leaf supports built-in yielding. If it does,
   the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add
   built-in yielding support to a few common leaf nodes, and turn the
   configuration flag on after doing so, plan changes will be minimal.
   2. Fix `InterleaveExec` related corner cases.
   
   
   What do you mean by built-in yielding? Does it mean we add built-in yielding
   support to some leaf nodes? Thanks!





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940167521

   > This assumption turns out to be quite easy to break as in the interleave 
example.
   
   I would say it is not *that* easy to break; it took you some effort to
   contrive an example. Besides, this happens due to a particular behavior of
   the current implementation of `InterleaveExec`, which (I suspect) we can fix.
   
   Let's go through the following exercise together: In the most general case, 
let's assume that I'm wrong and there is a certain class of operators that is 
always prone to "eating" `Pending` values due to the nature of the actual 
operation they perform. If we find this to be the case, we have discovered an 
important piece of information we should expose via the `ExecutionPlan` APIs. 
We would then do that, and simply update the rule here to also insert a 
`YieldExec` right after such operators.
   
   This way, we would retain the ability to insert minimum necessary overhead 
to any given plan to have cancellability.





Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940060922

   Thank you folks, I will move forward with this solution and do the next
   round of cleanup and optimization.
   
   For the interleave, `SortPreservingMergeExec`, and similar corner cases, I
   agree that we can handle them in follow-ups.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]

2025-06-04 Thread via GitHub


pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940030602

   > The rule only inserts YieldExecs if the plan involves pipeline-breaking, 
and only once at leaves.
   
   The assumption this design makes is that a `Pending` result produced at a 
leaf is guaranteed to get passed up the call chain all the way to the root of 
the plan. I've been trying to find ways to do so, but besides panicking I don't 
think there's any way to long jump back to orchestrate a forced return of 
control to the tokio executor directly. This assumption turns out to be quite 
easy to break as in the interleave example.
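
A small sketch of how a `Pending` produced at a leaf can fail to reach the root. This is a toy model and not how `InterleaveExec` is implemented: `YieldingLeaf`, `Interleave`, and `drain` are hypothetical, and the two leaves are given different yield frequencies purely to stand in for inputs that progress at different rates. The only assumption that matters is that the parent tries its other child when one child returns `Pending`.

```rust
use std::task::Poll;

/// Hypothetical leaf that returns `Pending` once after every `frequency` items
/// and counts how many times it did so.
struct YieldingLeaf {
    remaining: u32,
    frequency: u32,
    since_yield: u32,
    pendings: u32,
}

impl YieldingLeaf {
    fn new(remaining: u32, frequency: u32) -> Self {
        Self { remaining, frequency, since_yield: 0, pendings: 0 }
    }

    fn poll_next(&mut self) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        if self.since_yield >= self.frequency {
            self.since_yield = 0;
            self.pendings += 1;
            return Poll::Pending;
        }
        self.remaining -= 1;
        self.since_yield += 1;
        Poll::Ready(Some(self.remaining))
    }
}

/// Hypothetical merge operator: polls its children round-robin and returns the
/// first item it finds. It only returns `Pending` when no child had an item.
struct Interleave {
    children: Vec<YieldingLeaf>,
    start: usize,
}

impl Interleave {
    fn poll_next(&mut self) -> Poll<Option<u32>> {
        let n = self.children.len();
        let mut saw_pending = false;
        for offset in 0..n {
            let idx = (self.start + offset) % n;
            match self.children[idx].poll_next() {
                Poll::Ready(Some(item)) => {
                    // A ready sibling hides any `Pending` seen earlier in this call.
                    self.start = (idx + 1) % n;
                    return Poll::Ready(Some(item));
                }
                Poll::Ready(None) => {}
                Poll::Pending => saw_pending = true,
            }
        }
        if saw_pending {
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}

/// Drain the root; returns (items seen, `Pending` results seen at the root).
fn drain(root: &mut Interleave) -> (usize, usize) {
    let mut items = 0usize;
    let mut pendings = 0usize;
    loop {
        match root.poll_next() {
            Poll::Ready(Some(_)) => items += 1,
            Poll::Ready(None) => return (items, pendings),
            Poll::Pending => pendings += 1,
        }
    }
}
```

With children `YieldingLeaf::new(3, 1)` and `YieldingLeaf::new(6, 100)`, the first leaf returns `Pending` twice, yet the root delivers all nine items without ever returning `Pending` itself.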




