alamb commented on code in PR #10527: URL: https://github.com/apache/datafusion/pull/10527#discussion_r1606615253
########## datafusion/optimizer/src/single_distinct_to_groupby.rs: ########## @@ -131,177 +126,190 @@ fn contains_grouping_set(expr: &[Expr]) -> bool { impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, - plan: &LogicalPlan, + _plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result<Option<LogicalPlan>> { + internal_err!("Should have called SingleDistinctToGroupBy::rewrite") + } + + fn name(&self) -> &str { + "single_distinct_aggregation_to_group_by" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>, DataFusionError> { match plan { LogicalPlan::Aggregate(Aggregate { input, aggr_expr, schema, group_expr, .. - }) => { - if is_single_distinct_agg(plan)? && !contains_grouping_set(group_expr) { - // alias all original group_by exprs - let (mut inner_group_exprs, out_group_expr_with_alias): ( - Vec<Expr>, - Vec<(Expr, Option<String>)>, - ) = group_expr - .iter() - .enumerate() - .map(|(i, group_expr)| { - if let Expr::Column(_) = group_expr { - // For Column expressions we can use existing expression as is. - (group_expr.clone(), (group_expr.clone(), None)) - } else { - // For complex expression write is as alias, to be able to refer - // if from parent operators successfully. - // Consider plan below. - // - // Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ - // --Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ - // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] - // - // First aggregate(from bottom) refers to `test.a` column. - // Second aggregate refers to the `group_alias_0` column, Which is a valid field in the first aggregate. 
- // If we were to write plan above as below without alias - // - // Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ - // --Aggregate: groupBy=[[test.a + Int32(1), test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ - // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] - // - // Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it. - let alias_str = format!("group_alias_{i}"); - let alias_expr = group_expr.clone().alias(&alias_str); - let (qualifier, field) = schema.qualified_field(i); + }) if is_single_distinct_agg(&aggr_expr)? + && !contains_grouping_set(&group_expr) => + { + let group_size = group_expr.len(); + // alias all original group_by exprs + let (mut inner_group_exprs, out_group_expr_with_alias): ( + Vec<Expr>, + Vec<(Expr, Option<String>)>, + ) = group_expr + .into_iter() + .enumerate() + .map(|(i, group_expr)| { + if let Expr::Column(_) = group_expr { + // For Column expressions we can use existing expression as is. + (group_expr.clone(), (group_expr, None)) + } else { + // For complex expression write is as alias, to be able to refer + // if from parent operators successfully. + // Consider plan below. + // + // Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ + // --Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ + // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] + // + // First aggregate(from bottom) refers to `test.a` column. + // Second aggregate refers to the `group_alias_0` column, Which is a valid field in the first aggregate. 
+ // If we were to write plan above as below without alias + // + // Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ + // --Aggregate: groupBy=[[test.a + Int32(1), test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ + // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] + // + // Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it. + let alias_str = format!("group_alias_{i}"); + let (qualifier, field) = schema.qualified_field(i); + ( + group_expr.alias(alias_str.clone()), ( - alias_expr, - ( - col(alias_str), - Some(qualified_name(qualifier, field.name())), - ), - ) - } - }) - .unzip(); - - // and they can be referenced by the alias in the outer aggr plan - let outer_group_exprs = out_group_expr_with_alias - .iter() - .map(|(out_group_expr, _)| out_group_expr.clone()) - .collect::<Vec<_>>(); - - // replace the distinct arg with alias - let mut index = 1; - let mut group_fields_set = HashSet::new(); - let mut inner_aggr_exprs = vec![]; - let outer_aggr_exprs = aggr_expr - .iter() - .map(|aggr_expr| match aggr_expr { - Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn(fun), - args, - distinct, - .. - }) => { - // is_single_distinct_agg ensure args.len=1 - if *distinct - && group_fields_set.insert(args[0].display_name()?) - { - inner_group_exprs.push( - args[0].clone().alias(SINGLE_DISTINCT_ALIAS), - ); + col(alias_str), + Some(qualified_name(qualifier, field.name())), + ), + ) + } + }) + .unzip(); + + // replace the distinct arg with alias + let mut index = 1; + let mut distinct_aggr_exprs = HashSet::new(); + let mut inner_aggr_exprs = vec![]; + let outer_aggr_exprs = aggr_expr + .into_iter() + .map(|aggr_expr| match aggr_expr { + Expr::AggregateFunction(AggregateFunction { + func_def: AggregateFunctionDefinition::BuiltIn(fun), + mut args, + distinct, + .. 
+ }) => { + if distinct { + debug_assert_eq!( Review Comment: Can we please change this from `debug_assert_eq` (which will panic) to an `internal_err!`? ########## datafusion/optimizer/src/single_distinct_to_groupby.rs: ########## @@ -131,177 +126,190 @@ fn contains_grouping_set(expr: &[Expr]) -> bool { impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, - plan: &LogicalPlan, + _plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result<Option<LogicalPlan>> { + internal_err!("Should have called SingleDistinctToGroupBy::rewrite") + } + + fn name(&self) -> &str { + "single_distinct_aggregation_to_group_by" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>, DataFusionError> { match plan { LogicalPlan::Aggregate(Aggregate { input, aggr_expr, schema, group_expr, .. - }) => { - if is_single_distinct_agg(plan)? && !contains_grouping_set(group_expr) { - // alias all original group_by exprs - let (mut inner_group_exprs, out_group_expr_with_alias): ( - Vec<Expr>, - Vec<(Expr, Option<String>)>, - ) = group_expr - .iter() - .enumerate() - .map(|(i, group_expr)| { - if let Expr::Column(_) = group_expr { - // For Column expressions we can use existing expression as is. - (group_expr.clone(), (group_expr.clone(), None)) - } else { - // For complex expression write is as alias, to be able to refer - // if from parent operators successfully. - // Consider plan below. - // - // Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ - // --Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ - // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] - // - // First aggregate(from bottom) refers to `test.a` column. 
- // Second aggregate refers to the `group_alias_0` column, Which is a valid field in the first aggregate. - // If we were to write plan above as below without alias - // - // Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ - // --Aggregate: groupBy=[[test.a + Int32(1), test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ - // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] - // - // Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it. - let alias_str = format!("group_alias_{i}"); - let alias_expr = group_expr.clone().alias(&alias_str); - let (qualifier, field) = schema.qualified_field(i); + }) if is_single_distinct_agg(&aggr_expr)? + && !contains_grouping_set(&group_expr) => + { + let group_size = group_expr.len(); + // alias all original group_by exprs + let (mut inner_group_exprs, out_group_expr_with_alias): ( + Vec<Expr>, + Vec<(Expr, Option<String>)>, + ) = group_expr + .into_iter() + .enumerate() + .map(|(i, group_expr)| { + if let Expr::Column(_) = group_expr { + // For Column expressions we can use existing expression as is. + (group_expr.clone(), (group_expr, None)) + } else { + // For complex expression write is as alias, to be able to refer + // if from parent operators successfully. + // Consider plan below. + // + // Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ + // --Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ + // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] + // + // First aggregate(from bottom) refers to `test.a` column. + // Second aggregate refers to the `group_alias_0` column, Which is a valid field in the first aggregate. 
+ // If we were to write plan above as below without alias + // + // Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ + // --Aggregate: groupBy=[[test.a + Int32(1), test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ + // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] + // + // Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it. + let alias_str = format!("group_alias_{i}"); + let (qualifier, field) = schema.qualified_field(i); + ( + group_expr.alias(alias_str.clone()), ( - alias_expr, - ( - col(alias_str), - Some(qualified_name(qualifier, field.name())), - ), - ) - } - }) - .unzip(); - - // and they can be referenced by the alias in the outer aggr plan - let outer_group_exprs = out_group_expr_with_alias - .iter() - .map(|(out_group_expr, _)| out_group_expr.clone()) - .collect::<Vec<_>>(); - - // replace the distinct arg with alias - let mut index = 1; - let mut group_fields_set = HashSet::new(); - let mut inner_aggr_exprs = vec![]; - let outer_aggr_exprs = aggr_expr - .iter() - .map(|aggr_expr| match aggr_expr { - Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn(fun), - args, - distinct, - .. - }) => { - // is_single_distinct_agg ensure args.len=1 - if *distinct - && group_fields_set.insert(args[0].display_name()?) - { - inner_group_exprs.push( - args[0].clone().alias(SINGLE_DISTINCT_ALIAS), - ); + col(alias_str), + Some(qualified_name(qualifier, field.name())), + ), + ) + } + }) + .unzip(); + + // replace the distinct arg with alias + let mut index = 1; + let mut distinct_aggr_exprs = HashSet::new(); + let mut inner_aggr_exprs = vec![]; + let outer_aggr_exprs = aggr_expr + .into_iter() + .map(|aggr_expr| match aggr_expr { + Expr::AggregateFunction(AggregateFunction { + func_def: AggregateFunctionDefinition::BuiltIn(fun), + mut args, + distinct, + .. 
+ }) => { + if distinct { + debug_assert_eq!( + args.len(), + 1, + "DISTINCT aggregate should have exactly one argument" + ); + let arg = args.swap_remove(0); + + let expr_id = distinct_aggr_exprs.hasher().hash_one(&arg); + if distinct_aggr_exprs.insert(expr_id) { Review Comment: This code appears to deduplicate distinct aggregate expressions by comparing their hash values rather than by equality. Expressions that hash to the same value are very likely the same expression, but I don't think this is guaranteed. If two different arguments collide on the same hash, the second one would be treated as a duplicate and silently dropped from the inner aggregate, producing a wrong plan. For correctness, I think `distinct_aggr_exprs` should hold the arguments by value (e.g. `distinct_aggr_exprs.insert(arg.clone())`), or the code should use the display name as the previous version did. I realize this is another clone, but I think it is required for correctness. In general, finding some way to compute a digest / fingerprint of an expression that can be used for equality would be great, but that should probably be done as a separate project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org