Rachelint commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1728363433
##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -92,32 +101,69 @@ where
         opt_filter: Option<&BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
-        // NullState dispatches / handles tracking nulls and groups that saw no values
-        self.null_state.accumulate(
-            group_indices,
-            values,
-            opt_filter,
-            total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
-                (self.prim_fn)(value, new_value);
-            },
-        );
+        match self.mode {
+            GroupStatesMode::Flat => {
+                // Ensure enough room in values
+                ensure_enough_room_for_flat_values(
+                    &mut self.values_blocks,
+                    total_num_groups,
+                    self.starting_value,
+                );
+
+                let block = self.values_blocks.current_mut().unwrap();
+                self.null_state.accumulate_for_flat(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    total_num_groups,
+                    |group_index, new_value| {
+                        let value = &mut block[group_index];
+                        (self.prim_fn)(value, new_value);
+                    },
+                );
+            }
+            GroupStatesMode::Blocked(blk_size) => {

Review Comment:
It may be possible, because flat mode is actually a special case of blocked mode (a single big block), and I made some attempts at this in the early POC. However, there are some difficult points to overcome before switching to it:
- Result merging problem. In the same query, if some accumulators already support blocked mode and emit `Vec<X>`, while others do not support it yet and return one big `X`, how can we merge them?
- `group indices` parsing problem. `group indices` are currently parsed differently in `flat` and `blocked` mode.
  If we only use the blocked-style indices (high 32 bits for the block id, low 32 bits for the block offset), then when falling back to the single-block case (`flat` mode) we will always set the block id to 0 and use only the block offset, so the maximum number of available entries is restricted to `u32::MAX`. Is that enough?
- The need to switch from `hash aggr` to `streaming aggr`. It is expensive to support `EmitTo::First(n)` (emitting exactly the first n groups), which `streaming aggr` needs, in blocked emission, so we need to disable this optimization for `streaming aggr` (I think it brings few benefits there anyway). However, when spilling to disk is triggered in the final phase, the aggr mode is switched to `streaming aggr` while reading back the spilled batches to merge. In that case we need to switch from `hash aggr` to `streaming aggr`, and we must also switch the accumulators from `blocked` mode to `flat` mode, because blocked mode can't support `streaming aggr` well...
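
To make the blocked-style indices point above concrete, here is a minimal sketch of the encoding (high 32 bits for the block id, low 32 bits for the block offset). The helper names `pack_group_index`, `unpack_group_index`, and `value_at` are hypothetical and are not taken from the PR; the sketch only assumes that blocks are stored as a slice of `Vec<T>`.

```rust
/// Hypothetical helper (not from the PR): pack a block id and an offset
/// inside that block into a single u64 group index.
/// High 32 bits hold the block id, low 32 bits hold the block offset.
fn pack_group_index(block_id: u32, block_offset: u32) -> u64 {
    ((block_id as u64) << 32) | (block_offset as u64)
}

/// Hypothetical helper: split a packed group index back into
/// `(block_id, block_offset)`.
fn unpack_group_index(packed: u64) -> (u32, u32) {
    // The `as u32` cast truncates to the low 32 bits, which is the offset.
    ((packed >> 32) as u32, packed as u32)
}

/// Hypothetical lookup: resolve a packed group index against a list of blocks.
/// With exactly one block (the flat case), the block id is always 0, so the
/// packed index equals the plain offset and can never exceed `u32::MAX`.
fn value_at<T>(blocks: &[Vec<T>], packed: u64) -> &T {
    let (block_id, block_offset) = unpack_group_index(packed);
    &blocks[block_id as usize][block_offset as usize]
}
```

In this sketch, a flat accumulator would simply be `blocks.len() == 1`, which is where the `u32::MAX` limit on entries comes from: only the low 32 bits are ever used.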