alamb commented on a change in pull request #8553: URL: https://github.com/apache/arrow/pull/8553#discussion_r514551317
##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -214,13 +217,13 @@ Example: average
 * Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns.
 * Finally, `get_value` returns an array with one entry computed from the state
 */
-struct GroupedHashAggregateStream {
-    mode: AggregateMode,
-    schema: SchemaRef,
-    group_expr: Vec<Arc<dyn PhysicalExpr>>,
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: SendableRecordBatchStream,
-    finished: bool,
+pin_project! {

Review comment:
I found the whole notion of having to Pin things to call `poll_next` on streams to be unnaturally confusing. While I basically understand what `pin_project!` does, I will admit I do not understand why Streams need to be `Pin`. I am just cargo culting it there

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -315,6 +318,41 @@ fn group_aggregate_batch(
     Ok(accumulators)
 }
+async fn compute_grouped_hash_aggregate(

Review comment:
The main change in this file is to pull the computation of the output record batch into a future (aka an async function). I think this formulation is still a natural way to express the logic (you can see each input batch being fed to `group_aggregate_batch`)

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -324,12 +362,24 @@ impl GroupedHashAggregateStream {
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
     ) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        let schema_clone = schema.clone();
+        tokio::spawn(async move {

Review comment:
this spawns a separate task to do the actual aggregation (and drive the inputs, as needed, by calling `input.next().await`).
Driving the input was not happening when using `future.into_stream` in the actual call to `poll_next` https://github.com/apache/arrow/pull/8553/files#diff-4dbe750a2836f6c723bc36ae39f2faf15e2706303f5dbb042ae00f1f36658c62L399
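
For readers unfamiliar with the shape of this change, the "spawn a worker, hand back exactly one result over a channel" pattern can be sketched with the standard library alone. This is only an analogy and not the code in the PR: `std::thread::spawn` stands in for `tokio::spawn`, `std::sync::mpsc::channel` stands in for `futures::channel::oneshot::channel`, and `Batch`, `aggregate_batch`, and `spawn_aggregation` are hypothetical names made up for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical stand-in for a record batch: rows of (group key, value).
type Batch = Vec<(String, i64)>;

/// Hypothetical helper that folds one batch into the running per-group sums.
/// It plays the role that `group_aggregate_batch` plays in the diff.
fn aggregate_batch(mut acc: HashMap<String, i64>, batch: &Batch) -> HashMap<String, i64> {
    for (key, value) in batch {
        *acc.entry(key.clone()).or_insert(0) += value;
    }
    acc
}

/// Spawns a worker that drives `input` to completion and sends one result.
/// The caller never pulls from `input` itself; it only waits on the receiver.
fn spawn_aggregation<I>(input: I) -> Receiver<HashMap<String, i64>>
where
    I: Iterator<Item = Batch> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let result = input.fold(HashMap::new(), |acc, batch| aggregate_batch(acc, &batch));
        // The receiver may have been dropped already; that is not an error here.
        let _ = tx.send(result);
    });
    rx
}

fn main() {
    let batches = vec![
        vec![("a".to_string(), 1), ("b".to_string(), 2)],
        vec![("a".to_string(), 3)],
    ];
    let rx = spawn_aggregation(batches.into_iter());
    let sums = rx.recv().expect("worker dropped the sender without sending");
    println!("a = {}, b = {}", sums["a"], sums["b"]);
}
```

The point of the sketch is the division of labour: the spawned worker is the only thing that advances the input, so the consumer side reduces to waiting on a single value, which is roughly what polling the `rx` half of the oneshot channel does in the async version.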