Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2970160627

> Look, I see that you are trying to help and we do want to take it. I suspect we might be facing a "culture" challenge here: Typically, DF community attacks large problems by solving them bit by bit and refining a solution iteratively. This is unlike some other projects which front-load the effort by going through a more comprehensive design process. We also do that for some tasks where this iterative approach is not applicable, but it is not very common.

BTW I liked this description so much I ported it to a proposed addition in the docs:
- https://github.com/apache/datafusion/pull/16397

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957658585

> @zhuqi-lucas, talking about TODO items, in addition to the 4 things I noted in [my comment above](https://github.com/apache/datafusion/pull/16196#issuecomment-2955853539), I suggest the following: As we study interleave-related cases in more detail, I think we should add a test case with an interleave in a plan that doesn't have `RepartitionExec`. This can happen when data comes in already partitioned. I think in such cases we can still trigger non-cancellability with `InterleaveExec`'s current code/logic.

Sure, thank you @ozankabak, let me create a follow-up ticket to track this!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957652941

> So maybe we need a follow on PR to fix the cancel test from @pepijnve

Thank you @alamb. @pepijnve, can you add a test case that this PR does not handle? As far as I remember, I added all known cases to the tests; if I missed any, feel free to add a reproducer and I will keep fixing it. Thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957688586

> @ozankabak indeed, that was what my original test was simulating. The coalesce batches and repartition end up erasing the scenario I was trying to demonstrate. I fully agree that the test setup is contrived, but the point of the test was not to demonstrate that there was a problem with interleave. It was intended to demonstrate brittleness in the approach because certain legitimate combinations of execution plans still end up not being cancelable in practice.
>
> Hacky patch wrt current main that gets you back to the failing situation https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a

Sorry, I just saw this test case. Thank you @pepijnve!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957679295

FYI, thank you @ozankabak @alamb @pepijnve. I have created an epic ticket with 5 sub-tasks so far; feel free to add more. We can iterate over all possible cases and make it perfect. Thanks!

https://github.com/apache/datafusion/issues/16353
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2957302218

> Reflecting all this onto the task at hand, this PR (1) solves many cases already and (2) introduces some machinery that will be useful as we iterate on the full solution. I don't think it is brittle, I think it is imperfect and requires refinement.

We're not going to see eye to eye on this one and that's ok; people don't always have to agree.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956513724

Great, thanks for the patch. We should use it as one of the new test cases in the follow-on PRs.

Look, I see that you are trying to help and we do want to take it. I suspect we might be facing a "culture" challenge here: Typically, DF community attacks large problems by solving them bit by bit and refining a solution iteratively. This is unlike some other projects which front-load the effort by going through a more comprehensive design process. We also do that for some tasks where this iterative approach is not applicable, but it is not very common.

This "bit by bit approach" doesn't always succeed, every now and then it happens that we get stuck or go down the wrong path for a while, and then change tacks. However, we still typically prefer to "advance the front" and make progress in tangible ways as much as we can (if we see a way). This necessarily results in imperfect solutions being the "state of the code" in some cases, and they survive in the codebase for a while, but we are good at driving things to completion in the long run.

Reflecting all this onto the task at hand, this PR (1) solves many cases already and (2) introduces some machinery that will be useful as we iterate on the full solution. I don't think it is brittle, I think it is imperfect and requires refinement. I am optimistic that we will eventually converge on a good approach that requires minimal operator cooperation (but we won't be able to reduce that to a strict zero) and is close to being optimal in terms of yielding overhead. Where we are at is not where we will ultimately be, this is just a step in a long process.

I hope that helps. Thanks.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956355078

@ozankabak indeed, that was what my original test was simulating. The coalesce batches and repartition end up erasing the scenario I was trying to demonstrate. I fully agree that the test setup is contrived, but the point of the test was not to demonstrate that there was a problem with interleave. It was intended to demonstrate brittleness in the approach because certain legitimate combinations of execution plans still end up not being cancelable in practice.

Hacky patch wrt current main that gets you back to the failing situation https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956234465

@zhuqi-lucas, talking about TODO items, in addition to the 4 things I noted in [my comment above](https://github.com/apache/datafusion/pull/16196#issuecomment-2955853539), I suggest the following:

As we study interleave-related cases in more detail, I think we should add a test case with an interleave in a plan that doesn't have `RepartitionExec`. This can happen when data comes in already partitioned. I think in such cases we can still trigger non-cancellability with `InterleaveExec`'s current code/logic.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2956129436

> However, I do like a bias of action, and if this PR fixes a real problem, I don't think we should bikeshed it indefinitely

@alamb sorry if it came across as bike shedding; I felt there were legitimate concerns with the design that were being ignored. I wrote up all my notes in #16301.

> I was thinking of YieldStream as such a combinator 🤔

It's a close approximation of what you need, but not as precise as you would want. Other PR explains why so I won't repeat it again here.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955938360

Is that the interleave test? Sorry, the link was not clear.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955983864

BTW if you wanted to check whether this PR covers interleave-related cases (which is the test case analyzed in that repo), this PR has two tests for it (`test_infinite_interleave_cancel` and `test_infinite_interleave_agg_cancel`). We can probably add even more in the future to further our understanding of the problem as we keep working on this; it is a good test case.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955961098

His repo seems to be creating a plan manually and applying some old version of the rule (which is in that repo, not in DF proper). What are we trying to do here? Am I missing something?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955945948

> Is that the interleave test? Sorry the link was not clear

Sorry -- it was this reproducer https://github.com/pepijnve/datafusion_cancel_test

I pointed it at a local checkout of datafusion after this PR was merged
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955925839

So maybe we need a follow on PR to fix the cancel test from @pepijnve
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955922212

So I just re-ran the reproducer from @pepijnve in https://github.com/apache/datafusion/pull/16196#issuecomment-2921143644 against main and this PR doesn't cancel it:

```
Running `target/debug/datafusion_test`
Running query; will time out after 5 seconds
InfiniteStream::poll_next 1 times
InfiniteStream::poll_next 2 times
InfiniteStream::poll_next 3 times
InfiniteStream::poll_next 4 times
InfiniteStream::poll_next 5 times
InfiniteStream::poll_next 6 times
InfiniteStream::poll_next 7 times
InfiniteStream::poll_next 8 times
InfiniteStream::poll_next 9 times
InfiniteStream::poll_next 10 times
InfiniteStream::poll_next 11 times
InfiniteStream::poll_next 12 times
InfiniteStream::poll_next 13 times
InfiniteStream::poll_next 14 times
InfiniteStream::poll_next 15 times
InfiniteStream::poll_next 16 times
InfiniteStream::poll_next 17 times
InfiniteStream::poll_next 18 times
InfiniteStream::poll_next 19 times
InfiniteStream::poll_next 20 times
InfiniteStream::poll_next 21 times
InfiniteStream::poll_next 22 times
InfiniteStream::poll_next 23 times
InfiniteStream::poll_next 24 times
InfiniteStream::poll_next 25 times
InfiniteStream::poll_next 26 times
InfiniteStream::poll_next 27 times
InfiniteStream::poll_next 28 times
InfiniteStream::poll_next 29 times
InfiniteStream::poll_next 30 times
InfiniteStream::poll_next 31 times
InfiniteStream::poll_next 32 times
InfiniteStream::poll_next 33 times
...
```

I did check that aggregating values using range does work, so that is good:

```
DataFusion CLI v48.0.0
> SELECT SUM(value) FROM range(1, 500);
^C^C
```
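The log above shows the source being polled over and over without control ever returning to the caller. A minimal sketch of why that blocks cancellation, using only the Rust standard library: the names `BatchSource`, `AlwaysReady`, and `SumAll` are hypothetical stand-ins invented for this illustration (they are not DataFusion types), and the source is bounded so the demo terminates, unlike the `InfiniteStream` in the reproducer.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a batch stream; not a DataFusion trait.
trait BatchSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<i64>>;
}

/// Always returns `Ready`, like the reproducer's stream, but bounded.
struct AlwaysReady {
    remaining: usize,
}

impl BatchSource for AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<i64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(1))
    }
}

/// Aggregation-style consumer: it drains its input in a loop and only
/// returns to the caller when the input is pending or exhausted.
struct SumAll<S> {
    input: S,
    total: i64,
}

impl<S: BatchSource> SumAll<S> {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<i64> {
        loop {
            match self.input.poll_next(cx) {
                Poll::Ready(Some(v)) => self.total += v,
                Poll::Ready(None) => return Poll::Ready(self.total),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

/// Waker that does nothing; enough for a hand-rolled polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives the consumer to completion. Returns the sum and the number of
/// times control came back to the caller before the result was ready.
fn drive_counting_pending(batches: usize) -> (i64, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut op = SumAll { input: AlwaysReady { remaining: batches }, total: 0 };
    let mut pending = 0;
    loop {
        match op.poll(&mut cx) {
            Poll::Ready(total) => return (total, pending),
            Poll::Pending => pending += 1,
        }
    }
}

fn main() {
    let (total, pending) = drive_counting_pending(10_000);
    println!("sum = {total}, chances to cancel = {pending}");
}
```

In this sketch the caller never sees `Poll::Pending`, so a runtime that only checks for cancellation or timeouts between polls gets no opportunity to do so until the input ends. With an unbounded input, that opportunity never comes.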
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955853539

Thanks @alamb for the final review. @zhuqi-lucas, I think it would be great to create some tickets to track:

1. Improving `InsertYieldExec` rule by means of an API that exposes input *and* output pipelining behaviors of operators effectively
2. Investigating whether any already-existing manual yielding (for example, like the one in `RepartitionExec`) can now be removed

Also feel free to open any other issues if I'm missing something here
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak merged PR #16196:
URL: https://github.com/apache/datafusion/pull/16196
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135631720
##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,15 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
+
+/// When DataFusion detects that a plan might not be promptly cancellable
+/// due to the presence of tight-looping operators, it will attempt to
+/// mitigate this by inserting explicit yielding (in as few places as
+/// possible to avoid performance degradation). This value represents the
+/// yielding period (in batches) at such explicit yielding points. The
+/// default value is 64. If set to 0, DataFusion will not perform
Review Comment:
I like that this has an "escape valve" too -- if this mechanism isn't working
we can disable the new yields via config
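A rough sketch of what a yielding period with an "escape valve" could look like, written against the Rust standard library only. `BatchSource`, `Counter`, and `YieldEvery` are hypothetical names made up for this illustration and are not the PR's actual types; the only behavior taken from the config doc comment is that yielding happens every `period` batches and that a period of 0 turns it off.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a batch stream; not a DataFusion trait.
trait BatchSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// Always-ready source that produces `remaining` batches.
struct Counter {
    remaining: u64,
}

impl BatchSource for Counter {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining))
    }
}

/// Returns `Pending` once after every `period` ready batches.
/// A period of 0 disables the explicit yielding entirely.
struct YieldEvery<S> {
    inner: S,
    period: usize,
    since_yield: usize,
}

impl<S: BatchSource> BatchSource for YieldEvery<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.period > 0 && self.since_yield >= self.period {
            self.since_yield = 0;
            // Request an immediate re-poll, then hand control back.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = self.inner.poll_next(cx);
        if let Poll::Ready(Some(_)) = polled {
            self.since_yield += 1;
        } else {
            // The input yielded on its own or ended; restart the count.
            self.since_yield = 0;
        }
        polled
    }
}

/// Waker that does nothing; enough for a hand-rolled polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains the wrapped source; returns (batches seen, explicit yields).
fn drain(batches: u64, period: usize) -> (u64, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = YieldEvery { inner: Counter { remaining: batches }, period, since_yield: 0 };
    let (mut seen, mut yields) = (0u64, 0usize);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => seen += 1,
            Poll::Ready(None) => return (seen, yields),
            Poll::Pending => yields += 1,
        }
    }
}

fn main() {
    println!("period 64: {:?}", drain(200, 64));
    println!("period 0:  {:?}", drain(200, 0));
}
```

Each `Pending` is a point where an executor could notice a cancellation request. A smaller period gives more such points at the cost of more scheduling overhead, which is presumably the trade-off the config value exposes.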
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135629318
##
datafusion/sqllogictest/test_files/explain.slt:
##
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W
physical_plan after LimitAggregation SAME TEXT AS ABOVE
Review Comment:
Maybe we can add some comments to this effect / create a plan that shows the
YieldStream being inserted to make it clearer (maybe as a follow on PR)
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135460181
##
datafusion/core/tests/execution/infinite_cancel.rs:
##
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{Array, Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+};
+use datafusion::prelude::SessionContext;
+use datafusion::{common, physical_plan};
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_optimizer::wrap_leaves_cancellation::WrapLeaves;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+
+use futures::{Stream, StreamExt};
+use rstest::rstest;
+use tokio::select;
+
+struct InfiniteStream {
+    batch: RecordBatch,
+    poll_count: usize,
+}
+
+impl RecordBatchStream for InfiniteStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+impl Stream for InfiniteStream {
+    type Item = common::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+#[derive(Debug)]
+struct InfiniteExec {
Review Comment:
Thank you @alamb, good suggestion! Addressed in the latest PR. It seems
MemoryExec has been removed, so I used LazyMemoryExec to address this comment. Thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135459186
##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
+Arc::new(WrapLeaves::new()),
Review Comment:
Addressed in latest PR.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2135356171
##
datafusion/sqllogictest/test_files/explain.slt:
##
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W
physical_plan after LimitAggregation SAME TEXT AS ABOVE
Review Comment:
This is because the built-in leaf nodes now have a built-in YieldStream, so
the rule ignores them. Almost all cases should currently be covered by the
built-in YieldStream, so no extra YieldStreamExec is added. But if a user
provides a custom leaf operator node, the rule will automatically add the
YieldStreamExec; this is expected.
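The behavior described above can be sketched on a toy plan tree. This is an illustration only: the `Plan` enum, the `cooperative` flag, and `wrap_uncooperative_leaves` are hypothetical names invented here, and the real rule operates on `ExecutionPlan` nodes rather than on an enum like this.

```rust
/// Toy plan tree; hypothetical, not DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A leaf; `cooperative` means it already yields on its own.
    Leaf { name: String, cooperative: bool },
    /// An operator with children.
    Node { name: String, children: Vec<Plan> },
    /// Explicit yielding wrapper inserted by the rule.
    Yield(Box<Plan>),
}

/// Convenience constructor for a leaf node.
fn leaf(name: &str, cooperative: bool) -> Plan {
    Plan::Leaf { name: name.to_string(), cooperative }
}

/// Wraps only the leaves that lack built-in yielding and leaves
/// everything else untouched.
fn wrap_uncooperative_leaves(plan: Plan) -> Plan {
    match plan {
        Plan::Leaf { cooperative: false, .. } => Plan::Yield(Box::new(plan)),
        Plan::Node { name, children } => Plan::Node {
            name,
            children: children.into_iter().map(wrap_uncooperative_leaves).collect(),
        },
        // Cooperative leaves and existing wrappers are kept as they are.
        other => other,
    }
}

/// Counts the yielding wrappers present in a plan.
fn count_yield_nodes(plan: &Plan) -> usize {
    match plan {
        Plan::Leaf { .. } => 0,
        Plan::Node { children, .. } => children.iter().map(count_yield_nodes).sum(),
        Plan::Yield(inner) => 1 + count_yield_nodes(inner),
    }
}

fn main() {
    let plan = Plan::Node {
        name: "AggregateExec".to_string(),
        children: vec![leaf("DataSourceExec", true), leaf("CustomLeafExec", false)],
    };
    let wrapped = wrap_uncooperative_leaves(plan);
    println!("{wrapped:#?}");
    println!("yield wrappers inserted: {}", count_yield_nodes(&wrapped));
}
```

Under these assumptions, a plan made only of built-in leaves comes out unchanged, which would be consistent with the explain output showing no extra node after the rule runs.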
##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
+Arc::new(WrapLeaves::new()),
Review Comment:
Good suggestion @alamb, I will address this, thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2955153699

> Thank you @zhuqi-lucas and @ozankabak and @pepijnve -- I think this PR is really nicely commented and structured. It is easy to read and review.
>
> However, I am sorry but I am a bit confused by the implications of this PR now.
>
> From what I can tell, it doesn't insert YieldExec or add Yielding for AggregateExec, which is the operator we have real evidence doesn't yield. Instead it seems to add yielding for DataSource exec, which already will yield when reading Parquet from a remote store, for example 🤔

Thank you @alamb for the review, I will try to address the comments and answer the question.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954860574

Great, we will incorporate your feedback, answer your questions and write a list of follow-on work to serve as a basis for the tickets. I will only merge after these are done so we have a clear path to make progress on next week and beyond.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954400147

🤖: Benchmark completed

```
Comparing HEAD and issue_16193
Benchmark cancellation.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ issue_16193 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QCancellati… │ 27.20 ms │    27.81 ms │ no change │
└──────────────┴──────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Benchmark Summary          ┃         ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Total Time (HEAD)          │ 27.20ms │
│ Total Time (issue_16193)   │ 27.81ms │
│ Average Time (HEAD)        │ 27.20ms │
│ Average Time (issue_16193) │ 27.81ms │
│ Queries Faster             │       0 │
│ Queries Slower             │       0 │
│ Queries with No Change     │       1 │
│ Queries with Failure       │       0 │
└────────────────────────────┴─────────┘
```
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954399910

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16193 (56361a43d889b5003da9fe0b3c476f68f0d1f88e) to 1daa5ed5cc51546904d45e23cc148601d973942a [diff](https://github.com/apache/datafusion/compare/1daa5ed5cc51546904d45e23cc148601d973942a..56361a43d889b5003da9fe0b3c476f68f0d1f88e)
Benchmarks: cancellation
Results will be posted here when complete
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954399882
🤖: Benchmark completed
Details
```
Comparing HEAD and issue_16193

Benchmark clickbench_extended.json
┏━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query    ┃        HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0 │  1774.52 ms │  1915.09 ms │ 1.08x slower │
│ QQuery 1 │   702.21 ms │   689.79 ms │    no change │
│ QQuery 2 │  1431.65 ms │  1393.68 ms │    no change │
│ QQuery 3 │   699.08 ms │   674.34 ms │    no change │
│ QQuery 4 │  1423.03 ms │  1483.81 ms │    no change │
│ QQuery 5 │ 15706.11 ms │ 16249.84 ms │    no change │
│ QQuery 6 │  2019.19 ms │  2017.67 ms │    no change │
│ QQuery 7 │  2010.38 ms │  2123.45 ms │ 1.06x slower │
│ QQuery 8 │   828.39 ms │   858.33 ms │    no change │
└──────────┴─────────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 26594.57ms │
│ Total Time (issue_16193)   │ 27405.98ms │
│ Average Time (HEAD)        │  2954.95ms │
│ Average Time (issue_16193) │  3045.11ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          2 │
│ Queries with No Change     │          7 │
│ Queries with Failure       │          0 │
└────────────────────────────┴────────────┘

Benchmark clickbench_partitioned.json
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ issue_16193 ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │    15.00 ms │    15.22 ms │     no change │
│ QQuery 1  │    32.82 ms │    33.10 ms │     no change │
│ QQuery 2  │    80.12 ms │    80.05 ms │     no change │
│ QQuery 3  │    97.94 ms │    98.43 ms │     no change │
│ QQuery 4  │   578.12 ms │   585.68 ms │     no change │
│ QQuery 5  │   820.56 ms │   836.55 ms │     no change │
│ QQuery 6  │    23.39 ms │    23.11 ms │     no change │
│ QQuery 7  │    37.30 ms │    36.90 ms │     no change │
│ QQuery 8  │   891.45 ms │   902.35 ms │     no change │
│ QQuery 9  │  1166.10 ms │  1193.38 ms │     no change │
│ QQuery 10 │   264.85 ms │   263.41 ms │     no change │
│ QQuery 11 │   294.82 ms │   293.40 ms │     no change │
│ QQuery 12 │   896.36 ms │   900.32 ms │     no change │
│ QQuery 13 │  1199.00 ms │  1333.54 ms │  1.11x slower │
│ QQuery 14 │   828.80 ms │   832.24 ms │     no change │
│ QQuery 15 │   818.26 ms │   850.21 ms │     no change │
│ QQuery 16 │  1726.19 ms │  1726.91 ms │     no change │
│ QQuery 17 │  1621.59 ms │  1594.69 ms │     no change │
│ QQuery 18 │  3055.91 ms │  3032.30 ms │     no change │
│ QQuery 19 │    84.56 ms │    84.16 ms │     no change │
│ QQuery 20 │  1131.97 ms │  1126.00 ms │     no change │
│ QQuery 21 │  1316.89 ms │  1286.64 ms │     no change │
│ QQuery 22 │  2185.01 ms │  2140.56 ms │     no change │
│ QQuery 23 │  8040.54 ms │  7888.40 ms │     no change │
│ QQuery 24 │   465.48 ms │   464.48 ms │     no change │
│ QQuery 25 │   393.23 ms │   390.35 ms │     no change │
│ QQuery 26 │   533.73 ms │   525.41 ms │     no change │
│ QQuery 27 │  1596.30 ms │  1560.51 ms │     no change │
│ QQuery 28 │ 13568.05 ms │ 12429.02 ms │ +1.09x faster │
│ QQuery 29 │   531.82 ms │   517.36 ms │     no change │
│ QQuery 30 │   798.55 ms │   799.26 ms │     no change │
│ QQuery 31 │   857.27 ms │   884.07 ms │     no change │
│ QQuery 32 │  2623.40 ms │  2664.08 ms │     no change │
│ QQuery 33 │  3354.76 ms │  3339.54 ms │     no change │
│ QQuery 34 │  3338.46 ms │  3400.82 ms │     no change │
│ QQuery 35 │  1275.73 ms │  1308.99 ms │     no change │
│ QQuery 36 │   124.96 ms │   125.55 ms │     no change │
│ QQuery 37 │    54.57 ms │    57.31 ms │  1.05x slower │
│ QQuery 38 │   126.65 ms │   120.73 ms │     no change │
│ QQuery 39 │   193.09 ms │   189.66 ms │     no change │
│ QQuery 40 │    49.83 ms │    45.90 ms │ +1.09x faster │
│ QQuery 41 │    43.40 ms │    45.26 ms │     no change │
│ QQuery 42 │    38.46 ms │    38.37 ms │     no change │
└───────────┴─────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 57175.29ms │
│ Total Time (issue_16193)   │ 56064.2
```
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954372429
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16193 (56361a43d889b5003da9fe0b3c476f68f0d1f88e) to 1daa5ed5cc51546904d45e23cc148601d973942a [diff](https://github.com/apache/datafusion/compare/1daa5ed5cc51546904d45e23cc148601d973942a..56361a43d889b5003da9fe0b3c476f68f0d1f88e)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954329852
I am happy to wait for a bit more testing on this PR -- we now have about a month before the next release, so there is no pressure from there. However, I do like a bias for action, and if this PR fixes a real problem, I don't think we should bikeshed it indefinitely.
> Unfortunately you do need to do this kind of thing at the operator implementation level. I do think there are implementation patterns here that could serve as building blocks for operators. 'Build a stream async and then emit from it' for instance seems to be pretty common. Rather than having a bespoke implementation in each operator it would be useful to have a combinator that operators can use. Perhaps there's a similar zero cost solution to the 'drains input before first emit' pattern as well?
I was thinking of `YieldStream` as such a combinator 🤔
> @alamb FYI I plan to merge this soon. It is OK if you don't have the bandwidth to take a look, it is the first step towards the design we discussed before.
@ozankabak -- what are the next steps? I may have lost track -- if this PR needs some follow-on work, I think we should file tickets explaining what it is before merging (I can help file such tickets).
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2134852899
##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -137,6 +138,7 @@ impl PhysicalOptimizer {
             // are not present, the load of executors such as join or union will be
             // reduced by narrowing their input tables.
             Arc::new(ProjectionPushdown::new()),
+            Arc::new(WrapLeaves::new()),
Review Comment:
Can we please call this pass something related to Cancel or Yield? Like
`InsertYieldExec` to make it clearer what it is doing?
##
datafusion/core/tests/execution/infinite_cancel.rs:
##
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{Array, Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+};
+use datafusion::prelude::SessionContext;
+use datafusion::{common, physical_plan};
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{JoinType, ScalarValue};
+use datafusion_expr_common::operator::Operator::Gt;
+use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_optimizer::wrap_leaves_cancellation::WrapLeaves;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+
+use futures::{Stream, StreamExt};
+use rstest::rstest;
+use tokio::select;
+
+struct InfiniteStream {
+    batch: RecordBatch,
+    poll_count: usize,
+}
+
+impl RecordBatchStream for InfiniteStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+impl Stream for InfiniteStream {
+    type Item = common::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+#[derive(Debug)]
+struct InfiniteExec {
Review Comment:
Instead of a new exec, perhaps we could use `MemoryExec` (with like 1000
`batch.clone()` for example) to show it is configured correctly
##
datafusion/sqllogictest/test_files/explain.slt:
##
@@ -242,6 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W
physical_plan after LimitAggregation SAME TEXT AS ABOVE
Review Comment:
I am confused about why YieldStreamExec does not appear in more of the
explain `slt` plans
##
datafusion/physical-plan/src/yield_stream.rs:
##
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954218719
> As time permits, we can explore alternate, more universal strategies for cancellation
> 100% agree with not merging this until we are in agreement
I can't help but feel that this is needlessly being rushed. Committing to a new public API on an extension point before it's clearly proven feels like a bad idea to me. If it was purely an implementation detail it would be less of an issue. What's the hurry?
The more I've been digging into the code over the past few days, the clearer it is that getting yielding just right while avoiding wasteful work is something you need to be careful about. See for instance #16196. We started out with concerns that yielding needlessly would introduce performance overhead, but now this PR does so even when it's not necessary at all. Admittedly it's nowhere near as extreme as the above issue, but still, waste is waste.
Wouldn't it be prudent to give this some more time to mature and maybe see if there are better strategies? It's not a universal solution, but just as an example, what I found in #16319 is that restructuring the operator code a little bit makes the operators behave much nicer from the caller's perspective. Unfortunately you do need to do this kind of thing at the operator implementation level. I do think there are implementation patterns here that could serve as building blocks for operators. 'Build a stream async and then emit from it' for instance seems to be pretty common. Rather than having a bespoke implementation in each operator it would be useful to have a combinator that operators can use. Perhaps there's a similar zero cost solution to the 'drains input before first emit' pattern as well?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2954144239
@alamb FYI I plan to merge this soon. It is OK if you don't have the bandwidth to take a look, it is the first step towards the design we discussed before.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952375579
> @zhuqi-lucas, I wanted to make a few final finishing touches while we give @alamb a chance to take a final look. I changed the config terminology from "frequency" to "period" because the former was kind of a misnomer. I also did some refactoring to remove some code repetitions. Can you please double check to make sure all looks good on your end? Thanks.
Thank you @ozankabak, the final refactor and name change look good to me. Yes, let's wait for @alamb to take a final look. Thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952376120
> > I will investigate that if we can remove some internal yield logic, such as repartition? etc
>
> Good idea, I'm curious to see if you can. `RepartitionExec` is a little bit of an outlier because it also breaks the volcano flow by actively draining its input pushing it into channels. As you experiment with it, let's make sure to test cases with nested repartition operators so that we don't miss any corner cases.
Thank you, that makes sense. I will experiment with it as a follow-up.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952147349
@zhuqi-lucas, I wanted to make a few final finishing touches while we give @alamb a chance to take a final look. I changed the config terminology from "frequency" to "period" because the former was kind of a misnomer. I also did some refactoring to remove some code repetitions. Can you please double check to make sure all looks good on your end? Thanks.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2952017708
I merged the latest from main, this is good to go
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2951967426
> I will investigate that if we can remove some internal yield logic, such as repartition? etc
Good idea, I'm curious to see if you can. `RepartitionExec` is a little bit of an outlier because it breaks the volcano flow by actively draining its input pushing it into channels. As you experiment with it, let's make sure to test cases with nested repartition operators so that we don't miss any corner cases.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2951559003
> I found some time to work on this tonight and it looks good to me now.
>
> To summarize where we are:
>
> * We add yields to all leaf nodes, but no yields to any intermediate node.
> * We added a bunch of tests to cover some corner cases and all of them pass.
> * There is a single new `with_cooperative_yields` API, which returns a cooperatively yielding version of a plan object (if it exists). If it doesn't exist for a leaf node, we add an auxiliary operator to handle yielding.
>
> Future work:
>
> * We will study input-side pipelining behaviors and improve the pipelining API, so that we only trigger explicit yielding when it is necessary. Given the small number of leaf nodes, we are not that far off from optimality even as is, which is great. We have some ideas on what to try here, but the current state seems quite good -- so we can merge it to fix downstream issues as we make further progress.
> * We will think about supporting cases involving non-volcano (i.e. spill) data flow.
>
> @zhuqi-lucas and @alamb, PTAL
Thank you, I agree that we are in a good state, because this PR automatically helps both built-in DataFusion operators and custom-defined operators. I will also help investigate the following case, maybe as a follow-up ticket, thanks!
> We will think about supporting cases involving non-volcano (i.e. spill) data flow.
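The leaf-level yielding summarized above can be illustrated with a small standalone sketch. It is not the PR's implementation: `PollSource`, `Endless`, `YieldEvery`, and `count_pending` are hypothetical names, and the trait is a simplified stand-in for `Stream::poll_next` so the example needs only the standard library. The only idea taken from the thread is that a wrapper around an always-ready leaf returns `Pending` once per configured period.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for `Stream::poll_next`.
trait PollSource {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A leaf that is always ready, like the infinite test source in the PR.
struct Endless(u64);

impl PollSource for Endless {
    type Item = u64;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        self.0 += 1;
        Poll::Ready(Some(self.0))
    }
}

/// Wrapper that returns `Pending` after `period` consecutive ready items.
struct YieldEvery<S> {
    inner: S,
    period: usize,
    ready_in_a_row: usize,
}

impl<S: PollSource> PollSource for YieldEvery<S> {
    type Item = S::Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.ready_in_a_row >= self.period {
            // Hand control back to the executor, but ask to be polled again.
            self.ready_in_a_row = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.poll_next(cx) {
            Poll::Ready(item) => {
                self.ready_in_a_row += 1;
                Poll::Ready(item)
            }
            Poll::Pending => {
                // The input yielded on its own, so restart the count.
                self.ready_in_a_row = 0;
                Poll::Pending
            }
        }
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `src` a fixed number of times and counts the `Pending` results.
fn count_pending<S: PollSource>(src: &mut S, polls: usize) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (0..polls)
        .filter(|_| src.poll_next(&mut cx).is_pending())
        .count()
}
```

With a period of 3, eight polls of the wrapped source produce two `Pending` results, while the bare `Endless` source never yields, which is the situation that made the original plans impossible to cancel.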
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2132866332
##
datafusion/datasource/src/source.rs:
##
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
 /// the [`FileSource`] trait.
 ///
 /// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
 #[derive(Clone, Debug)]
 pub struct DataSourceExec {
     /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
     data_source: Arc<dyn DataSource>,
     /// Cached plan properties such as sort order
     cache: PlanProperties,
+    /// Indicates whether to enable cooperative yielding mode.
+    cooperative: bool,
Review Comment:
Agreed, this is good for now
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2950129956
One performance aspect I've been looking at is the cost of yielding. There's no magic as far as I can tell. Returning a Pending simply leads to a full unwind of the call stack, because the return bubbles all the way up to the tokio executor, followed by a full descent via function calls back to the point where you left off. That would suggest it's most interesting to do a cooperative yield from as shallow a point as possible in the call tree rather than from the deepest possible point, so that you keep the roundtrip to the executor and back as short as possible.
Running with target_partitions = 1 shows that for queries like the deeply nested window/sort query you linked to, @ozankabak, the call stack can get pretty deep. It's essentially proportional to the depth of the plan tree. To mitigate this, would it make sense for pipeline breaking operators to run their pipeline breaking portion in a SpawnedTask instead of as a child? I'm thinking of the sort phase of sort, the build phase of join, etc. Regardless of where you inject Pending, that seems beneficial for keeping the call stack that needs to be unwound shallow.
Note that this same argument does suggest it could be more interesting to do the cooperative yield where the looping is happening rather than where the data is produced. The loop is the shallowest point, definitely if you spawn a task since that gets you a new root, while the producer is the deepest point.
Cutting the call stack using spawned tasks may also mitigate the deeply nested query concern regarding checking for yielding at multiple levels. The yield is never going to go beyond the scope of a single task.
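The cost argument above can be made concrete with a toy model that uses only the standard library. All names here (`PollSource`, `YieldOnceLeaf`, `Layer`, `layer_entries_for_one_item`) are hypothetical, and the pass-through layers are a simplified stand-in for a chain of operators. The sketch counts how many operator frames are entered to obtain one item when the leaf yields once.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an operator's `poll_next`.
trait PollSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// Leaf that returns `Pending` exactly once before producing a value.
struct YieldOnceLeaf {
    yielded: bool,
}

impl PollSource for YieldOnceLeaf {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if !self.yielded {
            self.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(Some(42))
    }
}

/// Pass-through operator that records every time it is entered.
struct Layer {
    inner: Box<dyn PollSource>,
    entered: Rc<Cell<usize>>,
}

impl PollSource for Layer {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        self.entered.set(self.entered.get() + 1);
        self.inner.poll_next(cx)
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Stacks `depth` layers on a leaf, polls the root until the first item
/// arrives, and returns how many layer frames were entered in total.
fn layer_entries_for_one_item(depth: usize) -> usize {
    let entered = Rc::new(Cell::new(0));
    let mut root: Box<dyn PollSource> = Box::new(YieldOnceLeaf { yielded: false });
    for _ in 0..depth {
        root = Box::new(Layer {
            inner: root,
            entered: Rc::clone(&entered),
        });
    }
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Each `Pending` returns to this loop, which plays the executor's role,
    // and the next poll descends through every layer again.
    while root.poll_next(&mut cx).is_pending() {}
    entered.get()
}
```

In this model a single yield at the leaf doubles the number of frames entered, so the extra work per yield grows linearly with plan depth. Spawning a task at a pipeline breaker would correspond to starting a new, shallower `root` in the sketch.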
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949658301
Great!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949615347
Good result: I removed all the ignores, and all the corner cases pass:
```rust
running 18 tests
test execution::infinite_cancel::test_infinite_join_agg_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_agg_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_agg_cancel::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_join_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_filter_reject_all_batches_cancel::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_filter_reject_all_batches_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_join_cancel::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_join_agg_cancel::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_hash_join_without_repartition_and_no_agg::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_hash_join_without_repartition_and_no_agg::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_interleave_agg_cancel::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_interleave_cancel::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_interleave_agg_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_interleave_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_sort_merge_join_without_repartition_and_no_agg::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_sort_merge_join_without_repartition_and_no_agg::pretend_finite_2_true ... ok
test execution::infinite_cancel::test_infinite_sort_cancel::pretend_finite_1_false ... ok
test execution::infinite_cancel::test_infinite_sort_cancel::pretend_finite_2_true ... ok
```
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949490754
> @zhuqi-lucas it may make sense at this point to add built-in yielding to the remaining two sources so that you don't have to deal with the diffs
I agree @ozankabak, good point.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949486347
@zhuqi-lucas it may make sense at this point to add built-in yielding to the remaining two sources so that you don't have to deal with the diffs
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949371256
> I am wondering about an easy solution (maybe): we just remove the final emission type check in the rule and add yields at the leaf nodes. It seems this can solve all our problems:
@zhuqi-lucas I think this is a good idea, let's see where we are at when we unconditionally do this at leaf nodes. This would be easy and quick to do. Then, we can think about the right API to use instead of emission type to arrive at the optimal solution as we collect more tests.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949387541
> > I am wondering about an easy solution (maybe): we just remove the final emission type check in the rule and add yields at the leaf nodes. It seems this can solve all our problems:
>
> @zhuqi-lucas I think this is a good idea, let's see where we are at when we unconditionally do this at leaf nodes. This would be easy and quick to do. Then, we can think about the right API to use instead of emission type to arrive at the optimal solution as we collect more tests.
Thank you @ozankabak, I have now added all 4 reproducer cases, which are currently marked as ignored. Let me change the rule to see if we can fix all the corner cases; if so, that would be great to see.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949117788
> Tests pass even if add in the "pretending" (because the join code seems to yield naturally)
The hash join test I have does fail so I dug into this. It's passing for you for two reasons:
- There's an aggregation in that test, so the plan has emission type final and the yield wrapper gets injected
- If you remove the aggregation there's still a RepartitionExec which itself has an ad hoc version of the cooperative yield logic in place at https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L980. It also decouples producer and consumer via the distribution channels. If the consumer drains faster than the producer fills you'll get a natural pending that way.
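The "natural pending" from channel decoupling described above can be sketched with a standard-library channel. This is only an illustration under stated assumptions: `ChannelConsumer` and `demo` are hypothetical names, `std::sync::mpsc` stands in for the distribution channels, and the waker registration that a real async channel performs is omitted.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::task::Poll;

/// Consumer side of a hypothetical repartition-style channel.
struct ChannelConsumer {
    rx: Receiver<u64>,
}

impl ChannelConsumer {
    /// Non-blocking poll: an empty channel maps to `Pending`, a closed and
    /// drained channel maps to end of stream.
    /// A real async channel would also store the task's waker when empty.
    fn poll_next(&mut self) -> Poll<Option<u64>> {
        match self.rx.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            Err(TryRecvError::Empty) => Poll::Pending,
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
        }
    }
}

/// Drives a producer and consumer by hand and records what the consumer sees.
fn demo() -> Vec<Poll<Option<u64>>> {
    let (tx, rx) = sync_channel(4);
    let mut consumer = ChannelConsumer { rx };
    let mut seen = Vec::new();

    tx.send(1).unwrap();
    seen.push(consumer.poll_next()); // one buffered item is available
    seen.push(consumer.poll_next()); // consumer outpaced the producer
    tx.send(2).unwrap();
    drop(tx); // producer finishes
    seen.push(consumer.poll_next()); // remaining buffered item
    seen.push(consumer.poll_next()); // channel closed and drained
    seen
}
```

The second poll returns `Pending` without any explicit yield logic, simply because the buffer is empty. That matches the explanation for why a test with a repartition in the plan can pass even when no yield wrapper is injected.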
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949226146
> @zhuqi-lucas this sort-merge test might also be useful to integrate https://github.com/pepijnve/datafusion/blob/cancel_safety/datafusion/core/tests/execution/yielding.rs#L373 I can adapt this into a PR on your branch if you like. Test cases are no longer drop-in I'm afraid. I changed things to make use of an additional tokio runtime in order for the tests to not hang in case of failure.
I will try to add it as well, thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949221841
> There are two versions of the join test, one with and one without aggregation (`test_infinite_join_cancel`). So the reason it is passing is something else (maybe the presence of `RepartitionExec`). @zhuqi-lucas if you add a third version without repartitioning and it fails, we can use it as a useful test vehicle next week. Thanks.
I see it now, thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949211366
Thank you @ozankabak @pepijnve
I am wondering about a possibly easy solution: we just remove the Final check in the rule and add yields based on leaf nodes. It seems this can solve all our problems:
1. The corner cases will also be resolved, since we yield in the leaf nodes.
2. Users will not be exposed to the new YieldStreamExec when they don't add a custom leaf node exec, because the built-in leaf nodes will get a built-in yield stream, which covers all those cases.
3. When users add a custom exec, we can also handle this: the rule will automatically add a YieldStreamExec, which is expected.
4. Performance will not be affected, based on our previous testing.
Any ideas? Thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949146871
@zhuqi-lucas this sort-merge test might also be useful to integrate https://github.com/pepijnve/datafusion/blob/cancel_safety/datafusion/core/tests/execution/yielding.rs#L373
I can adapt this into a PR on your branch if you like. Test cases are no longer drop-in I'm afraid. I changed things to make use of an additional tokio runtime in order for the tests to not hang in case of failure.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949140262
There are two versions of the join test, one with and one without aggregation (`test_infinite_join_cancel`). So the reason it is passing is something else (maybe the presence of `RepartitionExec`). @zhuqi-lucas if you add a third version without repartitioning and it fails, we can use it as a useful test vehicle next week. Thanks.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2132112459
##
datafusion/physical-optimizer/src/wrap_leaves_cancellation.rs:
##
@@ -76,7 +77,8 @@ impl WrapLeaves {
plan: Arc,
yield_frequency: usize,
) -> Result>> {
-let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
+// todo this is a bit of a hack, we should probably have a more explicit way to handle
+let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final|| plan.as_any().is::();
Review Comment:
This code was removed by me, and we need to find a better solution for the
filter and join corner cases.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949124590
> @zhuqi-lucas, I took the liberty of parametrizing the tests and hardening them. We now have two failing (ignored) tests (the filter test, which is probably trivial to fix and more related to filter itself, and a pure interleave test without the aggregation).
>
> The test harness is now good for a first cut. I hope to see whether our approach bears fruit next week, or learn whether we face some fundamental issue.
Thank you @ozankabak, it makes sense to me. I will also think about a solution for the failing test cases.
> > Tests pass even if add in the "pretending" (because the join code seems to yield naturally)
>
> The hash join test I have does fail so I dug into this. It's passing for you for two reasons:
>
> * There's an aggregation in that test, so the plan has emission type final and the yield wrapper gets injected
> * If you remove the aggregation there's still a RepartitionExec which itself has an ad hoc version of the cooperative yield logic in place at https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L980. It also decouples producer and consumer via the distribution channels. If the consumer drains faster than the producer fills you'll get a natural pending that way.
Thank you @pepijnve, I agree. I still haven't added the corner case for join; I think it will fail in a similar way to filter, and the solution may be similar as well. I will remove the aggregation for the join corner case.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2949112371
@zhuqi-lucas, I took the liberty of parametrizing the tests and hardening them. We now have two failing tests (the filter test, which is probably trivial to fix and more related to filter itself, and a pure interleave test without the aggregation).
The test harness is now good for a first cut. I hope to see whether our approach bears fruit next week, or learn whether we face some fundamental issue.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948780089
@zhuqi-lucas, I didn't get a chance to take a deep look as I'm traveling, but a cursory look suggests the only open issue is with the filter test. Is that right?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2131883855
##
datafusion/physical-optimizer/src/wrap_leaves_cancellation.rs:
##
@@ -76,7 +77,8 @@ impl WrapLeaves {
plan: Arc,
yield_frequency: usize,
) -> Result>> {
-let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final;
+// todo this is a bit of a hack, we should probably have a more explicit way to handle
+let is_pipeline_breaker = plan.properties().emission_type == EmissionType::Final|| plan.as_any().is::();
Review Comment:
I was curious to see how this case could be handled. The consequence is that
in many situations the yield logic will be injected unnecessarily causing the
root stream to emit pending even though ready's are arriving at a steady pace.
This is a bit at odds with the desire to keep performance overhead minimal.
I don't think there's a way to solve this external to `FilterExecStream`
since you're trying to count the number of times it consecutively discarded
full batches, not the number of times it polled its input.
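The distinction drawn here, counting consecutively discarded batches inside the filter rather than counting polls of its input, can be illustrated with a small std-only sketch. Everything below is hypothetical: `CountingFilter` is not DataFusion's `FilterExecStream`, the input is modelled as an endless repetition of one batch, and the budget of 64 empty batches is an assumed value.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical filter over an endless input of identical batches.
struct CountingFilter<P: Fn(i64) -> bool> {
    batch: Vec<i64>,
    predicate: P,
    /// Consecutive input batches that produced no output rows.
    consecutive_empty: usize,
    /// Assumed budget of empty batches before yielding.
    max_consecutive_empty: usize,
    /// Total number of input batches evaluated so far.
    evaluated: usize,
}

impl<P: Fn(i64) -> bool> CountingFilter<P> {
    fn new(batch: Vec<i64>, predicate: P, max_consecutive_empty: usize) -> Self {
        Self { batch, predicate, consecutive_empty: 0, max_consecutive_empty, evaluated: 0 }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<i64>>> {
        loop {
            // The input is always ready, so without the counter this loop
            // would never return when the predicate rejects every row.
            self.evaluated += 1;
            let kept: Vec<i64> = self
                .batch
                .iter()
                .copied()
                .filter(|v| (self.predicate)(*v))
                .collect();
            if !kept.is_empty() {
                self.consecutive_empty = 0;
                return Poll::Ready(Some(kept));
            }
            self.consecutive_empty += 1;
            if self.consecutive_empty >= self.max_consecutive_empty {
                self.consecutive_empty = 0;
                // Request another poll, then give control back to the caller.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
    }
}

/// Waker that does nothing; enough to call `poll_next` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut filter = CountingFilter::new((0..8_192).collect(), |v| v > 10_000, 64);
    let polled = filter.poll_next(&mut cx);
    println!("pending: {}, batches evaluated: {}", polled.is_pending(), filter.evaluated);
}
```

Because the counter resets whenever a batch yields output, a filter that passes rows at a steady pace never returns an artificial `Pending`, which is the overhead concern raised in the comment.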
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948651054
> > most CPU/IO cost still comes from predicate evaluation or hash-join builds, etc. The variability you're seeing in PR 16262's benchmarks is therefore probably just noise rather than a real performance regression.
>
> That is my expectation as well. I wanted to evaluate the query @ozankabak referred to with many nested pipeline blockers. It's kind of hard to assess the impact with a noisy measuring device.
>
> @zhuqi-lucas I would still like to pursue the alternative approach (basically an extended version of what you originally proposed), but I'm reaching the limits of my current Rust skills and would like to solicit some input from others. I don't want to do that here though to keep this PR focussed on the optimizer rule approach. Would you be offended if I made a secondary draft PR that I can point people to where I clearly state it's a potential alternative for this one but not intended to replace it? I would like to write up the various design tradeoffs I've been looking at, but this comment thread is not the right place. Not sure where else would be appropriate besides another PR.
@pepijnve Of course, you can open another draft PR. If, after discussion, the other direction turns out to be the better way, we can go there as well.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948608948
> most CPU/IO cost still comes from predicate evaluation or hash-join builds, etc. The variability you're seeing in PR 16262's benchmarks is therefore probably just noise rather than a real performance regression.
That is my expectation as well. I wanted to evaluate the query @ozankabak referred to with many nested pipeline blockers. It's kind of hard to assess the impact with a noisy measuring device.
@zhuqi-lucas I would still like to pursue the alternative approach (basically an extended version of what you originally proposed), but I'm reaching the limits of my current Rust skills and would like to solicit some input from others. I don't want to do that here though to keep this PR focussed on the optimizer rule approach. Would you be offended if I made a secondary draft PR that I can point people to where I clearly state it's a potential alternative for this one but not intended to replace it? I would like to write up the various design tradeoffs I've been looking at, but this comment thread is not the right place. Not sure where else would be appropriate besides another PR.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948434379
Updated the reproducer case in the latest PR revision. I commented it out because
neither our rule nor the built-in yield fixes it yet.
```rust
#[tokio::test]
async fn test_filter_reject_all_batches_cancel() -> Result<(), Box<dyn Error>> {
    // 1) Create a Session, Schema, and an 8K-row RecordBatch
    let session_ctx = SessionContext::new();
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    // Build a batch with values 0..=8191
    let mut builder = Int64Array::builder(8_192);
    for v in 0..8_192 {
        builder.append_value(v);
    }
    let batch = Arc::new(RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(builder.finish())],
    )?);
    // 2a) Wrap this batch in an InfiniteExec
    let infinite = Arc::new(InfiniteExec::new(&batch));
    // 2b) Construct a FilterExec that is always false: "value > 10000" (no rows pass)
    let false_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new_with_schema("value", &schema)?),
        Gt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))),
    ));
    let filtered: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(false_predicate, infinite.clone())?);
    // 2c) Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
    let coalesced: Arc<dyn ExecutionPlan> =
        Arc::new(CoalesceBatchesExec::new(filtered.clone(), 8_192));
    // 2d) Hash-repartition into 1 partition (so that a later global aggregation
    //     would run on a single partition)
    let exprs: Vec<Arc<dyn PhysicalExpr>> =
        vec![Arc::new(Column::new_with_schema("value", &schema)?)];
    let part = Partitioning::Hash(exprs.clone(), 1);
    let hashed: Arc<dyn ExecutionPlan> =
        Arc::new(RepartitionExec::try_new(coalesced.clone(), part.clone())?);
    // 3) WrapLeaves to insert YieldExec, so that the InfiniteExec yields control between batches
    let config = ConfigOptions::new();
    let optimized = WrapLeaves::new().optimize(hashed, &config)?;
    // 4) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time
    //    without ever producing output, no batch will arrive within 1 second. And since
    //    emission type is not Final, we never see an end-of-stream marker.
    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
    const TIMEOUT: u64 = 1;
    let result = select! {
        batch_opt = stream.next() => batch_opt,
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
            None
        }
    };
    assert!(
        result.is_none(),
        "Expected no output for infinite + filter(all-false) + repartition, but got a batch"
    );
    Ok(())
}
```
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948429253
@pepijnve I don't expect either placement of YieldExec to materially affect throughput: most CPU/IO cost still comes from predicate evaluation or hash-join builds. The variability you're seeing in PR 16262's benchmarks is therefore probably just noise (e.g. OS scheduling, JIT warmup) rather than a real performance regression.
> @zhuqi-lucas @alamb I wanted to work on measuring the performance impact of this PR today, but looking at [#16262 (review)](https://github.com/apache/datafusion/pull/16262#pullrequestreview-2903139531) I'm a bit surprised to see similar variability being reported as I'm seeing in the benchmark results of my own experimental branch. For PR 16262 no changes were made to the production code but you still see performance deltas. This begs the question how the benchmarks can/should be used to evaluate code changes. How are you guys making use of these results? Should I be using something else for coarse grained performance impact assessment?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948268910
@zhuqi-lucas @alamb I wanted to work on measuring the performance impact of this PR today, but looking at https://github.com/apache/datafusion/pull/16262#pullrequestreview-2903139531 I'm a bit surprised to see similar variability being reported as I'm seeing in the benchmark results of my own experimental branch. For PR 16262 no changes were made to the production code but you still see performance deltas. This begs the question how the benchmarks can/should be used to evaluate code changes. How are you guys making use of these results? Should I be using something else for coarse grained performance impact assessment?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2945161909
I will be traveling tomorrow, but @berkaysynnada and I will help drive this to completion early next week. I made some progress on sketching out a good API and will circle back next week. In the meantime, let's collect all the test cases we can. Thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2945139509
Changing hats to DataFusion user mode where I need to make sure that the end
users of our system can press 'cancel' at any time and that works as expected.
From that perspective here's a possible useful test case (and maybe an
illustration of a more general problem): suppose you have a query like `select
sum(size) as sum from t group by name order by sum` that produces a large
number of distinct groups. The query plan for this today is:
```
SortPreservingMergeExec: [sum@0 ASC NULLS LAST]
  SortExec: expr=[sum@0 ASC NULLS LAST], preserve_partitioning=[true]
    ProjectionExec: expr=[sum(t.size)@1 as sum]
      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[sum(t.size)]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([name@0], 10), input_partitions=10
            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[sum(t.size)]
              RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                DataSourceExec: file_groups={1 group: [[]]}, projection=[name, size], file_type=...
```
If I'm reading the code correctly, once the `FinalPartitioned` aggregation
has drained the original input it may switch over to reading back spill files.
At that point the original input (and yield exec wrapping it) are taken out of
the picture. Unless I'm mistaken, once the query hits that phase it may not be
interruptible again unless some yield guarantee is injected again.
I don't have a good idea for how to write a practical test case for this
though. You would have to drive a sufficiently large query all the way to this
point to be able to observe the behavior.
I wonder if this illustrates that only analyzing the static picture of the
query at planning time is insufficient because it does not (and probably
cannot) take the dynamic behavior of the query into account. The actual tree of
streams and the points where you might need yield wrappers can change as the
query is executing.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2944834501
@ozankabak @pepijnve Interesting: I have now added an interleave corner test case, **test_infinite_interleave_agg_cancel**, which tries to reproduce the corner case, but it works fine with our current PR and I can't reproduce the problem. The test is an extreme case with more than 30 infinite sources, so in theory it should be easy to reproduce, yet it works well. So I am wondering whether we need to do anything for interleave at this point?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2944505070
> I feel like we are getting close to a point where we start having not-so-fruitful discussions. I think I have made a good effort to make my arguments and reasoning clear.
@ozankabak My apologies. I didn't mean to derail your efforts here and I'll refrain from adding any more noise to the thread (beyond this, sorry). I appreciate the fact that you guys have much much more experience working in this codebase. I'm really trying to make a good faith contribution here where we compare the pros/cons of both approaches via measurements (API impact, performance impact, etc.), but I'll back off.
FWIW, I've added some more test cases in the meantime that you guys can use or ignore however you see fit. I also have some benchmark results from a first run at https://gist.github.com/pepijnve/21fbd480ae3e60f780446ace974d3ef5. It's a very mixed bag. I'm going to run the suite again a couple of times to see if this is consistent or not before I dig deeper. During the runs I'm seeing so much variability in runtime on both branches that I have my doubts how meaningful these results are. Would it be useful to let the benchmark perform more runs and adapt the tool a bit to report on mean and standard deviation rather than just average?
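As a rough illustration of the reporting suggested above, the following sketch summarizes repeated benchmark timings with a mean and a sample standard deviation. It is not part of the DataFusion benchmark tool; `Summary` and `summarize` are hypothetical names, and the timings in `main` are made-up numbers chosen only to make the arithmetic easy to follow.

```rust
/// Summary statistics for repeated timings of one query (milliseconds).
#[derive(Debug, PartialEq)]
struct Summary {
    mean: f64,
    std_dev: f64,
    min: f64,
    max: f64,
}

/// Returns `None` when there are fewer than two samples, since the sample
/// standard deviation (n - 1 in the denominator) is undefined in that case.
fn summarize(samples: &[f64]) -> Option<Summary> {
    if samples.len() < 2 {
        return None;
    }
    let n = samples.len() as f64;
    let mean = samples.iter().sum::<f64>() / n;
    let variance = samples.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (n - 1.0);
    let min = samples.iter().copied().fold(f64::INFINITY, f64::min);
    let max = samples.iter().copied().fold(f64::NEG_INFINITY, f64::max);
    Some(Summary { mean, std_dev: variance.sqrt(), min, max })
}

fn main() {
    // Illustrative values only, not real measurements.
    let runs_ms = [100.0, 102.0, 98.0, 104.0, 96.0];
    if let Some(s) = summarize(&runs_ms) {
        println!(
            "mean={:.2} ms, std_dev={:.2} ms, min={:.2} ms, max={:.2} ms",
            s.mean, s.std_dev, s.min, s.max
        );
    }
}
```

With a spread reported next to the mean, a delta between two branches can be compared against the run-to-run noise before it is treated as a regression.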
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128845315
##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,19 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
+
+/// When true, the optimizer will insert a Yield operator at the leaf nodes of any pipeline
+/// that contains a pipeline-breaking operator, allowing the Tokio scheduler to switch to
+/// other tasks while waiting.
+/// Default: true (enabled).
+pub enable_add_yield_for_pipeline_break: bool, default = true
+
+/// Yield frequency in batches, it represents how many batches to process before yielding
+/// to the Tokio scheduler. The default value is 64, which means that after processing
+/// 64 batches, the execution will yield control back to the Tokio scheduler.
+/// This setting is only effective when `enable_add_yield_for_pipeline_break` is set to true.
+/// This value should be greater than 0.
+pub yield_frequency_for_pipeline_break: usize, default = 64
Review Comment:
Addressed in latest PR, thanks!
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for "cooperative yielding" support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static {
Review Comment:
Addressed in latest PR, thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2944338401
Great, let's add tests for all cases we currently fail to cover (interleave, join, filter). We will then use them as litmus tests as we iterate on the API and the rule.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128845907
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for "cooperative yielding" support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
Review Comment:
Addressed in latest PR, thanks!
##
datafusion/physical-plan/src/memory.rs:
##
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug +
fmt::Display {
///
/// This plan generates output batches lazily, it doesn't have to buffer all
batches
/// in memory up front (compared to `MemorySourceConfig`), thus consuming
constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for
cooperative scheduling.
Review Comment:
Addressed in the latest PR, thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128846243
##
datafusion/sqllogictest/test_files/group_by.slt:
##
@@ -4113,7 +4113,7 @@ EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
logical_plan
01)Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1
-02)--Cross Join:
+02)--Cross Join:
Review Comment:
Addressed in the latest PR, thanks!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128830214
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -546,6 +558,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
child_pushdown_result,
))
}
+
+/// Whether this operator supports cooperative yielding. Default is false.
+fn yields_cooperatively(&self) -> bool {
+false
Review Comment:
Thank you @ozankabak for this good point.
I tried to add emission_type-related behaviour here, but currently we only
use emission_type final to choose which leaf nodes get the yield stream.
So returning false is the clearest and simplest option for now. If we change
the design later, for example by adding the yield behaviour not at the leaves
but before the emission_type final node, we may be able to derive this value
from emission_type. That is not easy either, because the yield would have to be
inserted before the emission_type final node rather than after it.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128809976
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
Review Comment:
I agree, it's neat. Thank you!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128809331
##
datafusion/datasource/src/source.rs:
##
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for
cooperative scheduling.
#[derive(Clone, Debug)]
pub struct DataSourceExec {
/// The source of the data -- for example, `FileScanConfig` or
`MemorySourceConfig`
data_source: Arc<dyn DataSource>,
/// Cached plan properties such as sort order
cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,
Review Comment:
Thank you @ozankabak for this question, here is my understanding.
I tried adding this flag to data_source, but it does not seem like a good fit because:
1. We would need to add it to every data_source implementation, such as
FileScanConfig and MemorySourceConfig.
2. Every new source built on data_source would need to add it again.
3. FileScanConfig also wraps different FileSource implementations (ParquetSource,
CsvSource, etc.), which adds complexity when injecting the flag through this config.
It is easier to do this at a higher level, so DataSourceExec is the best place
I have found so far.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943759107
> @ozankabak I still don't think you need all this API work since there's a zero API change way to deal with cancellation already. Tests all pass with no API changes in the 'all Stream implementations must be well behaved Tokio citizens' approach. I understand the performance concern, but maybe it's a bit premature to design APIs before knowing what the actual performance impact is? In terms of code changes I don't think the complexity argument holds since the required code changes were fairly trivial.
>
> I've found a dedicated machine to run the benchmarks on in the meantime. It's 10 year old hardware (Xeon E5620) so the compile times take forever, but should be good enough for relative comparisons. Will post results when I get them.
@pepijnve, I respect your opinion but we will need to agree to disagree. After spending a lot of time (and writing a lot of upstream *and* downstream DF code) over the last few years for leveraging the async runtime and its challenges/advantages, issues related to pipeline-breaking, performance implications of these things, the APIs `ExecutionPlan` objects should provide, responsibilities of the planner vs. operators and others, my intuition tells me that:
1. There is some information the `ExecutionPlan` API doesn't expose yet about input pipelining behavior and propagation of pendings, and it should -- not just for this use case, but others too.
2. There is a way to solve this problem universally with optimally minimal overhead and it is not that hard to figure out.
3. This way will also help us reduce the responsibility of user-defined operators, and solve cancellability even without their strict cooperation.
4. There are a lot of downstream users who define user-defined operators, and any "win" (in the above sense) for such use cases is important for our project goals.
5. I suspect (and my confidence is somewhat lower on this point relative to others) we will always be able to construct some cases, however contrived, where an "everyone always yields" solution will suffer from performance problems.
6. My intuition could be wrong, and if this thesis (1-2-3) indeed turns out to be wrong, we can take the learnings and fall back to another solution, where your proposal would be a good candidate.
I feel like we are getting close to a point where we start having not-so-fruitful discussions. I think I have made a good effort to make my arguments and reasoning clear. We will see where this effort goes (and I'm very hopeful that we will succeed), and if you want to help, we will always take it with appreciation. I think you can probably relate to the position that I would like to focus my thinking on getting 1-2-3 done (if possible) in the short-term, instead of repeatedly spending time on justifying what we are doing. If this route doesn't work, I will gladly help with finding another solution (and maybe that will be your approach).
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128549342
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
/// [`execute`]: ExecutionPlan::execute
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static {
Review Comment:
Yeah, I misunderstood the logic here, thanks @ozankabak.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128526165
##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,19 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to
`LargeBinary`.
pub expand_views_at_output: bool, default = false
+
+/// When true, the optimizer will insert a Yield operator at the leaf
nodes of any pipeline
+/// that contains a pipeline-breaking operator, allowing the Tokio
scheduler to switch to
+/// other tasks while waiting.
+/// Default: true (enabled).
+pub enable_add_yield_for_pipeline_break: bool, default = true
+
+/// Yield frequency in batches, it represents how many batches to
process before yielding
+/// to the Tokio scheduler. The default value is 64, which means that
after processing
+/// 64 batches, the execution will yield control back to the Tokio
scheduler.
+/// This setting is only effective when
`enable_add_yield_for_pipeline_break` is set to true.
+/// This value should be greater than 0.
+pub yield_frequency_for_pipeline_break: usize, default = 64
Review Comment:
Great suggestion! @ozankabak
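To make the `yield_frequency_for_pipeline_break` semantics above concrete, here is a minimal sketch of "return `Pending` once after every N ready batches". It is not the PR's `YieldStream`: the `PollSource` trait, `Counter`, `YieldEvery` and `drain` are hypothetical names invented for this illustration, and only the standard library is used so that the snippet stands alone.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a record batch stream (not a DataFusion type).
trait PollSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// A source that is always ready, so on its own it never hands control back.
struct Counter {
    next: u64,
    end: u64,
}

impl PollSource for Counter {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.next >= self.end {
            return Poll::Ready(None);
        }
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

/// Wrapper that returns `Pending` once after every `frequency` ready batches.
/// `frequency` must be greater than 0, matching the config documentation.
struct YieldEvery<S> {
    inner: S,
    frequency: usize,
    since_yield: usize,
}

impl<S: PollSource> PollSource for YieldEvery<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.since_yield >= self.frequency {
            self.since_yield = 0;
            // Ask to be polled again, then give the scheduler a turn.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = self.inner.poll_next(cx);
        if let Poll::Ready(Some(_)) = polled {
            self.since_yield += 1;
        }
        polled
    }
}

/// Waker that does nothing; enough for a hand-rolled polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains `source` and returns (batches seen, number of `Pending` results).
fn drain(source: &mut impl PollSource) -> (usize, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut batches, mut yields) = (0, 0);
    loop {
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => batches += 1,
            Poll::Ready(None) => return (batches, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Under these assumptions, draining 200 batches through `YieldEvery` with `frequency: 64` produces the same 200 batches plus three `Pending` results, while the bare `Counter` produces none. Each `Pending` is a point where a runtime such as Tokio could notice a cancellation.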
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128523292
##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
01)CrossJoinExec
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec
Review Comment:
Got it, let's focus on the major design part first.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943654190
> Left an initial round of reviews, let's address them and do another round
Thank you @ozankabak for the review and great suggestions, I will try to address them soon.
> @zhuqi-lucas regarding corner cases, I've added two additional tests on my branch for filter and join.
>
> Filter can refuse to cancel if the filter rejects many full batches. I've added a test case that filters out everything to emulate this.
>
> Joins can block in their build phase. I've emulated this situation using the infinite source that pretends to be bounded rather than unbounded.
>
> In both cases the emission type is not final for these plans.
Thank you @pepijnve, let me study these and add the more complex test cases, it's really helpful.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943553646
@zhuqi-lucas FYI, I've reworked the 'infinite stream' to 'range stream' in my tests. It simply emits a `Range` now. I've added an additional evil test case for sort-merge join where I'm inner joining `[i64::MIN, 0]` with `[0, i64::MAX]` with no further statistics. Again to emulate the join blocking in its data collection phase.
@ozankabak I still don't think you need all this API work since there's a zero API change way to deal with cancellation already. Tests all pass with no API changes in the 'all Stream implementations must be well behaved Tokio citizens' approach. I understand the performance concern, but maybe it's a bit premature to design APIs before knowing what the actual performance impact is? In terms of code changes I don't think the complexity argument holds since the required code changes were fairly trivial.
I've found a dedicated machine to run the benchmarks on in the meantime. It's 10 year old hardware (Xeon E5620) so the compile times take forever, but should be good enough for relative comparisons. Will post results when I get them.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943327837
Hmm, I wonder if we should design the `yields_cooperatively` API to return the
following `enum`:
```rust
pub enum InputPipelineBehavior {
    Source(bool),
    Intermediate(Vec<bool>),
}
```
where the operator returns:
- `Source(false)` when it is a source and doesn't cooperate,
- `Source(true)` when it is a source and cooperates,
- `Intermediate(Vec<bool>)` when it is an intermediate node with children.
Each boolean in the vector indicates whether the operator cooperates while it
consumes that child.
The default implementation would return `Source(false)` for leaf nodes, and
`Intermediate(vec![self.pipeline_behavior() != EmissionType::Final;
self.children().len()])` for non-leaf nodes. Plans like joins can override this
depending on whether they are in `CollectLeft` mode etc.
Thoughts?
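As a rough illustration of how the proposed default could behave, here is a self-contained sketch. Everything other than the `InputPipelineBehavior` variants is an assumption made for the example: `PlanNode` is a hypothetical stripped-down trait rather than `ExecutionPlan`, the local `EmissionType` has only two variants, and the join override (build side not cooperative, probe side cooperative) is a guess at what a `CollectLeft` join might report.

```rust
/// Local stand-in for the emission type; only the variants needed here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EmissionType {
    Incremental,
    Final,
}

#[derive(Debug, PartialEq, Eq)]
enum InputPipelineBehavior {
    Source(bool),
    Intermediate(Vec<bool>),
}

/// Hypothetical, stripped-down plan node trait (not DataFusion's `ExecutionPlan`).
trait PlanNode {
    fn child_count(&self) -> usize;
    fn emission_type(&self) -> EmissionType;

    /// Default from the proposal: leaves do not cooperate; other nodes
    /// cooperate on every child unless they only emit at the end.
    fn input_pipeline_behavior(&self) -> InputPipelineBehavior {
        let n = self.child_count();
        if n == 0 {
            InputPipelineBehavior::Source(false)
        } else {
            let cooperates = self.emission_type() != EmissionType::Final;
            InputPipelineBehavior::Intermediate(vec![cooperates; n])
        }
    }
}

struct Scan;
struct Filter;
struct Aggregate;
struct CollectLeftJoin;

impl PlanNode for Scan {
    fn child_count(&self) -> usize { 0 }
    fn emission_type(&self) -> EmissionType { EmissionType::Incremental }
}

impl PlanNode for Filter {
    fn child_count(&self) -> usize { 1 }
    fn emission_type(&self) -> EmissionType { EmissionType::Incremental }
}

impl PlanNode for Aggregate {
    fn child_count(&self) -> usize { 1 }
    fn emission_type(&self) -> EmissionType { EmissionType::Final }
}

impl PlanNode for CollectLeftJoin {
    fn child_count(&self) -> usize { 2 }
    fn emission_type(&self) -> EmissionType { EmissionType::Incremental }

    /// Assumed override: the left (build) input is drained in one go,
    /// the right (probe) input is streamed.
    fn input_pipeline_behavior(&self) -> InputPipelineBehavior {
        InputPipelineBehavior::Intermediate(vec![false, true])
    }
}
```

Note that the default reports the filter as cooperative on its input, which is exactly the kind of case the filter test from this thread is meant to probe.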
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128238867
##
datafusion/datasource/src/source.rs:
##
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
/// the [`FileSource`] trait.
///
/// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for
cooperative scheduling.
#[derive(Clone, Debug)]
pub struct DataSourceExec {
/// The source of the data -- for example, `FileScanConfig` or
`MemorySourceConfig`
data_source: Arc<dyn DataSource>,
/// Cached plan properties such as sort order
cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,
Review Comment:
Should this flag come from the `data_source` object? That is what
gives us the stream (which may or may not yield to the runtime).
##
datafusion/datasource/src/source.rs:
##
@@ -256,7 +262,39 @@ impl ExecutionPlan for DataSourceExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
-self.data_source.open(partition, context)
+// 1. Get the “base” stream exactly as before, without yielding.
+let stream = self.data_source.open(partition, Arc::clone(&context));
+
+// 2. If cooperative == false, return base_stream immediately.
+if !self.cooperative {
+return stream;
+}
+
+let frequency = context
+.session_config()
+.options()
+.optimizer
+.yield_frequency_for_pipeline_break;
+
+// 3. If cooperative == true, wrap the stream into a YieldStream.
+let yielding_stream = YieldStream::new(stream?, frequency);
+Ok(Box::pin(yielding_stream))
+}
+
+/// Override: this operator *does* support cooperative yielding when
`cooperative == true`.
+fn yields_cooperatively(&self) -> bool {
+self.cooperative
+}
+
+/// If `cooperative == true`, return `Some(self.clone())` so the optimizer
knows
+/// we can replace a plain DataSourceExec with this same node (it already
yields).
+/// Otherwise, return None.
+fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
+if self.cooperative {
+Some(self)
+} else {
+None
+}
Review Comment:
```suggestion
self.cooperative.then_some(self)
```
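For readers unfamiliar with `bool::then_some`, the following sketch shows that the suggested one-liner is equivalent to the `if`/`else` it replaces. `Node`, `verbose` and `concise` are hypothetical names used only for this illustration; the real method returns a trait object rather than `Arc<Node>`.

```rust
use std::sync::Arc;

/// Hypothetical node carrying only the flag relevant to this example.
struct Node {
    cooperative: bool,
}

/// The original shape: an explicit branch on the flag.
fn verbose(node: Arc<Node>) -> Option<Arc<Node>> {
    if node.cooperative {
        Some(node)
    } else {
        None
    }
}

/// The suggested shape: the flag is copied out first, then `node` is moved
/// into `then_some`, which returns `Some(node)` only when the flag is true.
fn concise(node: Arc<Node>) -> Option<Arc<Node>> {
    node.cooperative.then_some(node)
}
```

`then_some` evaluates its argument eagerly, which is harmless here because the argument is just a move of the `Arc`.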
##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
01)CrossJoinExec
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec
Review Comment:
Maybe we can, but let's first finalize the design and implementation for
others.
##
datafusion/physical-plan/src/memory.rs:
##
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug +
fmt::Display {
///
/// This plan generates output batches lazily, it doesn't have to buffer all
batches
/// in memory up front (compared to `MemorySourceConfig`), thus consuming
constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for
cooperative scheduling.
Review Comment:
Unnecessary
##
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##
@@ -3534,6 +3534,7 @@ async fn test_distribute_sort_memtable() -> Result<()> {
let session_config = SessionConfig::new()
.with_repartition_file_min_size(1000)
.with_target_partitions(3);
+
Review Comment:
```suggestion
```
##
datafusion/common/src/config.rs:
##
@@ -722,6 +722,19 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to
`LargeBinary`.
pub expand_views_at_output: bool, default = false
+
+/// When true, the optimizer will insert a Yield operator at the leaf
nodes of any pipeline
+/// that contains a pipeline-breaking operator, allowing the Tokio
scheduler to switch to
+/// other tasks while waiting.
+/// Default: true (enabled).
+pub enable_add_yield_for_pipeline_break: bool, default = true
+
+/// Yield frequency in batches, it represents how many batches to
process before yielding
+/// to the Tokio scheduler. The default value is 64, which means that
after processing
+/// 64 batches, the execution will yield control back to the Tokio
scheduler.
+/// This setting is only effective when
`enable_add_yield_for_pipeline_break` is set to true.
+/// This value should be greater than 0.
+pub yiel
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2943221060
@zhuqi-lucas regarding corner cases, I've added two additional tests on my branch for filter and join.
Filter can refuse to cancel if the filter rejects many full batches. I've added a test case that filters out everything to emulate this.
Joins can block in their build phase. I've emulated this situation using the infinite source that pretends to be bounded rather than unbounded.
In both cases the emission type is not final for these plans.
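The filter case can be sketched in a few lines. This is an illustration only, not the test from the branch mentioned above: `Batches`, `Source` and `RejectAll` are hypothetical names, and the polling interface is simplified to drop the task context. A filter that rejects every row loops over its input inside a single poll, so it hands control back only if its input does.

```rust
use std::task::Poll;

/// Hypothetical, context-free stand-in for a batch stream.
trait Batches {
    fn poll_next(&mut self) -> Poll<Option<Vec<i64>>>;
}

/// Emits `remaining` one-row batches. When `yield_every` is set, it returns
/// `Pending` once after that many batches.
struct Source {
    remaining: usize,
    yield_every: Option<usize>,
    since_yield: usize,
}

impl Batches for Source {
    fn poll_next(&mut self) -> Poll<Option<Vec<i64>>> {
        if let Some(n) = self.yield_every {
            if self.since_yield >= n {
                self.since_yield = 0;
                return Poll::Pending;
            }
        }
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        self.since_yield += 1;
        Poll::Ready(Some(vec![1]))
    }
}

/// Filter that rejects every row, so it never has a batch to return.
struct RejectAll<S> {
    input: S,
    polls_of_input: usize,
}

impl<S: Batches> Batches for RejectAll<S> {
    fn poll_next(&mut self) -> Poll<Option<Vec<i64>>> {
        loop {
            self.polls_of_input += 1;
            match self.input.poll_next() {
                Poll::Ready(Some(batch)) => {
                    let kept: Vec<i64> = batch.into_iter().filter(|_| false).collect();
                    if !kept.is_empty() {
                        return Poll::Ready(Some(kept));
                    }
                    // Nothing survived the filter: keep pulling without
                    // returning to the caller.
                }
                Poll::Ready(None) => return Poll::Ready(None),
                // A cooperative input is the only way out of the loop early.
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
```

With a non-yielding source of 1000 batches, one call to `poll_next` on the filter consumes the whole input before returning; with an unbounded source it would never return. With `yield_every: Some(64)`, the same call returns `Pending` after 64 batches.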
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128032920
##
datafusion/physical-optimizer/src/wrap_leaves_cancellation.rs:
##
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::Result;
+use datafusion_physical_plan::execution_plan::EmissionType;
+use datafusion_physical_plan::yield_stream::YieldStreamExec;
+use datafusion_physical_plan::ExecutionPlan;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+/// WrapLeaves is a PhysicalOptimizerRule that finds every
+/// pipeline-breaking node (emission_type == Final) and then
+/// wraps all of its leaf children in YieldStreamExec.
+pub struct WrapLeaves {}
+
+impl WrapLeaves {
+pub fn new() -> Self {
+Self {}
+}
+
+/// This function is called on every plan node during transform_down().
+/// If the node is a leaf (no children), we wrap it in a new
YieldStreamExec
+/// and stop recursing further under that branch (TreeNodeRecursion::Jump).
+fn wrap_leaves(
+plan: Arc<dyn ExecutionPlan>,
+yield_frequency: usize,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+if plan.children().is_empty() {
+// If the leaf node already has a built-in yielding variant:
Review Comment:
This is the logic that checks for built-in yielding support and, if it is
present, skips adding a new YieldStreamExec.
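The overall shape of the rule can be sketched on a toy plan tree. This is a simplified illustration under stated assumptions, not the code in `wrap_leaves_cancellation.rs`: `Plan`, `wrap_leaves` and `add_yields` here are hypothetical, the tree is a plain owned struct instead of `Arc<dyn ExecutionPlan>`, and the yield node is modelled as just another `Plan` with a descriptive name.

```rust
/// Toy plan node (hypothetical; stands in for `Arc<dyn ExecutionPlan>`).
#[derive(Clone, Debug, PartialEq)]
struct Plan {
    name: String,
    /// True for pipeline-breaking nodes (emission type "final").
    final_emission: bool,
    /// True when the node already yields to the runtime on its own.
    yields_cooperatively: bool,
    children: Vec<Plan>,
}

/// Wraps every leaf below `plan` in a yield node, unless the leaf already
/// has built-in yielding.
fn wrap_leaves(mut plan: Plan, frequency: usize) -> Plan {
    if plan.children.is_empty() {
        if plan.yields_cooperatively {
            return plan;
        }
        return Plan {
            name: format!("YieldStreamExec frequency={frequency}"),
            final_emission: false,
            yields_cooperatively: true,
            children: vec![plan],
        };
    }
    let children = std::mem::take(&mut plan.children);
    plan.children = children
        .into_iter()
        .map(|child| wrap_leaves(child, frequency))
        .collect();
    plan
}

/// Walks the plan top-down; the first pipeline-breaking node found on each
/// branch gets all of its leaves wrapped.
fn add_yields(mut plan: Plan, frequency: usize) -> Plan {
    if plan.final_emission {
        return wrap_leaves(plan, frequency);
    }
    let children = std::mem::take(&mut plan.children);
    plan.children = children
        .into_iter()
        .map(|child| add_yields(child, frequency))
        .collect();
    plan
}
```

In this sketch, a pipeline-breaking node with a non-cooperative leaf gains a `YieldStreamExec frequency=64` parent for that leaf, a leaf that reports built-in yielding is left alone, and plans without a pipeline breaker are returned unchanged.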
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128017451
##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
01)CrossJoinExec
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec
Review Comment:
I have enabled the flag by default. Across all the slt tests, only two kinds of leaf get a YieldStreamExec added: PlaceholderRowExec and WorkTableExec.
Do we also need to give PlaceholderRowExec and WorkTableExec a built-in YieldStream, or is it OK to add YieldStreamExec for these two cases? cc @ozankabak @alamb
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128017451
##
datafusion/sqllogictest/test_files/joins.slt:
##
@@ -4702,7 +4702,8 @@ physical_plan
01)CrossJoinExec
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--ProjectionExec: expr=[1 as Int64(1)]
-04)PlaceholderRowExec
+04)YieldStreamExec frequency=64
+05)--PlaceholderRowExec
Review Comment:
I have enabled the flag by default. Across all the slt tests, only one kind of leaf gets a YieldStreamExec added: PlaceholderRowExec.
Do we also need to give PlaceholderRowExec a built-in YieldStream, or is it OK to add YieldStreamExec for this single case? cc @ozankabak @alamb
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128014148
##
datafusion/physical-plan/src/memory.rs:
##
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug +
fmt::Display {
///
/// This plan generates output batches lazily, it doesn't have to buffer all
batches
/// in memory up front (compared to `MemorySourceConfig`), thus consuming
constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for
cooperative scheduling.
pub struct LazyMemoryExec {
/// Schema representing the data
schema: SchemaRef,
/// Functions to generate batches for each partition
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
/// Plan properties cache storing equivalence properties, partitioning,
and execution mode
cache: PlanProperties,
+/// Indicates whether to enable cooperative yielding mode.
+cooperative: bool,
Review Comment:
I also gave LazyMemoryExec built-in yielding, because it is a common case. For
example, the following query hits it:
```sql
SET datafusion.execution.target_partitions = 1;
SELECT SUM(value) FROM range(1, 500);
```
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2942862579
> > I am onboard with the approach in this PR, and it looks good to me overall. Just needs some finishing touches:
> >
> > * To avoid such a large diff (which is mostly plans in tests), I suggest gating this behind a configuration flag, which would default to `false` for now. We will turn it on in subsequent PRs. We just need to add a few tests with this flag turned on to get it exercised in CI.
> > * In the next PR (or this PR if you want), before we turn the flag on, we can add an API to ask whether a leaf supports built-in yielding. If it does, the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add built-in yielding support to a few common leaf nodes, and turn the configuration flag on after doing so, plan changes will be minimal.
> > * In another subsequent PR, we can fix `InterleaveExec` (@berkaysynnada and I started discussing how one can approach this problem).
> > * I suggest making the number of batches to yield not a constant but a parameter of `YieldStreamExec`. Also, when we display it, showing its child is not informative (we see that from the print-out already). Displaying yielding frequency is probably a better idea.
> >
> > Thanks @zhuqi-lucas for the great work. Love it.
>
> Thank you @ozankabak , i have done in the latest PR:
>
> 1. Gating this behind a configuration flag, which would default to `false` for now.
> 2. Only enable it for a small testing case and the end to end testing.
> 3. Making the number of batches to yield not a constant but a parameter of `YieldStreamExec`, also show the frequency for the display.
>
> Not done until now:
>
> 1. Add an API to ask whether a leaf supports built-in yielding. If it does, the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add built-in yielding support to a few common leaf nodes, and turn the configuration flag on after doing so, plan changes will be minimal.
> 2. Fix `InterleaveExec` related corner cases.
>
> What do you mean built-in yielding? If it means we add some leaf nodes to support built-in yielding support? Thanks!
>
> If it means, we manually wrap some YieldStream to those execs? And after that, we optimize it again, so we need to exclude those operators?
Update: I have completed all the remaining subtasks:
1. Added an API to ask whether a leaf supports built-in yielding. If it does, the rule doesn't add `YieldStreamExec` to that leaf at all.
2. Gave DataSourceExec and LazyMemoryExec built-in yielding (a built-in YieldStream), because they are the most common cases.
3. Turned the flag on by default. Only a small number of slt plans now show an added YieldStreamExec, which means the built-in support in DataSourceExec/LazyMemoryExec covers almost all cases.
Remaining things:
1. Add tests that reproduce the corner cases (InterleaveExec, etc.).
2. Make all the corner cases pass? This could also be done by changing the optimizer rule to match them.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2942579857

> Oh boy, it's going to take some time to grok how that actually gets evaluated. It surprised me a bit that this gets planned as one linear chain. Is there an explanation somewhere about how that works? Or what would be a good way to try to understand?
>
> Regardless of that, it's useful to benchmark. @zhuqi-lucas are you familiar with setting up benchmarks in the project? It's all new to me so it might take me some time to figure out.

@pepijnve Here is the benchmark README for DataFusion: https://github.com/apache/datafusion/tree/main/benchmarks

We could start with ClickBench. But if we want to mock up custom, complex SQL, we may need to look at the table fields in the benchmark data. For example, to generate the TPC-H data:

```shell
# Create / download a specific dataset (TPCH)
./bench.sh data tpch
```
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2942575262

> > What do you mean built-in yielding? If it means we add some leaf nodes to support built-in yielding support? Thanks!
>
> Here is a rough sketch of what I meant, let's discuss and optimize.
>
> 1. We add built-in yielding capabilities to `DataSourceExec` via a constructor argument; i.e. it yields internally every now and then if created with that constructor argument.
> 2. We add two `ExecutionPlan` APIs:
>
>    * `yields_cooperatively` (we can choose the name later, this is a placeholder), which returns a boolean value. The default implementation simply checks `EmissionType` of the operator and returns accordingly.
>    * `with_cooperative_yields`, which _optionally_ returns a cooperatively-yielding version of the operator if it exists. The default implementation checks `yields_cooperatively` and returns self if true, and `None` otherwise.
>
> 3. We override `with_cooperative_yields` for `DataSourceExec` and return the yielding version.
> 4. If the rule decides that it needs a leaf to yield, it checks whether a yielding variant exists, and replaces it with a yielding variant if it does. If it doesn't, it adds a `YieldStreamExec`.
>
> After our investigations, if it turns out that `InterleaveExec` or some other operator can always misbehave, we can change the return value of `yields_cooperatively` to signal that and tweak the rule to add intermediate `YieldStreamExec` nodes too.
>
> @zhuqi-lucas, I may be missing some detail, but does it make sense from a general point of view?

Good example, thank you @ozankabak!
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2941721735

Oh boy, it's going to take some time to grok how that actually gets evaluated. It surprised me a bit that this gets planned as one linear chain. Is there an explanation somewhere about how that works? Or what would be a good way to try to understand?

Regardless of that, it's useful to benchmark. @zhuqi-lucas are you familiar with setting up benchmarks in the project? It's all new to me so it might take me some time to figure out.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2941415843

I think queries of the following form (not this particular one, as it works on a small dataset, but of similar form) could be interesting test vehicles for you:

https://github.com/apache/datafusion/blob/bf7859e5d9dbdc260674f5333a5cafa9c6e7bc12/datafusion/sqllogictest/test_files/window.slt#L3017C2-L3063C264
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940698205

@ozankabak I patched the gh_compare script to run the benchmark suite locally and it's running now. I'll report back when it completes. Any examples of specific queries I could try beyond those?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940683168

@pepijnve I would suggest testing with deep plans to see what happens. I am also curious to see the results.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940614830

> What do you mean built-in yielding? If it means we add some leaf nodes to support built-in yielding support? Thanks!

Here is a rough sketch of what I meant, let's discuss and optimize.

1. We add built-in yielding capabilities to `DataSourceExec` via a constructor argument; i.e. it yields internally every now and then if created with that constructor argument.
2. We add two `ExecutionPlan` APIs:
   - `yields_cooperatively` (we can choose the name later, this is a placeholder), which returns a boolean value. The default implementation simply checks `EmissionType` of the operator and returns accordingly.
   - `with_cooperative_yields`, which *optionally* returns a cooperatively-yielding version of the operator if it exists. The default implementation checks `yields_cooperatively` and returns self if true, and `None` otherwise.
3. We override `with_cooperative_yields` for `DataSourceExec` and return the yielding version.
4. If the rule decides that it needs the leaf to yield, it checks whether a yielding variant exists, and replaces the leaf with that variant if it does. If it doesn't, it adds a `YieldStreamExec`.

After our investigations, if it turns out that `InterleaveExec` or some other operator can always misbehave, we can change the return value of `yields_cooperatively` to signal that and tweak the rule to add intermediate `YieldStreamExec` nodes too.
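Editorial sketch of the four steps above, in plain Rust with only the standard library. Everything here is an assumption made for illustration: `PlanNode` is a toy trait, not DataFusion's `ExecutionPlan`; `DataSource`, `CustomLeaf`, `YieldStream`, and `make_leaf_yield` are hypothetical names; and the `EmissionType`-based default is left out, so each node implements both methods itself.

```rust
use std::sync::Arc;

/// Hypothetical, simplified plan node. NOT DataFusion's `ExecutionPlan`.
trait PlanNode {
    fn name(&self) -> String;

    /// Placeholder name from the thread: does this node yield by itself?
    fn yields_cooperatively(&self) -> bool;

    /// Returns a cooperatively-yielding version of this node, if one exists.
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn PlanNode>>;
}

/// Stand-in for a leaf with built-in yielding behind a constructor argument.
struct DataSource {
    cooperative: bool,
}

impl PlanNode for DataSource {
    fn name(&self) -> String {
        format!("DataSource(cooperative={})", self.cooperative)
    }
    fn yields_cooperatively(&self) -> bool {
        self.cooperative
    }
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn PlanNode>> {
        if self.cooperative {
            // Already yielding: hand back the same node.
            let same: Arc<dyn PlanNode> = self;
            Some(same)
        } else {
            // Rebuild the leaf with the yielding constructor argument set.
            let yielding: Arc<dyn PlanNode> = Arc::new(DataSource { cooperative: true });
            Some(yielding)
        }
    }
}

/// Stand-in for a third-party leaf that has no yielding variant.
struct CustomLeaf;

impl PlanNode for CustomLeaf {
    fn name(&self) -> String {
        "CustomLeaf".to_string()
    }
    fn yields_cooperatively(&self) -> bool {
        false
    }
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn PlanNode>> {
        None
    }
}

/// Stand-in for the wrapper node the rule falls back to.
struct YieldStream {
    child: Arc<dyn PlanNode>,
    frequency: usize,
}

impl PlanNode for YieldStream {
    fn name(&self) -> String {
        format!("YieldStream(frequency={}) <- {}", self.frequency, self.child.name())
    }
    fn yields_cooperatively(&self) -> bool {
        true
    }
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn PlanNode>> {
        let same: Arc<dyn PlanNode> = self;
        Some(same)
    }
}

/// Step 4: prefer the leaf's own yielding variant, otherwise wrap the leaf.
fn make_leaf_yield(leaf: Arc<dyn PlanNode>, frequency: usize) -> Arc<dyn PlanNode> {
    match Arc::clone(&leaf).with_cooperative_yields() {
        Some(yielding) => yielding,
        None => {
            let wrapped: Arc<dyn PlanNode> = Arc::new(YieldStream { child: leaf, frequency });
            wrapped
        }
    }
}
```

In this toy model, calling `make_leaf_yield` on a non-cooperative `DataSource` swaps in the cooperative variant and adds no extra node, while `CustomLeaf` ends up wrapped in a `YieldStream`. That is the property the thread is after: plans change only where a leaf cannot yield on its own.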
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940587625

Just FYI, I've completed (I think at least) the exercise of adapting the operators in https://github.com/pepijnve/datafusion/tree/cancel_safety. I wanted to finish that if only for the learning experience. Honestly, I don't think it's that dramatic a code change and intuitively at least I would expect overhead to be negligible. I would be interested to measure what the actual impact is on the benchmarks though and compare that to the variant here.

@alamb is there any particular environment required to run that branch comparison script?
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940467339

> I am onboard with the approach in this PR, and it looks good to me overall. Just needs some finishing touches:
>
> * To avoid such a large diff (which is mostly plans in tests), I suggest gating this behind a configuration flag, which would default to `false` for now. We will turn it on in subsequent PRs. We just need to add a few tests with this flag turned on to get it exercised in CI.
> * In the next PR (or this PR if you want), before we turn the flag on, we can add an API to ask whether a leaf supports built-in yielding. If it does, the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add built-in yielding support to a few common leaf nodes, and turn the configuration flag on after doing so, plan changes will be minimal.
> * In another subsequent PR, we can fix `InterleaveExec` (@berkaysynnada and I started discussing how one can approach this problem).
> * I suggest making the number of batches to yield not a constant but a parameter of `YieldStreamExec`. Also, when we display it, showing its child is not informative (we see that from the print-out already). Displaying yielding frequency is probably a better idea.
>
> Thanks @zhuqi-lucas for the great work. Love it.

Thank you @ozankabak, I have done the following in the latest PR:

1. Gated this behind a configuration flag, which defaults to `false` for now.
2. Enabled it only for a small test case and the end-to-end tests.
3. Made the number of batches to yield a parameter of `YieldStreamExec` rather than a constant, and the display now shows the yield frequency.

Not done yet:

1. Add an API to ask whether a leaf supports built-in yielding. If it does, the rule wouldn't add `YieldStreamExec` at all to that leaf. If we add built-in yielding support to a few common leaf nodes, and turn the configuration flag on after doing so, plan changes will be minimal.
2. Fix `InterleaveExec` related corner cases.

What do you mean by built-in yielding? Does it mean we add built-in yielding support to some leaf nodes? Thanks!
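Editorial sketch of item 3 above (yield frequency as a parameter, shown in the display), using only the standard library. `YieldingSource`, `drain_counting_yields`, and the exact display text are hypothetical, chosen for illustration; the real `YieldStreamExec` wraps a record batch stream, which is modelled here by a plain iterator.

```rust
use std::fmt;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: returns `Pending` once after every `frequency` polls
/// that reached the inner source.
struct YieldingSource<I> {
    inner: I,
    frequency: usize,
    batches_since_yield: usize,
}

impl<I: Iterator> YieldingSource<I> {
    fn new(inner: I, frequency: usize) -> Self {
        // A frequency of 0 would yield forever, so clamp it to 1.
        Self { inner, frequency: frequency.max(1), batches_since_yield: 0 }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        if self.batches_since_yield >= self.frequency {
            self.batches_since_yield = 0;
            // Ask to be polled again right away, then hand control back.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.batches_since_yield += 1;
        Poll::Ready(self.inner.next())
    }
}

/// Show the yield frequency rather than the child, as suggested in the review.
impl<I> fmt::Display for YieldingSource<I> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "YieldStreamExec frequency={}", self.frequency)
    }
}

/// Waker that does nothing; enough for driving the source by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains the source and returns (items produced, times `Pending` was seen).
fn drain_counting_yields<I: Iterator>(src: &mut YieldingSource<I>) -> (usize, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut items, mut yields) = (0, 0);
    loop {
        match src.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => items += 1,
            Poll::Ready(None) => return (items, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

With ten items and a frequency of 4, this toy source produces all ten items and returns `Pending` twice along the way. Making the frequency a field rather than a constant is what lets both the configuration and the plan display report it.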
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
ozankabak commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940167521

> This assumption turns out to be quite easy to break as in the interleave example.

I would say it is not *that* easy to break, took you some effort to contrive an example. Besides, this happens due to a particular behavior by the current implementation of `InterleaveExec`, which (I suspect) we can fix.

Let's go through the following exercise together: In the most general case, let's assume that I'm wrong and there is a certain class of operators that is always prone to "eating" `Pending` values due to the nature of the actual operation they perform. If we find this to be the case, we have discovered an important piece of information we should expose via the `ExecutionPlan` APIs. We would then do that, and simply update the rule here to also insert a `YieldExec` right after such operators. This way, we would retain the ability to insert minimum necessary overhead to any given plan to have cancellability.
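Editorial sketch of the rule extension described above: yield nodes at the leaves, plus one directly above any operator flagged as swallowing `Pending`. The `Plan` enum, the `swallows_pending` flag, and both functions are hypothetical. The sketch also skips the check for whether the plan is pipeline-breaking at all, which the rule in the PR performs first.

```rust
/// Hypothetical plan tree, far simpler than DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Leaf(&'static str),
    Op {
        name: &'static str,
        /// Assumed flag: this operator may "eat" a child's `Pending`.
        swallows_pending: bool,
        children: Vec<Plan>,
    },
    Yield(Box<Plan>),
}

/// Wraps every leaf in `Yield`, and also every operator that swallows `Pending`.
fn insert_yields(plan: Plan) -> Plan {
    match plan {
        Plan::Leaf(name) => Plan::Yield(Box::new(Plan::Leaf(name))),
        Plan::Op { name, swallows_pending, children } => {
            let children = children.into_iter().map(insert_yields).collect();
            let op = Plan::Op { name, swallows_pending, children };
            if swallows_pending {
                // The yield sits right above the operator, so its parent
                // sees `Pending` even if the operator hides its children's.
                Plan::Yield(Box::new(op))
            } else {
                op
            }
        }
        // Already wrapped: leave the subtree alone so re-running is harmless.
        already @ Plan::Yield(_) => already,
    }
}

/// Counts `Yield` nodes, i.e. the overhead the rule added.
fn count_yields(plan: &Plan) -> usize {
    match plan {
        Plan::Leaf(_) => 0,
        Plan::Op { children, .. } => children.iter().map(count_yields).sum(),
        Plan::Yield(inner) => 1 + count_yields(inner),
    }
}
```

For an aggregate over an interleave of two leaves, with only the interleave flagged, this adds three yield nodes: one per leaf and one between the interleave and the aggregate. Operators that pass `Pending` through get nothing extra, which is the "minimum necessary overhead" the comment argues for.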
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940060922

Thank you folks, I will move forward with this solution and do the next round of cleanup and optimization. For the interleave, `SortPreservingMergeExec`, and other corner cases, I agree that we can handle them in a follow-up.
Re: [PR] feat: Allow cancelling of grouping operations which are CPU bound [datafusion]
pepijnve commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2940030602

> The rule only inserts YieldExecs if the plan involves pipeline-breaking, and only once at leaves.

The assumption this design makes is that a `Pending` result produced at a leaf is guaranteed to get passed up the call chain all the way to the root of the plan. I've been trying to find ways to do so, but besides panicking I don't think there's any way to long jump back to orchestrate a forced return of control to the tokio executor directly. This assumption turns out to be quite easy to break as in the interleave example.
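Editorial sketch of the assumption under discussion, as a toy model with only `std::task::Poll`. None of this is `InterleaveExec`'s actual code: `YieldingLeaf`, `poll_passthrough`, `poll_interleave`, and `polls_until_pending` are hypothetical, and the `Context`/waker plumbing is omitted. The point is only that a parent which tries another child on `Pending` forwards far fewer `Pending` results than its leaves produce.

```rust
use std::task::Poll;

/// Hypothetical leaf that never ends and returns `Pending` on every second poll.
struct YieldingLeaf {
    polls: u64,
}

impl YieldingLeaf {
    fn new() -> Self {
        Self { polls: 0 }
    }

    fn poll_next(&mut self) -> Poll<u64> {
        self.polls += 1;
        if self.polls % 2 == 0 {
            Poll::Pending
        } else {
            Poll::Ready(self.polls)
        }
    }
}

/// Pass-through parent: the child's `Pending` reaches the caller unchanged.
fn poll_passthrough(child: &mut YieldingLeaf) -> Poll<u64> {
    child.poll_next()
}

/// Interleave-like parent: on `Pending` it tries the next child, so `Pending`
/// surfaces only when every child is pending within the same poll.
fn poll_interleave(children: &mut [YieldingLeaf]) -> Poll<u64> {
    for child in children.iter_mut() {
        if let Poll::Ready(value) = child.poll_next() {
            return Poll::Ready(value);
        }
    }
    Poll::Pending
}

/// Models a pipeline-breaking consumer's inner loop: keeps polling until the
/// input is pending. Returns the poll count at which that happened, or `None`
/// if it did not happen within `limit` polls.
fn polls_until_pending(mut poll: impl FnMut() -> Poll<u64>, limit: usize) -> Option<usize> {
    for n in 1..=limit {
        if poll().is_pending() {
            return Some(n);
        }
    }
    None
}
```

In this model the consumer regains control on poll 2 with a pass-through parent, on poll 4 with two interleaved children, and on poll 8 with three. Each extra child doubles the wait even though every leaf yields on every second poll, which illustrates why leaf-only yielding depends on every operator above the leaf passing `Pending` along.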
