mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1231718662


##########
datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs:
##########
@@ -649,36 +652,59 @@ impl BoundedAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state =
-                &mut 
self.aggr_state.ordered_group_states[*group_idx].group_state;
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx 
as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut 
state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as 
usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut 
state_accessor)?;
+        let mut single_value_acc_idx = vec![];
+        let mut single_row_acc_idx = vec![];
+        self.row_accumulators
+            .iter()
+            .zip(row_values.iter())
+            .enumerate()
+            .for_each(|(idx, (acc, values))| {
+                if let RowAccumulatorItem::COUNT(_) = acc {
+                    single_row_acc_idx.push(idx);
+                } else if values.len() == 1 {
+                    single_value_acc_idx.push(idx);
+                } else {
+                    single_row_acc_idx.push(idx);
+                };
+            });
+
+        if single_value_acc_idx.len() == 1 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let array1 = &row_values[acc_idx1][0];
+            let array1_dt = array1.data_type();
+            dispatch_all_supported_data_types! { 
impl_one_row_accumulator_dispatch, array1_dt, array1, acc_idx1, self, 
update_one_accumulator_with_native_value, groups_with_rows, filter_bool_array}
+        } else if single_value_acc_idx.len() == 2 && 
single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let acc_idx2 = single_value_acc_idx[1];
+            let array1 = &row_values[acc_idx1][0];
+            let array2 = &row_values[acc_idx2][0];
+            let array1_dt = array1.data_type();
+            let array2_dt = array2.data_type();
+            dispatch_all_supported_data_types_pairs! { 
impl_two_row_accumulators_dispatch, array1_dt, array2_dt, array1, array2, 
acc_idx1, acc_idx2, self, update_two_accumulator2_with_native_value, 
groups_with_rows, filter_bool_array}
+        } else {
+            for group_idx in groups_with_rows {
+                let group_state =
+                    &mut 
self.aggr_state.ordered_group_states[*group_idx].group_state;
+                let mut state_accessor =
+                    RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+                state_accessor.point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
+                for idx in &group_state.indices {
+                    for (accumulator, values_array, filter_array) in izip!(
+                        self.row_accumulators.iter_mut(),
+                        row_values.iter(),
+                        filter_bool_array.iter()
+                    ) {
+                        accumulator.update_single_row(
+                            values_array,
+                            filter_array,
+                            *idx as usize,
+                            &mut state_accessor,
+                        )?;
                     }

Review Comment:
   > As written, doesn't this code do all the type dispatch for each `idx` 
in this loop?
   > 
   > Would it be possible to pass in `group_state.indices` just once so the 
dispatch is done just once?
   > 
   > Something like this (would need the traits / etc updated)
   
   I will try the approach you suggested. But even with this change, the 
type dispatch overhead is still heavy, because the dispatch is still performed 
for every group with rows that gets updated.
   
   
   
    
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to