Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-30 Thread via GitHub


alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2115561327


##
datafusion/common-runtime/src/common.rs:
##
@@ -98,6 +103,71 @@ impl Drop for SpawnedTask {
 }
 }
 
+/// Number of batches to yield before voluntarily returning Pending.

Review Comment:
   I poked around a bit more and I think I found a better home for this:
   
   
https://github.com/apache/datafusion/blob/dd9c3a815d7b4af2ef503ea557332ecc700af318/datafusion/physical-plan/src/stream.rs#L510-L509
   
   I'll move it myself, since I was the one who suggested moving it here.



##
datafusion/common-runtime/Cargo.toml:
##
@@ -38,6 +38,10 @@ workspace = true
 name = "datafusion_common_runtime"
 
 [dependencies]
+arrow = { workspace = true }
+arrow-schema = { workspace = true }

Review Comment:
   I don't think we need `arrow-schema`, since `arrow` directly re-exports most of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2115086432


##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -77,6 +77,9 @@ impl AggregateStream {
 let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
 let input = agg.input.execute(partition, Arc::clone(&context))?;
 
+// Wrap no‐grouping aggregates in our YieldStream

Review Comment:
   Thank you @alamb, good suggestion. Addressed it in the latest PR.



##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor

Review Comment:
   Thank you @alamb, good suggestion. Addressed it in the latest PR.



##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+inner: SendableRecordBatchStream,
+batches_processed: usize,
+buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {
+pub fn new(inner: SendableRecordBatchStream) -> Self {
+Self {
+inner,
+batches_processed: 0,
+buffer: None,
+}
+}
+}
+
+// Stream<Item = Result<RecordBatch>> to poll_next_unpin
+impl Stream for YieldStream {
+type Item = Result<RecordBatch>;
+
+fn poll_next(
+mut self: std::pin::Pin<&mut Self>,
+cx: &mut Context<'_>,
+) -> Poll<Option<Self::Item>> {
+const YIELD_BATCHES: usize = 64;

Review Comment:
   Thank you @alamb, good suggestion. Addressed it in the latest PR.






Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2115086727


##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+inner: SendableRecordBatchStream,
+batches_processed: usize,
+buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {

Review Comment:
   I also added some unit tests for it.






Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2921143644

   > Thank you @zhuqi-lucas and @pepijnve. I think this PR does actually solve the cancellation issue.
   > 
   > I left some suggestions for how it could be improved, but the basic idea looks great to me, and I think we could merge this PR and address the rest as a follow-on.
   > 
   > I also tested with https://github.com/pepijnve/datafusion_cancel_test from @pepijnve.
   > 
   > With this PR checked out locally, the test now shows proper cancellation behavior for me:
   > 
   > ```diff
   > diff --git a/Cargo.toml b/Cargo.toml
   > index 8855aee..f815883 100644
   > --- a/Cargo.toml
   > +++ b/Cargo.toml
   > @@ -6,7 +6,8 @@ edition = "2024"
   >  [dependencies]
   >  futures = "0.3.31"
   >  arrow = "55.0.0"
   > -datafusion = "47.0.0"
   > +# datafusion = "47.0.0"
   > +datafusion = { path = "/Users/andrewlamb/Software/datafusion/datafusion/core" }
   > 
   >  [dependencies.tokio]
   >  version = "1.45.1"
   > ```
   > 
   > ```
   > Running query; will time out after 5 seconds
   > InfiniteStream::poll_next 1 times
   > InfiniteStream::poll_next 2 times
   > InfiniteStream::poll_next 3 times
   > InfiniteStream::poll_next 4 times
   > InfiniteStream::poll_next 5 times
   > InfiniteStream::poll_next 6 times
   > InfiniteStream::poll_next 7 times
   > InfiniteStream::poll_next 8 times
   > InfiniteStream::poll_next 9 times
   > InfiniteStream::poll_next 10 times
   > InfiniteStream::poll_next 11 times
   > InfiniteStream::poll_next 12 times
   > Timeout reached!
   > No result (cancelled or empty)
   > Exiting, stream will be dropped now
   > ```
   > 
   > Without those changes:
   > 
   > ```
   > Running query; will time out after 5 seconds
   > InfiniteStream::poll_next 1 times
   > InfiniteStream::poll_next 2 times
   > InfiniteStream::poll_next 3 times
   > InfiniteStream::poll_next 4 times
   > InfiniteStream::poll_next 5 times
   > InfiniteStream::poll_next 6 times
   > InfiniteStream::poll_next 7 times
   > InfiniteStream::poll_next 8 times
   > InfiniteStream::poll_next 9 times
   > InfiniteStream::poll_next 10 times
   > InfiniteStream::poll_next 11 times
   > InfiniteStream::poll_next 12 times
   > InfiniteStream::poll_next 13 times
   > InfiniteStream::poll_next 14 times
   > InfiniteStream::poll_next 15 times
   > InfiniteStream::poll_next 16 times
   > InfiniteStream::poll_next 17 times
   > InfiniteStream::poll_next 18 times
   > InfiniteStream::poll_next 19 times
   > InfiniteStream::poll_next 20 times
   > ...
   > (never cancels)
   > ```
   
   Thank you @alamb for reviewing and checking!





Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


zhuqi-lucas commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2115086314


##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+inner: SendableRecordBatchStream,
+batches_processed: usize,
+buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {

Review Comment:
   Thank you @alamb, good suggestion. Addressed it in the latest PR.






Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2114732484


##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -77,6 +77,9 @@ impl AggregateStream {
 let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
 let input = agg.input.execute(partition, Arc::clone(&context))?;
 
+// Wrap no‐grouping aggregates in our YieldStream

Review Comment:
   I think we should add the rationale for doing this here; otherwise it is not clear.
   
   For example, something like:
   ```suggestion
   // Yield control back to tokio after a certain number of batches so it can check for cancellation.
   // See https://github.com/apache/datafusion/issues/16193
   ```
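To make that rationale concrete, here is a minimal, std-only sketch (not DataFusion code) of why periodic yielding matters: a Rust future is cancelled by being dropped, and a runtime such as tokio can only do that between calls to `poll`. `EndlessAggregate`, `run_until_cancelled`, and the poll budget standing in for a timeout are hypothetical names chosen for illustration; the batch count of 64 simply mirrors the PR's `YIELD_BATCHES`.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for this single-threaded manual driver.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for an aggregate with no grouping: it consumes an
/// endless input inside one `poll`, the way an operator drains its child.
/// Every `yield_every` batches it returns `Pending`, simulating an input
/// that has been wrapped so it periodically yields.
pub struct EndlessAggregate {
    pub batches_seen: u64,
    since_yield: usize,
    yield_every: usize,
}

impl EndlessAggregate {
    pub fn new(yield_every: usize) -> Self {
        // With no yielding at all, poll() below would never return.
        assert!(yield_every > 0, "yield_every must be positive");
        Self { batches_seen: 0, since_yield: 0, yield_every }
    }

    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        loop {
            // The simulated input always has another batch ready.
            self.batches_seen += 1;
            self.since_yield += 1;
            if self.since_yield == self.yield_every {
                self.since_yield = 0;
                // Request another poll, then hand control back to the driver.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
    }
}

/// Minimal driver standing in for a runtime with a timeout: it stops after
/// `max_polls` polls and drops the aggregate. The check runs only between polls.
pub fn run_until_cancelled(mut agg: EndlessAggregate, max_polls: usize) -> (usize, u64) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    while polls < max_polls {
        polls += 1;
        if agg.poll(&mut cx).is_ready() {
            break;
        }
    }
    let seen = agg.batches_seen;
    drop(agg); // "cancel" the query by dropping it
    (polls, seen)
}
```

With `EndlessAggregate::new(64)` and a budget of 3 polls, the driver regains control after every 64 batches and cancels after 192. If the aggregate never returned `Pending`, the first `poll` would never come back and the budget check would never run, which is the "never cancels" behavior shown for the unpatched build.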



##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+inner: SendableRecordBatchStream,
+batches_processed: usize,
+buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {

Review Comment:
   This is a neat structure. I think it is more generally useful. Perhaps we could put it somewhere more discoverable, such as https://github.com/apache/datafusion/tree/main/datafusion/common-runtime/src



##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor

Review Comment:
   ```suggestion
   /// A stream that yields batches of data, yielding control back to the executor every `YIELD_BATCHES` batches
   ///
   /// This can be useful to allow operators that might not yield to check for cancellation
   ```
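The suggested doc comment describes the mechanism; below is a std-only sketch of it under stated assumptions, not the PR's code. To avoid depending on the `futures` crate it uses a hypothetical `BatchStream` trait in place of `futures::Stream` and plain `u32` values in place of `Result<RecordBatch>`; `Counter` and `drain` are likewise illustrative helpers. As in the diff above, a `buffer` holds the batch pulled on the poll that yields, so no data is lost.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Batches returned before voluntarily yielding (value taken from the PR).
pub const YIELD_BATCHES: usize = 64;

/// Hypothetical, std-only stand-in for `futures::Stream`; a "batch" is a u32.
pub trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>>;
}

/// Wraps `inner` and returns `Pending` once every `YIELD_BATCHES` batches.
pub struct YieldStream<S> {
    inner: S,
    batches_processed: usize,
    /// Batch already pulled from `inner`, held back while we yield.
    buffer: Option<u32>,
}

impl<S: BatchStream> YieldStream<S> {
    pub fn new(inner: S) -> Self {
        Self { inner, batches_processed: 0, buffer: None }
    }
}

impl<S: BatchStream> BatchStream for YieldStream<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // First hand out the batch held back by the previous yield.
        if let Some(batch) = self.buffer.take() {
            return Poll::Ready(Some(batch));
        }
        match self.inner.poll_next(cx) {
            Poll::Ready(Some(batch)) => {
                self.batches_processed += 1;
                if self.batches_processed >= YIELD_BATCHES {
                    self.batches_processed = 0;
                    self.buffer = Some(batch);
                    // Schedule an immediate re-poll, then give control back.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                } else {
                    Poll::Ready(Some(batch))
                }
            }
            // End of stream, or a real Pending from the input: pass through.
            other => other,
        }
    }
}

/// Example input that is always ready (never returns `Pending`).
pub struct Counter {
    next: u32,
    end: u32,
}

impl Counter {
    pub fn new(end: u32) -> Self {
        Self { next: 0, end }
    }
}

impl BatchStream for Counter {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.end {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains a stream, returning the batches and how many times it yielded.
pub fn drain<S: BatchStream>(mut s: S) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut out, mut yields) = (Vec::new(), 0);
    loop {
        match s.poll_next(&mut cx) {
            Poll::Ready(Some(b)) => out.push(b),
            Poll::Ready(None) => return (out, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

For example, draining `YieldStream::new(Counter::new(200))` returns all 200 batches in order and yields 3 times (after the 64th, 128th, and 192nd batch). Calling `wake_by_ref` before returning `Pending` asks the runtime to poll the task again promptly, so the wrapper adds a scheduling point without stalling the stream.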



##
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##
@@ -170,6 +173,65 @@ impl AggregateStream {
 }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+inner: SendableRecordBatchStream,
+batches_processed: usize,
+buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {
+pub fn new(inner: SendableRecordBatchStream) -> Self {
+Self {
+inner,
+batches_processed: 0,
+buffer: None,
+}
+}
+}
+
+// Stream<Item = Result<RecordBatch>> to poll_next_unpin
+impl Stream for YieldStream {
+type Item = Result<RecordBatch>;
+
+fn poll_next(
+mut self: std::pin::Pin<&mut Self>,
+cx: &mut Context<'_>,
+) -> Poll<Option<Self::Item>> {
+const YIELD_BATCHES: usize = 64;

Review Comment:
   I think this would be easier to find, and would make the code easier to understand, if we moved it to a separate `const` in the module and added comments that describe it (i.e., the number of batches returned before checking for drop).






Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2920564055

   🤖: Benchmark completed
   
   ```
   Comparing HEAD and issue_16193
   
   Benchmark cancellation.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃ HEAD    ┃ issue_16193 ┃ Change        ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QCancellati… │ 28.73ms │     24.80ms │ +1.16x faster │
   └──────────────┴─────────┴─────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
   ┃ Benchmark Summary          ┃         ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
   │ Total Time (HEAD)          │ 28.73ms │
   │ Total Time (issue_16193)   │ 24.80ms │
   │ Average Time (HEAD)        │ 28.73ms │
   │ Average Time (issue_16193) │ 24.80ms │
   │ Queries Faster             │       1 │
   │ Queries Slower             │       0 │
   │ Queries with No Change     │       0 │
   └────────────────────────────┴─────────┘
   ```
   
   
   
   
   
   





Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2920543295

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr  2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing issue_16193 (6cf3bf0fbc474a33030184fa7f0f6c013db7c7b7) to 2d12bf6715e59142594cbc0ccb11bb19e4826b06 [diff](https://github.com/apache/datafusion/compare/2d12bf6715e59142594cbc0ccb11bb19e4826b06..6cf3bf0fbc474a33030184fa7f0f6c013db7c7b7)
   Benchmarks: cancellation
   Results will be posted here when complete
   





Re: [PR] feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) [datafusion]

2025-05-29 Thread via GitHub


alamb commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2920533806

   > Thank you @alamb. It's surprising that performance shows no regression and is even faster for clickbench_partitioned. This may be because we yield for each running partition, which makes partition execution more efficient.
   
   I think the changes reported are likely measurement noise. My analysis is that this PR doesn't change performance significantly.

