alamb commented on code in PR #10208:
URL: https://github.com/apache/datafusion/pull/10208#discussion_r1608905520


##########
datafusion/physical-expr/src/aggregate/grouping.rs:
##########
@@ -96,8 +113,172 @@ impl PartialEq<dyn Any> for Grouping {
                 self.name == x.name
                     && self.data_type == x.data_type
                     && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
+                    && self.exprs.len() == x.exprs.len()
+                    && self
+                        .exprs
+                        .iter()
+                        .zip(x.exprs.iter())
+                        .all(|(expr1, expr2)| expr1.eq(expr2))
             })
             .unwrap_or(false)
     }
 }
+
+#[derive(Debug)]
+struct GroupingGroupsAccumulator {
+    /// Grouping columns' indices in grouping set
+    indices: Vec<usize>,
+
+    /// Mask per group.
+    ///
+    /// Note this is an i32 and not a u32 (or usize) because the
+    /// output type of grouping is `DataType::Int32`. Thus by using `i32`
+    /// for the grouping, the output [`Int32Array`] can be created
+    /// without copy.
+    masks: Vec<i32>,
+}
+
+impl GroupingGroupsAccumulator {
+    pub fn new(
+        grouping_exprs: &[Arc<dyn PhysicalExpr>],
+        group_by_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+    ) -> Result<Self> {
+        macro_rules! downcast_column {
+            ($EXPR:expr) => {{
+                if let Some(column) = $EXPR.as_any().downcast_ref::<Column>() {
+                    column
+                } else {
+                    return Err(DataFusionError::Execution(
+                        "Grouping only supports grouping set which only contains Column Expr".to_string(),
+                    ));
+                }
+            }}
+        }
+
+        // collect column indices of group_by_exprs, only Column Expr
+        let mut group_by_column_indices = Vec::with_capacity(group_by_exprs.len());
+        for (group_by_expr, _) in group_by_exprs.iter() {
+            let column = downcast_column!(group_by_expr);
+            group_by_column_indices.push(column.index());
+        }
+
+        // collect grouping_exprs' indices in group_by_exprs list, eg:
+        // SQL: SELECT c1, c2, grouping(c2, c1) FROM t GROUP BY ROLLUP(c1, c2);
+        // group_by_exprs: [c1, c2]
+        // grouping_exprs: [c2, c1]
+        // indices: [1, 0]
+        let mut indices = Vec::with_capacity(grouping_exprs.len());
+        for grouping_expr in grouping_exprs {
+            let column = downcast_column!(grouping_expr);
+            indices.push(find_grouping_column_index(
+                &group_by_column_indices,
+                column.index(),
+            )?);
+        }
+
+        Ok(Self {
+            indices,
+            masks: vec![],
+        })
+    }
+}
+
+fn find_grouping_column_index(
+    group_by_column_indices: &[usize],
+    grouping_column_index: usize,
+) -> Result<usize> {
+    for (i, group_by_column_index) in group_by_column_indices.iter().enumerate() {
+        if grouping_column_index == *group_by_column_index {
+            return Ok(i);
+        }
+    }
+    Err(DataFusionError::Execution(
+        "Not found grouping column in group by columns".to_string(),
+    ))
+}
+
+fn compute_mask(indices: &[usize], grouping_set: &[bool]) -> i32 {
+    let mut mask = 0;
+    for (i, index) in indices.iter().rev().enumerate() {
+        if grouping_set[*index] {
+            mask |= 1 << i;
+        }
+    }
+    mask
+}
+
+impl GroupsAccumulator for GroupingGroupsAccumulator {

Review Comment:
   I agree that having a special case just for the `grouping` aggregate that
forces changes on all other aggregates isn't ideal.
   
   > When calling update_batch, we need to know the information of the current 
grouping set, so we need to add a parameter to update_batch
   
   After reading https://www.postgresql.org/docs/9.5/functions-aggregate.html
I see that `grouping` is basically a special case that only makes sense in the
context of a grouping set (it provides some context about the grouping set).
   
   Given it is so special, I wonder if we could special case it somehow 🤔 
   
   One thing maybe we could do is to add another signature?
   
   ```
   trait GroupsAccumulator {
   ...
       /// Called with information about which grouping set this batch
       /// belongs to.
       /// The default implementation calls `Self::update_batch` and ignores
       /// `grouping_set`.
       fn update_grouping_batch(
           &mut self,
           _values: &[ArrayRef],
           group_indices: &[usize],
           opt_filter: Option<&arrow_array::BooleanArray>,
           total_num_groups: usize,
           grouping_set: &[bool],
       ) -> Result<()> {
         self.update_batch(_values, group_indices, opt_filter, total_num_groups)
       }
   ...
   ```
   
   
   And then we could make it clear in the documentation that the aggregator
calls `update_grouping_batch` but that most implementations can just implement
`update_batch`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to