alamb commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175324013


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   Why does this signature need to change? The new signature requires an 
allocation / `clone` of a Vec where the previous one didn't and thus this seems 
to change the API for the worse.
   
   If it is to support a calculated output in the grouping, perhaps we can 
calculate the output ordering once in the constructor rather than on demand.
   
   ```
   
       fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
           self.calc_aggregation_ordering().map(|state| state.ordering)
       }
   ```



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, 
&batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, 
&batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .aggregation_ordering
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, 
&batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, 
&batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.aggregation_ordering.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {

Review Comment:
   I may be misunderstanding this code, but it seems like it is tracking 
per-group whether the group can be emitted or not. As I understand the `“Partial 
Streaming” / “Partitioned Streaming”` section of 
https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit#heading=h.uapxuhfa9wyi
   
   the entire hash table could be flushed each time a new value of `date` is seen:
   
   ![Screenshot 2023-04-24 at 10 08 15 
AM](https://user-images.githubusercontent.com/490673/234021622-3d2eb19f-153c-4ba8-889c-5efd99c80424.png)
   
   Perhaps this could be done with the obvious vectorization of checking only 
on record batch boundaries, or something similar.
   
   



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -313,22 +331,144 @@ impl RecordBatchStream for GroupedHashAggregateStream {
     }
 }
 
+/// This utility object encapsulates the row object, the hash and the group
+/// indices for a group. This information is used when executing streaming
+/// GROUP BY calculations.
+struct GroupOrderInfo {
+    owned_row: OwnedRow,
+    hash: u64,
+    range: Range<usize>,
+}
+
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of 
group_by_expressions)
+    // Update the aggr_state according to groub_by values (result of 
group_by_expressions) when group by

Review Comment:
   minor nit: `groub_by` --> `group_by`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to