mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1231718662
##########
datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs:
##########
@@ -649,36 +652,59 @@ impl BoundedAggregateStream {
})
.collect::<Result<Vec<_>>>()?;
- for group_idx in groups_with_rows {
- let group_state =
- &mut
self.aggr_state.ordered_group_states[*group_idx].group_state;
- let mut state_accessor =
- RowAccessor::new_from_layout(self.row_aggr_layout.clone());
- state_accessor.point_to(0,
group_state.aggregation_buffer.as_mut_slice());
- for idx in &group_state.indices {
- for (accumulator, values_array, filter_array) in izip!(
- self.row_accumulators.iter_mut(),
- row_values.iter(),
- filter_bool_array.iter()
- ) {
- if values_array.len() == 1 {
- let scalar_value =
- col_to_scalar(&values_array[0], filter_array, *idx
as usize)?;
- accumulator.update_scalar(&scalar_value, &mut
state_accessor)?;
- } else {
- let scalar_values = values_array
- .iter()
- .map(|array| {
- col_to_scalar(array, filter_array, *idx as
usize)
- })
- .collect::<Result<Vec<_>>>()?;
- accumulator
- .update_scalar_values(&scalar_values, &mut
state_accessor)?;
+ let mut single_value_acc_idx = vec![];
+ let mut single_row_acc_idx = vec![];
+ self.row_accumulators
+ .iter()
+ .zip(row_values.iter())
+ .enumerate()
+ .for_each(|(idx, (acc, values))| {
+ if let RowAccumulatorItem::COUNT(_) = acc {
+ single_row_acc_idx.push(idx);
+ } else if values.len() == 1 {
+ single_value_acc_idx.push(idx);
+ } else {
+ single_row_acc_idx.push(idx);
+ };
+ });
+
+ if single_value_acc_idx.len() == 1 && single_row_acc_idx.is_empty() {
+ let acc_idx1 = single_value_acc_idx[0];
+ let array1 = &row_values[acc_idx1][0];
+ let array1_dt = array1.data_type();
+ dispatch_all_supported_data_types! {
impl_one_row_accumulator_dispatch, array1_dt, array1, acc_idx1, self,
update_one_accumulator_with_native_value, groups_with_rows, filter_bool_array}
+ } else if single_value_acc_idx.len() == 2 &&
single_row_acc_idx.is_empty() {
+ let acc_idx1 = single_value_acc_idx[0];
+ let acc_idx2 = single_value_acc_idx[1];
+ let array1 = &row_values[acc_idx1][0];
+ let array2 = &row_values[acc_idx2][0];
+ let array1_dt = array1.data_type();
+ let array2_dt = array2.data_type();
+ dispatch_all_supported_data_types_pairs! {
impl_two_row_accumulators_dispatch, array1_dt, array2_dt, array1, array2,
acc_idx1, acc_idx2, self, update_two_accumulator2_with_native_value,
groups_with_rows, filter_bool_array}
+ } else {
+ for group_idx in groups_with_rows {
+ let group_state =
+ &mut
self.aggr_state.ordered_group_states[*group_idx].group_state;
+ let mut state_accessor =
+ RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+ state_accessor.point_to(0,
group_state.aggregation_buffer.as_mut_slice());
+ for idx in &group_state.indices {
+ for (accumulator, values_array, filter_array) in izip!(
+ self.row_accumulators.iter_mut(),
+ row_values.iter(),
+ filter_bool_array.iter()
+ ) {
+ accumulator.update_single_row(
+ values_array,
+ filter_array,
+ *idx as usize,
+ &mut state_accessor,
+ )?;
}
Review Comment:
> As written, doesn't this code do all the type dispatch for each `idx`
in this loop?
>
> Would it be possible to pass in `group_state.indices` just once so the
dispatch is done just once?
>
> Something like this (would need the traits / etc updated)
I will try the approach you suggested. But even with this change, the
type-dispatch overhead will still be significant, because the dispatch is
still performed once for every group with rows on each update.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]