alamb commented on code in PR #10527:
URL: https://github.com/apache/datafusion/pull/10527#discussion_r1606615253


##########
datafusion/optimizer/src/single_distinct_to_groupby.rs:
##########
@@ -131,177 +126,190 @@ fn contains_grouping_set(expr: &[Expr]) -> bool {
 impl OptimizerRule for SingleDistinctToGroupBy {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
+        _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called SingleDistinctToGroupBy::rewrite")
+    }
+
+    fn name(&self) -> &str {
+        "single_distinct_aggregation_to_group_by"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
         match plan {
             LogicalPlan::Aggregate(Aggregate {
                 input,
                 aggr_expr,
                 schema,
                 group_expr,
                 ..
-            }) => {
-                if is_single_distinct_agg(plan)? && 
!contains_grouping_set(group_expr) {
-                    // alias all original group_by exprs
-                    let (mut inner_group_exprs, out_group_expr_with_alias): (
-                        Vec<Expr>,
-                        Vec<(Expr, Option<String>)>,
-                    ) = group_expr
-                        .iter()
-                        .enumerate()
-                        .map(|(i, group_expr)| {
-                            if let Expr::Column(_) = group_expr {
-                                // For Column expressions we can use existing 
expression as is.
-                                (group_expr.clone(), (group_expr.clone(), 
None))
-                            } else {
-                                // For complex expression write is as alias, 
to be able to refer
-                                // if from parent operators successfully.
-                                // Consider plan below.
-                                //
-                                // Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
-                                // --Aggregate: groupBy=[[test.a + Int32(1) AS 
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, 
alias1:UInt32]\
-                                // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
-                                //
-                                // First aggregate(from bottom) refers to 
`test.a` column.
-                                // Second aggregate refers to the 
`group_alias_0` column, Which is a valid field in the first aggregate.
-                                // If we were to write plan above as below 
without alias
-                                //
-                                // Aggregate: groupBy=[[test.a + Int32(1)]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
-                                // --Aggregate: groupBy=[[test.a + Int32(1), 
test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
-                                // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
-                                //
-                                // Second aggregate refers to the `test.a + 
Int32(1)` expression However, its input do not have `test.a` expression in it.
-                                let alias_str = format!("group_alias_{i}");
-                                let alias_expr = 
group_expr.clone().alias(&alias_str);
-                                let (qualifier, field) = 
schema.qualified_field(i);
+            }) if is_single_distinct_agg(&aggr_expr)?
+                && !contains_grouping_set(&group_expr) =>
+            {
+                let group_size = group_expr.len();
+                // alias all original group_by exprs
+                let (mut inner_group_exprs, out_group_expr_with_alias): (
+                    Vec<Expr>,
+                    Vec<(Expr, Option<String>)>,
+                ) = group_expr
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, group_expr)| {
+                        if let Expr::Column(_) = group_expr {
+                            // For Column expressions we can use existing 
expression as is.
+                            (group_expr.clone(), (group_expr, None))
+                        } else {
+                            // For complex expression write is as alias, to be 
able to refer
+                            // if from parent operators successfully.
+                            // Consider plan below.
+                            //
+                            // Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+                            // --Aggregate: groupBy=[[test.a + Int32(1) AS 
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, 
alias1:UInt32]\
+                            // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
+                            //
+                            // First aggregate(from bottom) refers to `test.a` 
column.
+                            // Second aggregate refers to the `group_alias_0` 
column, Which is a valid field in the first aggregate.
+                            // If we were to write plan above as below without 
alias
+                            //
+                            // Aggregate: groupBy=[[test.a + Int32(1)]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+                            // --Aggregate: groupBy=[[test.a + Int32(1), 
test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
+                            // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
+                            //
+                            // Second aggregate refers to the `test.a + 
Int32(1)` expression However, its input do not have `test.a` expression in it.
+                            let alias_str = format!("group_alias_{i}");
+                            let (qualifier, field) = schema.qualified_field(i);
+                            (
+                                group_expr.alias(alias_str.clone()),
                                 (
-                                    alias_expr,
-                                    (
-                                        col(alias_str),
-                                        Some(qualified_name(qualifier, 
field.name())),
-                                    ),
-                                )
-                            }
-                        })
-                        .unzip();
-
-                    // and they can be referenced by the alias in the outer 
aggr plan
-                    let outer_group_exprs = out_group_expr_with_alias
-                        .iter()
-                        .map(|(out_group_expr, _)| out_group_expr.clone())
-                        .collect::<Vec<_>>();
-
-                    // replace the distinct arg with alias
-                    let mut index = 1;
-                    let mut group_fields_set = HashSet::new();
-                    let mut inner_aggr_exprs = vec![];
-                    let outer_aggr_exprs = aggr_expr
-                        .iter()
-                        .map(|aggr_expr| match aggr_expr {
-                            Expr::AggregateFunction(AggregateFunction {
-                                func_def: 
AggregateFunctionDefinition::BuiltIn(fun),
-                                args,
-                                distinct,
-                                ..
-                            }) => {
-                                // is_single_distinct_agg ensure args.len=1
-                                if *distinct
-                                    && 
group_fields_set.insert(args[0].display_name()?)
-                                {
-                                    inner_group_exprs.push(
-                                        
args[0].clone().alias(SINGLE_DISTINCT_ALIAS),
-                                    );
+                                    col(alias_str),
+                                    Some(qualified_name(qualifier, 
field.name())),
+                                ),
+                            )
+                        }
+                    })
+                    .unzip();
+
+                // replace the distinct arg with alias
+                let mut index = 1;
+                let mut distinct_aggr_exprs = HashSet::new();
+                let mut inner_aggr_exprs = vec![];
+                let outer_aggr_exprs = aggr_expr
+                    .into_iter()
+                    .map(|aggr_expr| match aggr_expr {
+                        Expr::AggregateFunction(AggregateFunction {
+                            func_def: 
AggregateFunctionDefinition::BuiltIn(fun),
+                            mut args,
+                            distinct,
+                            ..
+                        }) => {
+                            if distinct {
+                                debug_assert_eq!(

Review Comment:
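   Can we please change this from `debug_assert_eq!` to returning an 
`internal_err!`? `debug_assert_eq!` panics in debug builds and is compiled out 
entirely in release builds; an internal error is reported consistently in both.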



##########
datafusion/optimizer/src/single_distinct_to_groupby.rs:
##########
@@ -131,177 +126,190 @@ fn contains_grouping_set(expr: &[Expr]) -> bool {
 impl OptimizerRule for SingleDistinctToGroupBy {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
+        _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called SingleDistinctToGroupBy::rewrite")
+    }
+
+    fn name(&self) -> &str {
+        "single_distinct_aggregation_to_group_by"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
         match plan {
             LogicalPlan::Aggregate(Aggregate {
                 input,
                 aggr_expr,
                 schema,
                 group_expr,
                 ..
-            }) => {
-                if is_single_distinct_agg(plan)? && 
!contains_grouping_set(group_expr) {
-                    // alias all original group_by exprs
-                    let (mut inner_group_exprs, out_group_expr_with_alias): (
-                        Vec<Expr>,
-                        Vec<(Expr, Option<String>)>,
-                    ) = group_expr
-                        .iter()
-                        .enumerate()
-                        .map(|(i, group_expr)| {
-                            if let Expr::Column(_) = group_expr {
-                                // For Column expressions we can use existing 
expression as is.
-                                (group_expr.clone(), (group_expr.clone(), 
None))
-                            } else {
-                                // For complex expression write is as alias, 
to be able to refer
-                                // if from parent operators successfully.
-                                // Consider plan below.
-                                //
-                                // Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
-                                // --Aggregate: groupBy=[[test.a + Int32(1) AS 
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, 
alias1:UInt32]\
-                                // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
-                                //
-                                // First aggregate(from bottom) refers to 
`test.a` column.
-                                // Second aggregate refers to the 
`group_alias_0` column, Which is a valid field in the first aggregate.
-                                // If we were to write plan above as below 
without alias
-                                //
-                                // Aggregate: groupBy=[[test.a + Int32(1)]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
-                                // --Aggregate: groupBy=[[test.a + Int32(1), 
test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
-                                // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
-                                //
-                                // Second aggregate refers to the `test.a + 
Int32(1)` expression However, its input do not have `test.a` expression in it.
-                                let alias_str = format!("group_alias_{i}");
-                                let alias_expr = 
group_expr.clone().alias(&alias_str);
-                                let (qualifier, field) = 
schema.qualified_field(i);
+            }) if is_single_distinct_agg(&aggr_expr)?
+                && !contains_grouping_set(&group_expr) =>
+            {
+                let group_size = group_expr.len();
+                // alias all original group_by exprs
+                let (mut inner_group_exprs, out_group_expr_with_alias): (
+                    Vec<Expr>,
+                    Vec<(Expr, Option<String>)>,
+                ) = group_expr
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, group_expr)| {
+                        if let Expr::Column(_) = group_expr {
+                            // For Column expressions we can use existing 
expression as is.
+                            (group_expr.clone(), (group_expr, None))
+                        } else {
+                            // For complex expression write is as alias, to be 
able to refer
+                            // if from parent operators successfully.
+                            // Consider plan below.
+                            //
+                            // Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+                            // --Aggregate: groupBy=[[test.a + Int32(1) AS 
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, 
alias1:UInt32]\
+                            // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
+                            //
+                            // First aggregate(from bottom) refers to `test.a` 
column.
+                            // Second aggregate refers to the `group_alias_0` 
column, Which is a valid field in the first aggregate.
+                            // If we were to write plan above as below without 
alias
+                            //
+                            // Aggregate: groupBy=[[test.a + Int32(1)]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+                            // --Aggregate: groupBy=[[test.a + Int32(1), 
test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
+                            // ----TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]
+                            //
+                            // Second aggregate refers to the `test.a + 
Int32(1)` expression However, its input do not have `test.a` expression in it.
+                            let alias_str = format!("group_alias_{i}");
+                            let (qualifier, field) = schema.qualified_field(i);
+                            (
+                                group_expr.alias(alias_str.clone()),
                                 (
-                                    alias_expr,
-                                    (
-                                        col(alias_str),
-                                        Some(qualified_name(qualifier, 
field.name())),
-                                    ),
-                                )
-                            }
-                        })
-                        .unzip();
-
-                    // and they can be referenced by the alias in the outer 
aggr plan
-                    let outer_group_exprs = out_group_expr_with_alias
-                        .iter()
-                        .map(|(out_group_expr, _)| out_group_expr.clone())
-                        .collect::<Vec<_>>();
-
-                    // replace the distinct arg with alias
-                    let mut index = 1;
-                    let mut group_fields_set = HashSet::new();
-                    let mut inner_aggr_exprs = vec![];
-                    let outer_aggr_exprs = aggr_expr
-                        .iter()
-                        .map(|aggr_expr| match aggr_expr {
-                            Expr::AggregateFunction(AggregateFunction {
-                                func_def: 
AggregateFunctionDefinition::BuiltIn(fun),
-                                args,
-                                distinct,
-                                ..
-                            }) => {
-                                // is_single_distinct_agg ensure args.len=1
-                                if *distinct
-                                    && 
group_fields_set.insert(args[0].display_name()?)
-                                {
-                                    inner_group_exprs.push(
-                                        
args[0].clone().alias(SINGLE_DISTINCT_ALIAS),
-                                    );
+                                    col(alias_str),
+                                    Some(qualified_name(qualifier, 
field.name())),
+                                ),
+                            )
+                        }
+                    })
+                    .unzip();
+
+                // replace the distinct arg with alias
+                let mut index = 1;
+                let mut distinct_aggr_exprs = HashSet::new();
+                let mut inner_aggr_exprs = vec![];
+                let outer_aggr_exprs = aggr_expr
+                    .into_iter()
+                    .map(|aggr_expr| match aggr_expr {
+                        Expr::AggregateFunction(AggregateFunction {
+                            func_def: 
AggregateFunctionDefinition::BuiltIn(fun),
+                            mut args,
+                            distinct,
+                            ..
+                        }) => {
+                            if distinct {
+                                debug_assert_eq!(
+                                    args.len(),
+                                    1,
+                                    "DISTINCT aggregate should have exactly 
one argument"
+                                );
+                                let arg = args.swap_remove(0);
+
+                                let expr_id = 
distinct_aggr_exprs.hasher().hash_one(&arg);
+                                if distinct_aggr_exprs.insert(expr_id) {

Review Comment:
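   This code appears to deduplicate distinct aggregate expressions by comparing 
their hash values rather than by equality. Expressions that hash to the same 
value are very likely the same expression, but I don't think this is 
guaranteed. Two different expressions can collide on the same hash, and the 
second one would then be incorrectly treated as a duplicate. 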
   
   I think for correctness `distinct_aggr_exprs` should hold the arguments by 
value (e.g. `distinct_aggr_exprs.insert(arg.clone())`). Alternatively, it could 
key on the display name, as in the previous version.
   
   I realize this is another clone, but I think it is required for correctness.
   
   In general, a way to compute a digest/fingerprint of an expression that can 
be used for equality checks would be great. That should probably be done as a 
separate project, though. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to