Rachelint commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1728467091


##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -92,32 +101,69 @@ where
         opt_filter: Option<&BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
-
         // NullState dispatches / handles tracking nulls and groups that saw no values
-        self.null_state.accumulate(
-            group_indices,
-            values,
-            opt_filter,
-            total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
-                (self.prim_fn)(value, new_value);
-            },
-        );
+        match self.mode {
+            GroupStatesMode::Flat => {
+                // Ensure enough room in values
+                ensure_enough_room_for_flat_values(
+                    &mut self.values_blocks,
+                    total_num_groups,
+                    self.starting_value,
+                );
+
+                let block = self.values_blocks.current_mut().unwrap();
+                self.null_state.accumulate_for_flat(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    total_num_groups,
+                    |group_index, new_value| {
+                        let value = &mut block[group_index];
+                        (self.prim_fn)(value, new_value);
+                    },
+                );
+            }
+            GroupStatesMode::Blocked(blk_size) => {

Review Comment:
   - For the `Result merging problem`: I think we shouldn't allow it to happen. 
If any one of the accumulators doesn't support `blocked` mode, all accumulators 
should disable it.
   
   - For `group indices`, maybe we can modify the format to:
   ```
   // Blocked mode
   | group index mode (1 bit) | block id (31 bits) | block offset (32 bits) |
   
   // Flat mode
   | group index mode (1 bit) | flat offset (63 bits) |
   ```
   
   - For switching, maybe we expose the `alter_block_size` as mentioned above?
   
   But as I found in the early POC, changing the return value of emission to 
`Vec<RecordBatch>` and merging `ExecutionState::ProducingOutput` and 
`ExecutionState::ProducingBlocks` into one may make the code complicated: 
   https://github.com/apache/datafusion/pull/11943#discussion_r1715843222
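
To make the proposed group index format concrete, here is a minimal sketch of packing and unpacking such a 64-bit index. It is only an illustration, not code from the PR: the `GroupIndex` enum and the `pack`/`unpack` names are hypothetical, and it assumes the mode bit is the most significant bit, with `1` meaning blocked and `0` meaning flat (the comment above does not say which value maps to which mode).

```rust
/// Hypothetical decoded form of the proposed 64-bit group index.
/// Not part of DataFusion; names are for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GroupIndex {
    /// Flat mode: a single 63-bit offset.
    Flat { offset: u64 },
    /// Blocked mode: a 31-bit block id plus a 32-bit offset inside the block.
    Blocked { block_id: u32, block_offset: u32 },
}

/// Assumption: the mode flag lives in the top bit (1 = blocked, 0 = flat).
const MODE_BIT: u64 = 1 << 63;
/// Mask selecting the 31 bits reserved for the block id.
const BLOCK_ID_MASK: u64 = (1 << 31) - 1;

impl GroupIndex {
    /// Encode into the packed `u64` layout.
    fn pack(self) -> u64 {
        match self {
            GroupIndex::Flat { offset } => {
                // The offset must fit in 63 bits so the mode bit stays clear.
                debug_assert!(offset < MODE_BIT, "flat offset exceeds 63 bits");
                offset & !MODE_BIT
            }
            GroupIndex::Blocked { block_id, block_offset } => {
                // The block id must fit in 31 bits.
                debug_assert!((block_id as u64) <= BLOCK_ID_MASK, "block id exceeds 31 bits");
                MODE_BIT
                    | (((block_id as u64) & BLOCK_ID_MASK) << 32)
                    | (block_offset as u64)
            }
        }
    }

    /// Decode a packed `u64` back into its mode and fields.
    fn unpack(packed: u64) -> Self {
        if (packed & MODE_BIT) == 0 {
            GroupIndex::Flat { offset: packed }
        } else {
            GroupIndex::Blocked {
                block_id: ((packed >> 32) & BLOCK_ID_MASK) as u32,
                // Truncating cast keeps the low 32 bits, i.e. the block offset.
                block_offset: packed as u32,
            }
        }
    }
}
```

With this layout an accumulator could tell from the index alone which mode it was produced in, for example `GroupIndex::unpack(idx)` followed by a `match` on the result, at the cost of one bit of address space in each mode.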



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

