Rachelint commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1730419416


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -529,10 +552,53 @@ impl GroupedHashAggregateStream {
             spill_state,
             group_values_soft_limit: agg.limit,
             skip_aggregation_probe,
+            enable_blocked_group_states,
         })
     }
 }
 
+/// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+///   - It is not streaming aggregation(because blocked mode can't support Emit::first(exact n))
+///   - The spilling is disabled(still need to consider more to support it efficiently)
+///   - The accumulator is not empty(I am still not sure about logic in this case)
+///   - `GroupValues` and all `GroupsAccumulator`s support blocked mode
+// TODO: support blocked optimization in streaming, spilling, and maybe empty accumulators case?
+fn maybe_enable_blocked_group_states(
+    context: &TaskContext,
+    group_values: &mut dyn GroupValues,
+    accumulators: &mut [Box<dyn GroupsAccumulator>],
+    block_size: usize,
+    group_ordering: &GroupOrdering,
+) -> Result<bool> {
+    if !matches!(group_ordering, GroupOrdering::None)
+        || accumulators.is_empty()
+        || enable_spilling(context.memory_pool().as_ref())
+    {
+        return Ok(false);
+    }
+
+    let group_supports_blocked = group_values.supports_blocked_mode();
+    let accumulators_support_blocked =
+        accumulators.iter().all(|acc| acc.supports_blocked_mode());
+
+    match (group_supports_blocked, accumulators_support_blocked) {
+        (true, true) => {
+            group_values.switch_to_mode(GroupStatesMode::Blocked(block_size))?;
+            accumulators.iter_mut().try_for_each(|acc| {
+                acc.switch_to_mode(GroupStatesMode::Blocked(block_size))
+            })?;
+            Ok(true)
+        }
+        _ => Ok(false),
+    }
+}
+
+// TODO: we should add a function(like `name`) to distinguish different memory pools.
+fn enable_spilling(memory_pool: &dyn MemoryPool) -> bool {
+    !format!("{memory_pool:?}").contains("UnboundedMemoryPool")

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to