Re: [I] AggregateExec not cancellable [datafusion]

2025-06-09 Thread via GitHub


ozankabak closed issue #16193: AggregateExec not cancellable
URL: https://github.com/apache/datafusion/issues/16193


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [I] AggregateExec not cancellable [datafusion]

2025-05-29 Thread via GitHub


alamb commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2920591397

   - I think https://github.com/apache/datafusion/pull/16196 will solve this problem.
   - Notes: https://github.com/apache/datafusion/pull/16196#pullrequestreview-2879569007





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2918115640

   I was running the test from my IDE and stopping it there, so maybe it was the IDE that terminated it...
   
   We can try again based on the PR, since the PR does not seem to affect performance.





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2917010803

   Just tested on Linux. With `USE_TASK = false` I see this:
   
   ```
   Running query; will time out after 5 seconds
   InfiniteStream::poll_next 1 times
   InfiniteStream::poll_next 2 times
   InfiniteStream::poll_next 3 times
   InfiniteStream::poll_next 4 times
   InfiniteStream::poll_next 5 times
   InfiniteStream::poll_next 6 times
   InfiniteStream::poll_next 7 times
   InfiniteStream::poll_next 8 times
   InfiniteStream::poll_next 9 times
   InfiniteStream::poll_next 10 times
   InfiniteStream::poll_next 11 times
   InfiniteStream::poll_next 12 times
   InfiniteStream::poll_next 13 times
   InfiniteStream::poll_next 14 times
   InfiniteStream::poll_next 15 times
   InfiniteStream::poll_next 16 times
   InfiniteStream::poll_next 17 times
   InfiniteStream::poll_next 18 times
   InfiniteStream::poll_next 19 times
   ...
   ```
   
   With `USE_TASK = true`:
   
   ```
   Running query; will time out after 5 seconds
   InfiniteStream::poll_next 1 times
   InfiniteStream::poll_next 2 times
   InfiniteStream::poll_next 3 times
   InfiniteStream::poll_next 4 times
   InfiniteStream::poll_next 5 times
   InfiniteStream::poll_next 6 times
   InfiniteStream::poll_next 7 times
   InfiniteStream::poll_next 8 times
   InfiniteStream::poll_next 9 times
   InfiniteStream::poll_next 10 times
   InfiniteStream::poll_next 11 times
   Timeout reached!
   No result (cancelled or empty)
   Exiting, stream will be dropped now
   InfiniteStream::poll_next 12 times
   InfiniteStream::poll_next 13 times
   InfiniteStream::poll_next 14 times
   InfiniteStream::poll_next 15 times
   InfiniteStream::poll_next 16 times
   InfiniteStream::poll_next 17 times
   ...
   ```
   
   Neither process stops until killed.





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916952966

   🤔 Testing your adapted version of the code on my machine, it still just keeps on running. Ctrl-C does nothing. The only change I've made is replacing `tokio::test` with `tokio::main`.





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916779875

   Interesting, this gives me an example we can use in datafusion-cli to support quick cancellation!





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916774947

   @pepijnve It works for me; here is the changed code:
   
   ```toml
   tokio = { workspace = true, features = ["macros", "signal"]}
   ```
   
   
   ```rust
   use arrow::array::{Int64Array, RecordBatch};
   use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
   use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
   use datafusion::functions_aggregate::sum;
   use datafusion::physical_expr::aggregate::AggregateExprBuilder;
   use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
   use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
   use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
   use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
   use datafusion::{common, physical_plan};
   use futures::{Stream, StreamExt};
   use std::any::Any;
   use std::error::Error;
   use std::fmt::Formatter;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll};
   use tokio::signal::ctrl_c;
   use datafusion::prelude::SessionContext;

   /// Source stream that is always ready: `poll_next` never returns `Poll::Pending`.
   struct InfiniteStream {
       batch: RecordBatch,
       poll_count: usize,
   }

   impl RecordBatchStream for InfiniteStream {
       fn schema(&self) -> SchemaRef {
           self.batch.schema()
       }
   }

   impl Stream for InfiniteStream {
       type Item = common::Result<RecordBatch>;

       fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
           self.poll_count += 1;
           if self.poll_count % 10_000 == 0 {
               println!("InfiniteStream::poll_next {} times", self.poll_count);
           }
           Poll::Ready(Some(Ok(self.batch.clone())))
       }
   }

   /// Leaf plan producing a single, unbounded partition backed by `InfiniteStream`.
   #[derive(Debug)]
   struct InfiniteExec {
       batch: RecordBatch,
       properties: PlanProperties,
   }

   impl InfiniteExec {
       fn new(batch: &RecordBatch) -> Self {
           InfiniteExec {
               batch: batch.clone(),
               properties: PlanProperties::new(
                   EquivalenceProperties::new(batch.schema().clone()),
                   Partitioning::UnknownPartitioning(1),
                   EmissionType::Incremental,
                   Boundedness::Unbounded {
                       requires_infinite_memory: false,
                   },
               ),
           }
       }
   }

   impl DisplayAs for InfiniteExec {
       fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
           write!(f, "infinite")
       }
   }

   impl ExecutionPlan for InfiniteExec {
       fn name(&self) -> &str {
           "infinite"
       }

       fn as_any(&self) -> &dyn Any {
           self
       }

       fn schema(&self) -> SchemaRef {
           self.batch.schema()
       }

       fn properties(&self) -> &PlanProperties {
           &self.properties
       }

       fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
           vec![]
       }

       fn with_new_children(
           self: Arc<Self>,
           _children: Vec<Arc<dyn ExecutionPlan>>,
       ) -> common::Result<Arc<dyn ExecutionPlan>> {
           Ok(self.clone())
       }

       fn execute(
           &self,
           _partition: usize,
           _context: Arc<TaskContext>,
       ) -> common::Result<SendableRecordBatchStream> {
           Ok(Box::pin(InfiniteStream {
               batch: self.batch.clone(),
               poll_count: 0,
           }))
       }
   }

   #[tokio::test]
   async fn main() -> Result<(), Box<dyn Error>> {
       // 1) build session & schema & sample batch
       let session_ctx = SessionContext::new();
       let schema = Arc::new(Schema::new(Fields::try_from(vec![
           Field::new("value", DataType::Int64, false),
       ])?));
       let mut builder = Int64Array::builder(8192);
       for v in 0..8192 {
           builder.append_value(v);
       }
       let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;

       // 2) set up the infinite source + aggregation
       let inf = Arc::new(InfiniteExec::new(&batch));
       let aggr = Arc::new(AggregateExec::try_new(
           AggregateMode::Single,
           PhysicalGroupBy::new(vec![], vec![], vec![]),
           vec![Arc::new(
               AggregateExprBuilder::new(sum::sum_udaf(), vec![Arc::new(
                   datafusion::physical_expr::expressions::Column::new_with_schema(
                       "value", &schema,
                   )?,
               )])
               .schema(inf.schema())
               .alias("sum")
               .build()?,
           )],
           vec![None],
           inf.clone(),
           inf.schema(),
       )?);

       // 3) get the stream
       let mut stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?;

       println!("Running query; press Ctrl-C to cancel");
       // 4) drive the stream inline in select
       // ...
   ```

Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916647205

   @zhuqi-lucas @alamb I slapped together something quickly to test my 
cancellation hypothesis. See 
https://gist.github.com/pepijnve/c013a697b1869ea067e793bf3e1e115a
   
   For me this outputs the following which seems to confirm what I was 
thinking. Am I missing some essential element in the gist to make cancellation 
work?
   
   ```
   Running query
   InfiniteStream::poll_next 1 times
   InfiniteStream::poll_next 2 times
   InfiniteStream::poll_next 3 times
   InfiniteStream::poll_next 4 times
   ^Cctrl-C
   No result
   Dropping stream
   InfiniteStream::poll_next 5 times
   InfiniteStream::poll_next 6 times
   InfiniteStream::poll_next 7 times
   InfiniteStream::poll_next 8 times
   InfiniteStream::poll_next 9 times
   InfiniteStream::poll_next 10 times
   InfiniteStream::poll_next 11 times
   ... never stops
   ```





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916217497

   Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we will always add some overhead to the aggregate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916192179

   Tried it now: cancellation also works when the plan is wrapped in `CoalescePartitionsExec` even when there is only 1 partition.
   
   ```diff
   diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
   index 114f83068..ffb24463e 100644
   --- a/datafusion/physical-plan/src/coalesce_partitions.rs
   +++ b/datafusion/physical-plan/src/coalesce_partitions.rs
   @@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
                0 => internal_err!(
                    "CoalescePartitionsExec requires at least one input partition"
                ),
   -            1 => {
   -                // bypass any threading / metrics if there is a single partition
   -                self.input.execute(0, context)
   -            }
   +            // 1 => {
   +            //     // bypass any threading / metrics if there is a single partition
   +            //     self.input.execute(0, context)
   +            // }
                _ => {
                    let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                    // record the (very) minimal work done so that
   diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
   index b81b3c8be..8bb8b2145 100644
   --- a/datafusion/physical-plan/src/execution_plan.rs
   +++ b/datafusion/physical-plan/src/execution_plan.rs
   @@ -963,8 +963,7 @@ pub fn execute_stream(
    ) -> Result<SendableRecordBatchStream> {
        match plan.output_partitioning().partition_count() {
            0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
   -        1 => plan.execute(0, context),
   -        2.. => {
   +        1.. => {
                // merge into a single partition
                let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
                // CoalescePartitionsExec must produce a single partition
   diff --git a/parquet-testing b/parquet-testing
   index 6e851ddd7..107b36603 160000
   --- a/parquet-testing
   +++ b/parquet-testing
   @@ -1 +1 @@
   -Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
   +Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
   ```





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916148300

   I am trying to find a solution with the smallest change. Maybe we can also wrap the plan in `CoalescePartitionsExec` when there is only 1 partition; if it has no regression, I believe it's the easiest and safest way.





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916133761

   FYI I'm basing myself on the documentation and 
https://users.rust-lang.org/t/tokio-does-not-terminate-all-tasks-immediately-on-program-exit/100790/10





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916076401

   > do you mean it's not the RecordBatchReceiverStream which help the 
cancellation?
   
   Trying to figure this out :D I'm a Java developer mainly; still getting my 
head around async Rust.





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916070640

   > Just for context, I ran into this while working on a Java based 
application that drives the DataFusion queries. I want to be able to interrupt 
query execution from the Java side. From the Java side I'm basically calling 
`runtime.block_on(async { record_batch_stream.next().await })`. Since I can't 
use Java's thread interruption mechanism to unblock the `next()` call I was 
trying to use `tokio::select!` with a cancellation token instead and that 
failed to cancel. This is the same pattern as what the CLI does, just triggered 
via a different channel than a signal handler.
   > 
   > I still need to write an experiment for this, but based on my reading of 
the documentation even with a `RecordBatchReceiverStream` the aggregate would 
not actually get cancelled. It will look like it has cancelled because the top 
level task will yield, the `JoinSet` will get dropped, and the spawned tasks 
will be aborted. But tokio will only actually be able to terminate the task 
once the aggregate stream's poll loop yields.
   
   Thank you @pepijnve for this info. I tried the aggregate with more than 1 partition and it seems it can be cancelled; it uses `RecordBatchReceiverStream` inside `CoalescePartitionsExec`. Do you mean it's not the `RecordBatchReceiverStream` that helps the cancellation?
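
   One way to picture the question, using an analogy that swaps in OS threads and a bounded `std::sync::mpsc` channel for Tokio tasks and `RecordBatchReceiverStream` (an assumption for illustration; this is not DataFusion code, and `consume_then_cancel` is a hypothetical helper). The consumer can stop immediately because its receive call hands control back to it, but the producer only notices the cancellation at its next `send`:

   ```rust
   use std::sync::mpsc::sync_channel;
   use std::thread;

   /// Hypothetical helper: a producer thread pushes values into a bounded
   /// channel forever; the consumer takes `take` of them and then "cancels"
   /// by dropping its receiver. Returns (values received, values sent).
   fn consume_then_cancel(take: usize, capacity: usize) -> (usize, usize) {
       let (tx, rx) = sync_channel::<u64>(capacity);
       let producer = thread::spawn(move || {
           let mut sent: usize = 0;
           // The only way out is a failed send: the producer notices the
           // cancellation at its next hand-off point, not the moment it happens.
           loop {
               if tx.send(1).is_err() {
                   return sent;
               }
               sent += 1;
           }
       });
       let received = rx.iter().take(take).count();
       drop(rx); // consumer side: control is already back with the caller
       let sent = producer.join().expect("producer thread panicked");
       (received, sent)
   }
   ```

   Assuming the behaviour described in the quoted comment, the Tokio version has the same split: the consumer side of the channel can be abandoned right away, while a spawned producer stops only at its next yield point.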





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


pepijnve commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916044328

   Just for context, I ran into this while working on a Java based application 
that drives the DataFusion queries. I want to be able to interrupt query 
execution from the Java side. From the Java side I'm basically calling 
`runtime.block_on(async { record_batch_stream.next().await })`. Since I can't 
use Java's thread interruption mechanism to unblock the `next()` call I was 
trying to use `tokio::select!` with a cancellation token instead and that 
failed to cancel. This is the same pattern as what the CLI does, just triggered 
via a different channel than a signal handler.
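
   A minimal std-only sketch of why this pattern can fail, assuming `tokio::select!` with a cancellation token can be modelled as a loop that polls the query once and then checks an `AtomicBool`. `race_with_token`, `Outcome` and `NoopWaker` are hypothetical names, not Tokio or DataFusion APIs, and the busy-polling loop stands in for a real executor. The token is only consulted after the query's `poll` returns:

   ```rust
   use std::future::Future;
   use std::sync::atomic::{AtomicBool, Ordering};
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};

   /// Waker that does nothing; the loop below simply polls again.
   struct NoopWaker;
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }

   #[derive(Debug, PartialEq)]
   enum Outcome<T> {
       Finished(T),
       Cancelled,
   }

   /// Hypothetical stand-in for
   /// `tokio::select! { r = query => .., _ = token.cancelled() => .. }`:
   /// poll the query once, then check the token. The token is only seen
   /// after `query.poll` has returned.
   fn race_with_token<F: Future>(query: F, token: &AtomicBool) -> Outcome<F::Output> {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut query = Box::pin(query);
       loop {
           if let Poll::Ready(v) = query.as_mut().poll(&mut cx) {
               return Outcome::Finished(v);
           }
           if token.load(Ordering::SeqCst) {
               return Outcome::Cancelled; // the query future is dropped here
           }
       }
   }
   ```

   In this sketch a query future that loops over 1,000 always-ready items returns `Outcome::Finished(1000)` even if it sets the token after its 10th item, whereas a query that awaits `std::future::pending()` is reported as `Outcome::Cancelled` on the first check.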
   
   I still need to write an experiment for this, but based on my reading of the 
documentation even with a `RecordBatchReceiverStream` the aggregate would not 
actually get cancelled. The `JoinSet` will get dropped and the spawned tasks 
will be aborted, but tokio will only actually be able to abort the loop once 
the aggregate's stream yields.
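
   The hypothesis can be sketched without Tokio, assuming a toy runner that, like an aborted Tokio task, can only drop a future between polls. `run_task`, `YieldNow` (a hypothetical stand-in for `tokio::task::yield_now`) and `aggregate` are illustrative names; the abort request is simulated from inside the loop after the 10th batch:

   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};

   struct NoopWaker;
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }

   /// Hypothetical stand-in for `tokio::task::yield_now()`:
   /// wakes itself and returns `Pending` once, then completes.
   struct YieldNow {
       yielded: bool,
   }

   impl Future for YieldNow {
       type Output = ();
       fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
           if self.yielded {
               return Poll::Ready(());
           }
           self.yielded = true;
           cx.waker().wake_by_ref();
           Poll::Pending
       }
   }

   /// Toy task runner: an abort request is only acted upon between polls.
   fn run_task<F: Future>(task: F, abort: &AtomicBool) -> Option<F::Output> {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut task = Box::pin(task);
       loop {
           if let Poll::Ready(v) = task.as_mut().poll(&mut cx) {
               return Some(v);
           }
           if abort.load(Ordering::SeqCst) {
               return None; // the task future is dropped here
           }
       }
   }

   /// Simulated aggregation over always-ready input. Abort is requested
   /// after the 10th batch; `yield_every` adds a yield point every N batches
   /// (`None` = never). Processed batches are counted in `done`.
   async fn aggregate(batches: usize, yield_every: Option<usize>, abort: &AtomicBool, done: &AtomicUsize) {
       for i in 0..batches {
           if i == 10 {
               abort.store(true, Ordering::SeqCst);
           }
           done.fetch_add(1, Ordering::SeqCst);
           if let Some(n) = yield_every {
               if (i + 1) % n == 0 {
                   YieldNow { yielded: false }.await;
               }
           }
       }
   }
   ```

   In this sketch `aggregate(1_000, None, ..)` processes all 1,000 batches despite the abort, while `aggregate(1_000, Some(64), ..)` is dropped after 64 batches, at its first yield point.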





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916014231

   Update: no performance regression for the PR in the huge aggregate testing:
   
   https://github.com/apache/datafusion/pull/16196#issuecomment-2916000852





Re: [I] AggregateExec not cancellable [datafusion]

2025-05-28 Thread via GitHub


zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2915766242

   > Yes this is more or less the same issue. PR 
[#14028](https://github.com/apache/datafusion/pull/14028) proposed adding a 
yield point at the leaf of the plan when moving from one file to the next. This 
PR adds yield points closer to the top of the plan tree just below the 
AggregateExec's stream by wrapping its input and then yields every 64 input 
batches. I was wondering if that should be row count or time interval based 
rather than batch count based.
   > 
   > I found [#15314](https://github.com/apache/datafusion/issues/15314) in the 
meantime as well. This issue provides one concrete and easily reproducible 
example of a query that cannot be cancelled.
   > 
   > ~The comments on PR 
[#14028](https://github.com/apache/datafusion/pull/14028) regarding Tokio's 
`yield_now` are interesting and relevant for PR 
[#16196](https://github.com/apache/datafusion/pull/16196) as well. Seems like 
the code pattern should be~ ~I can run some tests to see what the actual 
behavior is in the ST and MT Tokio runtimes if that helps.~
   > 
   > Edit: conclusion in PR 
[#14028](https://github.com/apache/datafusion/pull/14028) discussion was that 
calling `wake_by_ref` was fine.
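
   A std-only sketch of the mechanism described above, assuming a minimal `BatchStream` trait in place of `futures::Stream` and `u64` values in place of `RecordBatch`es. `YieldEvery` is a hypothetical wrapper, not the code from PR #16196: after `budget` input batches it calls `wake_by_ref` and returns `Poll::Pending`, which hands control back to whoever is polling the aggregate:

   ```rust
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};

   /// Waker that does nothing; enough for polling by hand.
   struct NoopWaker;
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }

   /// Hypothetical minimal stand-in for `futures::Stream::poll_next`.
   trait BatchStream {
       fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>>;
   }

   /// Always-ready input, like `InfiniteStream`, but finite so it terminates.
   struct ReadyInput {
       remaining: u64,
   }

   impl BatchStream for ReadyInput {
       fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
           if self.remaining == 0 {
               return Poll::Ready(None);
           }
           self.remaining -= 1;
           Poll::Ready(Some(1))
       }
   }

   /// Hypothetical wrapper: returns `Pending` after every `budget` batches,
   /// waking itself first so the task is rescheduled right away.
   struct YieldEvery<S> {
       inner: S,
       budget: usize,
       since_yield: usize,
   }

   impl<S: BatchStream> BatchStream for YieldEvery<S> {
       fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u64>> {
           if self.since_yield >= self.budget {
               self.since_yield = 0;
               cx.waker().wake_by_ref();
               return Poll::Pending;
           }
           let polled = self.inner.poll_next(cx);
           if let Poll::Ready(Some(_)) = polled {
               self.since_yield += 1;
           }
           polled
       }
   }

   /// Aggregate-style poll loop: drains the input and only returns
   /// `Pending` when the input itself does.
   fn poll_sum<S: BatchStream>(input: &mut S, sum: &mut u64, cx: &mut Context<'_>) -> Poll<u64> {
       loop {
           match input.poll_next(cx) {
               Poll::Ready(Some(v)) => *sum += v,
               Poll::Ready(None) => return Poll::Ready(*sum),
               Poll::Pending => return Poll::Pending,
           }
       }
   }

   /// Polls the aggregate once; returns the poll result and the partial sum,
   /// i.e. how much work ran before the caller got control back.
   fn first_poll<S: BatchStream>(input: &mut S) -> (Poll<u64>, u64) {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut sum = 0;
       let result = poll_sum(input, &mut sum, &mut cx);
       (result, sum)
   }
   ```

   Polled once, the aggregate over the bare input returns `Poll::Ready(1000)` without ever giving the caller a chance to cancel; over `YieldEvery { budget: 64, .. }` it returns `Poll::Pending` after summing 64 batches.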
   
   Thank you @pepijnve for the review. The reason I was not using row count is that we would need to calculate batch_size * batch count, and we want to avoid affecting the performance of DataFusion's core logic. Even with a batch count of 64, I am wondering if it will affect core performance when we have huge data.
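
   For comparison, a row-based budget does not need to compute `batch_size * batch_count`; it can keep a running total. A hypothetical sketch (`RowBudget` and `yield_points` are illustrative names, not DataFusion code) that costs one addition and one comparison per batch, the same as a batch counter:

   ```rust
   /// Hypothetical row-based yield budget: a running total of rows since the
   /// last yield, reset whenever the budget is reached.
   struct RowBudget {
       max_rows: usize,
       rows_since_yield: usize,
   }

   impl RowBudget {
       /// Records one batch of `num_rows` rows; returns `true` when the caller
       /// should yield before pulling the next batch.
       fn consume(&mut self, num_rows: usize) -> bool {
           self.rows_since_yield += num_rows;
           if self.rows_since_yield >= self.max_rows {
               self.rows_since_yield = 0;
               true
           } else {
               false
           }
       }
   }

   /// Returns the 1-based batch numbers after which a yield would happen.
   fn yield_points(batch_rows: &[usize], max_rows: usize) -> Vec<usize> {
       let mut budget = RowBudget { max_rows, rows_since_yield: 0 };
       batch_rows
           .iter()
           .enumerate()
           .filter_map(|(i, &rows)| budget.consume(rows).then_some(i + 1))
           .collect()
   }
   ```

   With full 8192-row batches and a budget of `64 * 8192` rows this yields after batches 64, 128, 192, ..., just like a 64-batch budget; with very small batches it yields far less often, which is one trade-off between the two approaches.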
   
   I am wondering if we can wrap the yield logic outside the core exec logic in DataFusion, for example on the datafusion-cli side, if we only want to support Ctrl-C in datafusion-cli.
   
   But there seem to be more use cases besides datafusion-cli that want to terminate the stream, for example users who terminate it via gRPC:
   
   https://github.com/apache/datafusion/issues/14036#issuecomment-2577862225

