alamb commented on a change in pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#discussion_r514551317



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -214,13 +217,13 @@ Example: average
 * Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns.
 * Finally, `get_value` returns an array with one entry computed from the state
 */
-struct GroupedHashAggregateStream {
-    mode: AggregateMode,
-    schema: SchemaRef,
-    group_expr: Vec<Arc<dyn PhysicalExpr>>,
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: SendableRecordBatchStream,
-    finished: bool,
+pin_project! {

Review comment:
       I found the whole notion of having to `Pin` things in order to call `poll_next` on streams unnaturally confusing. While I basically understand what `pin_project!` does, I will admit I do not understand why streams need to be pinned. I am just cargo-culting it there.
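For context on the `Pin` question: `Future::poll` and `Stream::poll_next` both take `self: Pin<&mut Self>` because the state machine generated for an `async` block can keep a reference into its own locals across an `.await`. Once such a value has been polled it must not move, and `Pin` is the type-level promise that it won't. A wrapper struct that holds such a future as a field then has to turn `Pin<&mut Self>` into `Pin<&mut Field>` (a "pin projection"), which is what `pin_project!` generates. The std-only sketch below writes that projection by hand with `unsafe`; `YieldOnce`, `CountPolls` and `block_on` are hypothetical names used only for illustration, and `Future` stands in for `Stream` to avoid external crates.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical leaf future: returns Pending once, then Ready(()).
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask the executor to poll us again
            Poll::Pending
        }
    }
}

/// Hypothetical wrapper that counts how often its inner future is polled:
/// the kind of struct `pin_project!` is normally used for.
struct CountPolls<F> {
    inner: F,     // structurally pinned: must never move once `Self` is pinned
    polls: usize, // plain field: fine to mutate through `&mut`
}

impl<F: Future> Future for CountPolls<F> {
    type Output = (F::Output, usize);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `inner` is never moved out of `self`, and `CountPolls` has no
        // Drop impl or manual Unpin impl, so pinning `inner` in place is sound.
        // `pin_project!` generates an equivalent projection without `unsafe`.
        let this = unsafe { self.get_unchecked_mut() };
        this.polls += 1;
        let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
        match inner.poll(cx) {
            Poll::Ready(out) => Poll::Ready((out, this.polls)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Minimal busy-polling executor with a no-op waker (illustration only).
fn block_on<F: Future>(fut: F) -> F::Output {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    // Box::pin fixes the future's address before the first poll.
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let wrapped = CountPolls {
        inner: async {
            let x = 41;
            let r = &x; // borrow of a local held across `.await`: self-referential
            YieldOnce { yielded: false }.await;
            *r + 1
        },
        polls: 0,
    };
    let (value, polls) = block_on(wrapped);
    println!("value={} polls={}", value, polls); // value=42 polls=2
}
```

Because the `async` block is `!Unpin`, nothing outside `Pin` can hand out a movable `&mut` to it; the projection is the one place that has to reason about that, which is why it is worth delegating to `pin_project!`.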

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -315,6 +318,41 @@ fn group_aggregate_batch(
     Ok(accumulators)
 }
 
+async fn compute_grouped_hash_aggregate(

Review comment:
       The main change in this file is to pull the computation of the output record batch into a future (aka an `async fn`). I think this formulation is still a natural way to express the logic: you can see each input batch being fed to `group_aggregate_batch`.
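The shape of that `async fn` can be sketched without DataFusion types: a loop that awaits each input batch and folds it into the accumulators, returning early on error via `?`. In this std-only sketch, `BatchSource`, `group_sum_batch` and `compute_grouped_sum` are hypothetical stand-ins (for the input stream, `group_aggregate_batch` and `compute_grouped_hash_aggregate` respectively), the aggregate is a plain SUM per key, and `block_on` is a toy executor rather than tokio.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type Batch = Vec<(&'static str, i64)>;

/// Hypothetical input: yields pre-built batches of (group key, value) rows.
struct BatchSource {
    batches: Vec<Batch>,
}

impl BatchSource {
    // A real input stream could be Pending while waiting on I/O;
    // this stub is always ready.
    async fn next(&mut self) -> Option<Batch> {
        if self.batches.is_empty() { None } else { Some(self.batches.remove(0)) }
    }
}

/// Hypothetical analog of `group_aggregate_batch`: folds one batch into the
/// per-group accumulators (here, a SUM per key) and hands them back.
fn group_sum_batch(
    mut acc: HashMap<&'static str, i64>,
    batch: &[(&'static str, i64)],
) -> Result<HashMap<&'static str, i64>, String> {
    for (key, value) in batch {
        *acc.entry(*key).or_insert(0) += *value;
    }
    Ok(acc)
}

/// The whole aggregation reads as a straight-line loop over the input.
async fn compute_grouped_sum(mut input: BatchSource) -> Result<HashMap<&'static str, i64>, String> {
    let mut acc = HashMap::new();
    while let Some(batch) = input.next().await {
        acc = group_sum_batch(acc, &batch)?;
    }
    Ok(acc)
}

/// Minimal busy-polling executor with a no-op waker (illustration only).
fn block_on<F: Future>(fut: F) -> F::Output {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let input = BatchSource { batches: vec![vec![("a", 1), ("b", 2)], vec![("a", 3)]] };
    let totals = block_on(compute_grouped_sum(input)).unwrap();
    println!("a={} b={}", totals["a"], totals["b"]); // a=4 b=2
}
```

The compiler turns the `while let ... .await` loop into the state machine that a hand-written `poll_next` would otherwise have to encode explicitly.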

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -324,12 +362,24 @@ impl GroupedHashAggregateStream {
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
     ) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        let schema_clone = schema.clone();
+        tokio::spawn(async move {

Review comment:
       this spawns a separate task to do the actual aggregation (and drive the 
inputs, as needed, by calling `input.next().await`). Driving the input was not 
happening when using `future.into_stream` in the actual call to `poll_next` 
https://github.com/apache/arrow/pull/8553/files#diff-4dbe750a2836f6c723bc36ae39f2faf15e2706303f5dbb042ae00f1f36658c62L399
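The split described here (a spawned worker drives the input and computes the result, while the stream only waits for one message) can be sketched with std primitives. This is an analog, not the PR's code: `std::thread::spawn` and `std::sync::mpsc` stand in for `tokio::spawn` and `futures::channel::oneshot`, and `AggregateStream`, its `poll_next` method and `wait_next` are hypothetical names. The point is that `poll_next` never pulls from the input itself; it only checks whether the worker has delivered the result.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::task::Poll;
use std::thread;

/// Hypothetical stand-in for GroupedHashAggregateStream: it only waits for
/// the result; it never drives the input itself.
struct AggregateStream {
    rx: Receiver<i64>,
    finished: bool,
}

impl AggregateStream {
    fn new(batches: Vec<Vec<i64>>) -> Self {
        let (tx, rx) = mpsc::channel();
        // The worker consumes the whole input and computes the aggregate,
        // analogous to the spawned task in the PR.
        thread::spawn(move || {
            let total: i64 = batches.iter().flatten().sum();
            // Ignore the error if the receiver was dropped (consumer gave up).
            let _ = tx.send(total);
        });
        AggregateStream { rx, finished: false }
    }

    /// Poll-style accessor mirroring `Stream::poll_next`: one item, then None.
    fn poll_next(&mut self) -> Poll<Option<i64>> {
        if self.finished {
            return Poll::Ready(None);
        }
        match self.rx.try_recv() {
            Ok(total) => {
                self.finished = true;
                Poll::Ready(Some(total))
            }
            Err(TryRecvError::Empty) => Poll::Pending,
            Err(TryRecvError::Disconnected) => {
                self.finished = true;
                Poll::Ready(None)
            }
        }
    }
}

/// Spin until the stream yields (illustration only; a real executor would
/// park the task and rely on a waker instead).
fn wait_next(stream: &mut AggregateStream) -> Option<i64> {
    loop {
        match stream.poll_next() {
            Poll::Ready(item) => return item,
            Poll::Pending => thread::yield_now(),
        }
    }
}

fn main() {
    let mut stream = AggregateStream::new(vec![vec![1, 2, 3], vec![4, 5]]);
    println!("{:?}", wait_next(&mut stream)); // Some(15)
    println!("{:?}", wait_next(&mut stream)); // None
}
```

With a real `oneshot::Receiver`, polling the receiver also registers the task's waker, so the executor is notified when the spawned task sends the result instead of spinning as `wait_next` does here.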



