Rachelint commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1728363433


##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -92,32 +101,69 @@ where
         opt_filter: Option<&BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
-
         // NullState dispatches / handles tracking nulls and groups that saw 
no values
-        self.null_state.accumulate(
-            group_indices,
-            values,
-            opt_filter,
-            total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
-                (self.prim_fn)(value, new_value);
-            },
-        );
+        match self.mode {
+            GroupStatesMode::Flat => {
+                // Ensure enough room in values
+                ensure_enough_room_for_flat_values(
+                    &mut self.values_blocks,
+                    total_num_groups,
+                    self.starting_value,
+                );
+
+                let block = self.values_blocks.current_mut().unwrap();
+                self.null_state.accumulate_for_flat(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    total_num_groups,
+                    |group_index, new_value| {
+                        let value = &mut block[group_index];
+                        (self.prim_fn)(value, new_value);
+                    },
+                );
+            }
+            GroupStatesMode::Blocked(blk_size) => {

Review Comment:
   It may be possible, because flat mode is actully a special case of blocked 
mode(only one big single block), and I made some tries in the early POC, but 
there may be some difficult points needing to overcome before switching to this:
   
   - Result merging problem.In the same query, if some accumulators have 
supported blocked mode, and emit `Vec<X>`, but others still not support and 
return the big `X`, how can we merge them? 
   
   - `group indcies` parsing problem. the parsings for `group indices` are 
difference in` flat` and `blocked` mode now. If we only use the `blocked style 
indices`(high 32 for id, low 32 for offset), when fallback to single block 
case(`flat mode`), we will always set block id to 0, and use the block offset 
only, and the max available entries will be restricted to u32::MAX, is it 
enough?
   
   - The need of switching from `hash aggr` to `streaming aggr`. It is 
expansive to support `Emit::first(exact n)` which is needed in `streaming aggr` 
in `blocked emission`, so we need to disable this optimization in it(I think it 
indeed make few benefits for `streaming aggr`). However, when spilling to disk 
is triggered in final phase, the aggr mode will be switched to `streaming aggr` 
during reading back the spilled batches to merge, and we need to swith `hash 
aggr` to `streaming aggr` if it is, and we must switch the `blocked mode` to 
`flat mode` of accumulators because it can't support `streaming aggr` well...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to