Re: [I] AggregateExec not cancellable [datafusion]
ozankabak closed issue #16193: AggregateExec not cancellable URL: https://github.com/apache/datafusion/issues/16193 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [I] AggregateExec not cancellable [datafusion]
alamb commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2920591397
- I think https://github.com/apache/datafusion/pull/16196 will solve this problem.
- Notes: https://github.com/apache/datafusion/pull/16196#pullrequestreview-2879569007
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2918115640
I was using the IDE to run the test and then terminated it, so maybe it was the IDE that killed the process... We can try again based on the PR, because it seems the PR does not affect performance.
Re: [I] AggregateExec not cancellable [datafusion]
pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2917010803
Just tested on Linux. With `USE_TASK = false` I see this
```
Running query; will time out after 5 seconds
InfiniteStream::poll_next 1 times
InfiniteStream::poll_next 2 times
InfiniteStream::poll_next 3 times
InfiniteStream::poll_next 4 times
InfiniteStream::poll_next 5 times
InfiniteStream::poll_next 6 times
InfiniteStream::poll_next 7 times
InfiniteStream::poll_next 8 times
InfiniteStream::poll_next 9 times
InfiniteStream::poll_next 10 times
InfiniteStream::poll_next 11 times
InfiniteStream::poll_next 12 times
InfiniteStream::poll_next 13 times
InfiniteStream::poll_next 14 times
InfiniteStream::poll_next 15 times
InfiniteStream::poll_next 16 times
InfiniteStream::poll_next 17 times
InfiniteStream::poll_next 18 times
InfiniteStream::poll_next 19 times
...
```
with `USE_TASK = true`
```
Running query; will time out after 5 seconds
InfiniteStream::poll_next 1 times
InfiniteStream::poll_next 2 times
InfiniteStream::poll_next 3 times
InfiniteStream::poll_next 4 times
InfiniteStream::poll_next 5 times
InfiniteStream::poll_next 6 times
InfiniteStream::poll_next 7 times
InfiniteStream::poll_next 8 times
InfiniteStream::poll_next 9 times
InfiniteStream::poll_next 10 times
InfiniteStream::poll_next 11 times
Timeout reached!
No result (cancelled or empty)
Exiting, stream will be dropped now
InfiniteStream::poll_next 12 times
InfiniteStream::poll_next 13 times
InfiniteStream::poll_next 14 times
InfiniteStream::poll_next 15 times
InfiniteStream::poll_next 16 times
InfiniteStream::poll_next 17 times
...
```
neither process stops until killed.
Re: [I] AggregateExec not cancellable [datafusion]
pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916952966
🤔 testing on my machine your adapted version of the code still just keeps on running. ctrl-c does nothing. The only change I've made is to replace `tokio::test` with `tokio::main`.
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916779875
Interesting, this seems to give me an example that we can use in datafusion-cli to support quick cancellation!
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916774947
@pepijnve It works for me; the changed code is here:
```toml
tokio = { workspace = true, features = ["macros", "signal"] }
```
```rust
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream,
TaskContext};
use datafusion::functions_aggregate::sum;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan,
PlanProperties};
use datafusion::{common, physical_plan};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::error::Error;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::signal::ctrl_c;
use datafusion::prelude::SessionContext;
struct InfiniteStream {
batch: RecordBatch,
poll_count: usize,
}
impl RecordBatchStream for InfiniteStream {
fn schema(&self) -> SchemaRef {
self.batch.schema()
}
}
impl Stream for InfiniteStream {
type Item = common::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
self.poll_count += 1;
if self.poll_count % 10_000 == 0 {
println!("InfiniteStream::poll_next {} times", self.poll_count);
}
Poll::Ready(Some(Ok(self.batch.clone())))
}
}
#[derive(Debug)]
struct InfiniteExec {
batch: RecordBatch,
properties: PlanProperties,
}
impl InfiniteExec {
fn new(batch: &RecordBatch) -> Self {
InfiniteExec {
batch: batch.clone(),
properties: PlanProperties::new(
EquivalenceProperties::new(batch.schema().clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Unbounded {
requires_infinite_memory: false,
},
),
}
}
}
impl DisplayAs for InfiniteExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
write!(f, "infinite")
}
}
impl ExecutionPlan for InfiniteExec {
fn name(&self) -> &str {
"infinite"
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.batch.schema()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> common::Result<Arc<dyn ExecutionPlan>> {
Ok(self.clone())
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> common::Result<SendableRecordBatchStream> {
Ok(Box::pin(InfiniteStream {
batch: self.batch.clone(),
poll_count: 0,
}))
}
}
#[tokio::test]
async fn main() -> Result<(), Box<dyn Error>> {
// 1) build session & schema & sample batch
let session_ctx = SessionContext::new();
let schema = Arc::new(Schema::new(Fields::try_from(vec![
Field::new("value", DataType::Int64, false),
])?));
let mut builder = Int64Array::builder(8192);
for v in 0..8192 {
builder.append_value(v);
}
let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(builder.finish())])?;
// 2) set up the infinite source + aggregation
let inf = Arc::new(InfiniteExec::new(&batch));
let aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Single,
PhysicalGroupBy::new(vec![], vec![], vec![]),
vec![Arc::new(
AggregateExprBuilder::new(sum::sum_udaf(), vec![Arc::new(
datafusion::physical_expr::expressions::Column::new_with_schema(
"value", &schema,
)?
)])
.schema(inf.schema())
.alias("sum")
.build()?,
)],
vec![None],
inf.clone(),
inf.schema(),
)?);
// 3) get the stream
let mut stream = physical_plan::execute_stream(aggr,
session_ctx.task_ctx())?;
println!("Running query; press Ctrl-C to cancel");
// 4) drive the stream inline in select
```
Re: [I] AggregateExec not cancellable [datafusion]
pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916647205
@zhuqi-lucas @alamb I slapped together something quickly to test my cancellation hypothesis. See https://gist.github.com/pepijnve/c013a697b1869ea067e793bf3e1e115a
For me this outputs the following which seems to confirm what I was thinking. Am I missing some essential element in the gist to make cancellation work?
```
Running query
InfiniteStream::poll_next 1 times
InfiniteStream::poll_next 2 times
InfiniteStream::poll_next 3 times
InfiniteStream::poll_next 4 times
^Cctrl-C
No result
Dropping stream
InfiniteStream::poll_next 5 times
InfiniteStream::poll_next 6 times
InfiniteStream::poll_next 7 times
InfiniteStream::poll_next 8 times
InfiniteStream::poll_next 9 times
InfiniteStream::poll_next 10 times
InfiniteStream::poll_next 11 times
... never stops
```
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916217497
Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we will always add some overhead to the aggregate.
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916192179
I tried it now: wrapping the plan in CoalescePartitionsExec also works when the
partition count is 1.
```diff
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
             0 => internal_err!(
                 "CoalescePartitionsExec requires at least one input partition"
             ),
-            1 => {
-                // bypass any threading / metrics if there is a single partition
-                self.input.execute(0, context)
-            }
+            // 1 => {
+            //     // bypass any threading / metrics if there is a single partition
+            //     self.input.execute(0, context)
+            // }
             _ => {
                 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                 // record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context),
-        2.. => {
+        1.. => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
             // CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
```
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916148300
I am trying to find a solution with the smallest change. Maybe we can also wrap the plan in CoalescePartitionsExec when the partition count is 1; if that causes no regression, I believe it is the easiest and safest way.
Re: [I] AggregateExec not cancellable [datafusion]
pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916133761
FYI I'm basing myself on the documentation and https://users.rust-lang.org/t/tokio-does-not-terminate-all-tasks-immediately-on-program-exit/100790/10
Re: [I] AggregateExec not cancellable [datafusion]
pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916076401
> do you mean it's not the RecordBatchReceiverStream which help the cancellation?

Trying to figure this out :D I'm a Java developer mainly; still getting my head around async Rust.
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916070640
> Just for context, I ran into this while working on a Java based
application that drives the DataFusion queries. I want to be able to interrupt
query execution from the Java side. From the Java side I'm basically calling
`runtime.block_on(async { record_batch_stream.next().await })`. Since I can't
use Java's thread interruption mechanism to unblock the `next()` call I was
trying to use `tokio::select!` with a cancellation token instead and that
failed to cancel. This is the same pattern as what the CLI does, just triggered
via a different channel than a signal handler.
>
> I still need to write an experiment for this, but based on my reading of
the documentation even with a `RecordBatchReceiverStream` the aggregate would
not actually get cancelled. It will look like it has cancelled because the top
level task will yield, the `JoinSet` will get dropped, and the spawned tasks
will be aborted. But tokio will only actually be able to terminate the task
once the aggregate stream's poll loop yields.
Thank you @pepijnve for this info. I tried the aggregate with more than 1
partition and it seems it can be cancelled; it uses RecordBatchReceiverStream
inside CoalescePartitionsExec. Do you mean it's not the
RecordBatchReceiverStream that helps the cancellation?
Re: [I] AggregateExec not cancellable [datafusion]
pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916044328
Just for context, I ran into this while working on a Java based application
that drives the DataFusion queries. I want to be able to interrupt query
execution from the Java side. From the Java side I'm basically calling
`runtime.block_on(async { record_batch_stream.next().await })`. Since I can't
use Java's thread interruption mechanism to unblock the `next()` call I was
trying to use `tokio::select!` with a cancellation token instead and that
failed to cancel. This is the same pattern as what the CLI does, just triggered
via a different channel than a signal handler.
I still need to write an experiment for this, but based on my reading of the
documentation even with a `RecordBatchReceiverStream` the aggregate would not
actually get cancelled. The `JoinSet` will get dropped and the spawned tasks
will be aborted, but tokio will only actually be able to abort the loop once
the aggregate's stream yields.
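The point about yielding can be illustrated without Tokio. The following is a minimal std-only sketch, not DataFusion code: `AlwaysReady`, `SumAll`, and `poll_once_then_cancel` are hypothetical names, and the driver is a simplified stand-in for what `tokio::select!` or a task abort can do. It assumes the only place a caller can cancel is after `poll` returns `Pending`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical input that always has the next value ready.
struct AlwaysReady {
    remaining: u64,
}

impl AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(1))
    }
}

/// Aggregate-like future: drains its input inside a single `poll` call.
struct SumAll {
    input: AlwaysReady,
    sum: u64,
}

impl Future for SumAll {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        loop {
            match self.input.poll_next(cx) {
                Poll::Ready(Some(v)) => self.sum += v,
                Poll::Ready(None) => return Poll::Ready(self.sum),
                // Never reached with `AlwaysReady`: no yield point exists.
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls once, then "cancels" by dropping the future if it was `Pending`.
/// Cancellation can only happen after `poll` has returned.
fn poll_once_then_cancel<F: Future + Unpin>(mut fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None, // `fut` is dropped here
    }
}
```

With a finite input the single `poll` call runs to completion before the driver regains control, so the cancel branch is never taken. With an unbounded input, as in the `InfiniteStream` reproduction, that `poll` call would simply never return.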
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916014231
Update: no performance regression for the PR when testing with a huge aggregate: https://github.com/apache/datafusion/pull/16196#issuecomment-2916000852
Re: [I] AggregateExec not cancellable [datafusion]
zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2915766242
> Yes this is more or less the same issue. PR [#14028](https://github.com/apache/datafusion/pull/14028) proposed adding a yield point at the leaf of the plan when moving from one file to the next. This PR adds yield points closer to the top of the plan tree just below the AggregateExec's stream by wrapping its input and then yields every 64 input batches. I was wondering if that should be row count or time interval based rather than batch count based.
>
> I found [#15314](https://github.com/apache/datafusion/issues/15314) in the meantime as well. This issue provides one concrete and easily reproducible example of a query that cannot be cancelled.
>
> ~The comments on PR [#14028](https://github.com/apache/datafusion/pull/14028) regarding Tokio's `yield_now` are interesting and relevant for PR [#16196](https://github.com/apache/datafusion/pull/16196) as well. Seems like the code pattern should be~ ~I can run some tests to see what the actual behavior is in the ST and MT Tokio runtimes if that helps.~
>
> Edit: conclusion in PR [#14028](https://github.com/apache/datafusion/pull/14028) discussion was that calling `wake_by_ref` was fine.

Thank you @pepijnve for the review. The reason I did not use a row count is that we would need to calculate batch_size * batch count, and we do not want to affect the performance of DataFusion's core logic. Even with a batch count of 64, I am wondering whether it will affect core-logic performance when we have huge data.
I am wondering if we can wrap the yield logic outside the core exec logic in DataFusion, for example on the datafusion-cli side, if we only want to handle Ctrl-C there.
But it seems there are more cases besides datafusion-cli that want to terminate the stream, for example customers who use gRPC to terminate: https://github.com/apache/datafusion/issues/14036#issuecomment-2577862225
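The "yield every 64 input batches" idea discussed above can be sketched with the standard library alone. This is an illustration under assumptions, not the code from PR #16196: `PollSource`, `Endless`, `YieldEvery`, `CountingWake`, and `count_polls` are hypothetical names, and `PollSource` is a simplified stand-in for a record batch stream. The wrapper calls `wake_by_ref` before returning `Pending` so that it gets polled again promptly.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical minimal stand-in for a poll-based batch stream.
trait PollSource {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
}

/// Source that always has another batch ready and never returns `Pending`.
struct Endless;

impl PollSource for Endless {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        Poll::Ready(Some(1))
    }
}

/// Wrapper that forces one `Pending` after every `budget` ready batches.
struct YieldEvery<S> {
    inner: S,
    budget: usize,
    ready_since_yield: usize,
}

impl<S: PollSource> PollSource for YieldEvery<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.ready_since_yield >= self.budget {
            self.ready_since_yield = 0;
            // Request an immediate re-poll, then hand control back.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let polled = self.inner.poll_next(cx);
        match &polled {
            Poll::Ready(Some(_)) => self.ready_since_yield += 1,
            // The inner source yielded on its own, so restart the count.
            Poll::Pending => self.ready_since_yield = 0,
            Poll::Ready(None) => {}
        }
        polled
    }
}

/// Waker that counts how often it was woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `source` up to `polls` times.
/// Returns (ready batches, `Pending` results, wake-ups requested).
fn count_polls<S: PollSource>(source: &mut S, polls: usize) -> (usize, usize, usize) {
    let wake = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&wake));
    let mut cx = Context::from_waker(&waker);
    let (mut ready, mut pending) = (0, 0);
    for _ in 0..polls {
        match source.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => ready += 1,
            Poll::Pending => pending += 1,
            Poll::Ready(None) => break,
        }
    }
    (ready, pending, wake.0.load(Ordering::SeqCst))
}
```

The per-batch cost in this sketch is one counter comparison and increment. A row-count or time-based budget, as raised in the quoted review, would replace the `ready_since_yield` check with a different condition while keeping the same `wake_by_ref` plus `Pending` pattern.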
