jayzhan211 commented on code in PR #10066:
URL:
https://github.com/apache/arrow-datafusion/pull/10066#discussion_r1564359740
##########
datafusion/optimizer/src/analyzer/count_wildcard_rule.rs:
##########
@@ -47,181 +38,43 @@ impl CountWildcardRule {
impl AnalyzerRule for CountWildcardRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
- plan.transform_down(&analyze_internal).data()
+ plan.transform_down_with_subqueries(&analyze_internal)
+ .data()
}
fn name(&self) -> &str {
"count_wildcard_rule"
}
}
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
- let mut rewriter = CountWildcardRewriter {};
- match plan {
- LogicalPlan::Window(window) => {
- let window_expr = window
- .window_expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Transformed::yes(
- LogicalPlanBuilder::from((*window.input).clone())
- .window(window_expr)?
- .build()?,
- ))
- }
- LogicalPlan::Aggregate(agg) => {
- let aggr_expr = agg
- .aggr_expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Transformed::yes(LogicalPlan::Aggregate(
- Aggregate::try_new(agg.input.clone(), agg.group_expr,
aggr_expr)?,
- )))
- }
- LogicalPlan::Sort(Sort { expr, input, fetch }) => {
- let sort_expr = expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
- Ok(Transformed::yes(LogicalPlan::Sort(Sort {
- expr: sort_expr,
- input,
- fetch,
- })))
- }
- LogicalPlan::Projection(projection) => {
- let projection_expr = projection
- .expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
- Ok(Transformed::yes(LogicalPlan::Projection(
- Projection::try_new(projection_expr, projection.input)?,
- )))
- }
- LogicalPlan::Filter(Filter {
- predicate, input, ..
- }) => {
- let predicate = rewrite_preserving_name(predicate, &mut rewriter)?;
- Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
- predicate, input,
- )?)))
- }
-
- _ => Ok(Transformed::no(plan)),
- }
+fn is_wildcard(expr: &Expr) -> bool {
+ matches!(expr, Expr::Wildcard { qualifier: None })
}
-
-struct CountWildcardRewriter {}
-
-impl TreeNodeRewriter for CountWildcardRewriter {
- type Node = Expr;
-
- fn f_up(&mut self, old_expr: Expr) -> Result<Transformed<Expr>> {
- Ok(match old_expr.clone() {
- Expr::WindowFunction(expr::WindowFunction {
- fun:
- expr::WindowFunctionDefinition::AggregateFunction(
- aggregate_function::AggregateFunction::Count,
- ),
- args,
- partition_by,
- order_by,
- window_frame,
- null_treatment,
- }) if args.len() == 1 => match args[0] {
- Expr::Wildcard { qualifier: None } => {
- Transformed::yes(Expr::WindowFunction(expr::WindowFunction
{
- fun: expr::WindowFunctionDefinition::AggregateFunction(
- aggregate_function::AggregateFunction::Count,
- ),
- args: vec![lit(COUNT_STAR_EXPANSION)],
- partition_by,
- order_by,
- window_frame,
- null_treatment,
- }))
- }
-
- _ => Transformed::no(old_expr),
- },
- Expr::AggregateFunction(AggregateFunction {
- func_def:
- AggregateFunctionDefinition::BuiltIn(
- aggregate_function::AggregateFunction::Count,
- ),
- args,
- distinct,
- filter,
- order_by,
- null_treatment,
- }) if args.len() == 1 => match args[0] {
- Expr::Wildcard { qualifier: None } => {
-
Transformed::yes(Expr::AggregateFunction(AggregateFunction::new(
- aggregate_function::AggregateFunction::Count,
- vec![lit(COUNT_STAR_EXPANSION)],
- distinct,
- filter,
- order_by,
- null_treatment,
- )))
- }
- _ => Transformed::no(old_expr),
- },
-
- ScalarSubquery(Subquery {
- subquery,
- outer_ref_columns,
- }) => subquery
- .as_ref()
- .clone()
- .transform_down(&analyze_internal)?
- .update_data(|new_plan| {
- ScalarSubquery(Subquery {
- subquery: Arc::new(new_plan),
- outer_ref_columns,
- })
- }),
- Expr::InSubquery(InSubquery {
- expr,
- subquery,
- negated,
- }) => subquery
- .subquery
- .as_ref()
- .clone()
- .transform_down(&analyze_internal)?
- .update_data(|new_plan| {
- Expr::InSubquery(InSubquery::new(
- expr,
- Subquery {
- subquery: Arc::new(new_plan),
- outer_ref_columns: subquery.outer_ref_columns,
- },
- negated,
- ))
- }),
- Expr::Exists(expr::Exists { subquery, negated }) => subquery
- .subquery
- .as_ref()
- .clone()
- .transform_down(&analyze_internal)?
- .update_data(|new_plan| {
- Expr::Exists(expr::Exists {
- subquery: Subquery {
- subquery: Arc::new(new_plan),
- outer_ref_columns: subquery.outer_ref_columns,
- },
- negated,
- })
- }),
- _ => Transformed::no(old_expr),
- })
- }
+fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+ let name_preserver = NamePreserver::new(&plan);
+ plan.map_expressions(|expr| {
+ let original_name = name_preserver.save(&expr)?;
+ let transformed_expr = expr.transform_up(&|expr| match expr {
+ Expr::WindowFunction(mut window_function)
+ if window_function.args.len() == 1
+ && is_wildcard(&window_function.args[0]) =>
Review Comment:
Do we need to match on the `Count` function here as well?
##########
datafusion/optimizer/src/analyzer/count_wildcard_rule.rs:
##########
@@ -47,181 +38,43 @@ impl CountWildcardRule {
impl AnalyzerRule for CountWildcardRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
- plan.transform_down(&analyze_internal).data()
+ plan.transform_down_with_subqueries(&analyze_internal)
+ .data()
}
fn name(&self) -> &str {
"count_wildcard_rule"
}
}
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
- let mut rewriter = CountWildcardRewriter {};
- match plan {
- LogicalPlan::Window(window) => {
- let window_expr = window
- .window_expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Transformed::yes(
- LogicalPlanBuilder::from((*window.input).clone())
- .window(window_expr)?
- .build()?,
- ))
- }
- LogicalPlan::Aggregate(agg) => {
- let aggr_expr = agg
- .aggr_expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Transformed::yes(LogicalPlan::Aggregate(
- Aggregate::try_new(agg.input.clone(), agg.group_expr,
aggr_expr)?,
- )))
- }
- LogicalPlan::Sort(Sort { expr, input, fetch }) => {
- let sort_expr = expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
- Ok(Transformed::yes(LogicalPlan::Sort(Sort {
- expr: sort_expr,
- input,
- fetch,
- })))
- }
- LogicalPlan::Projection(projection) => {
- let projection_expr = projection
- .expr
- .iter()
- .map(|expr| rewrite_preserving_name(expr.clone(), &mut
rewriter))
- .collect::<Result<Vec<_>>>()?;
- Ok(Transformed::yes(LogicalPlan::Projection(
- Projection::try_new(projection_expr, projection.input)?,
- )))
- }
- LogicalPlan::Filter(Filter {
- predicate, input, ..
- }) => {
- let predicate = rewrite_preserving_name(predicate, &mut rewriter)?;
- Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
- predicate, input,
- )?)))
- }
-
- _ => Ok(Transformed::no(plan)),
- }
+fn is_wildcard(expr: &Expr) -> bool {
+ matches!(expr, Expr::Wildcard { qualifier: None })
}
-
-struct CountWildcardRewriter {}
-
-impl TreeNodeRewriter for CountWildcardRewriter {
- type Node = Expr;
-
- fn f_up(&mut self, old_expr: Expr) -> Result<Transformed<Expr>> {
- Ok(match old_expr.clone() {
- Expr::WindowFunction(expr::WindowFunction {
- fun:
- expr::WindowFunctionDefinition::AggregateFunction(
- aggregate_function::AggregateFunction::Count,
- ),
- args,
- partition_by,
- order_by,
- window_frame,
- null_treatment,
- }) if args.len() == 1 => match args[0] {
- Expr::Wildcard { qualifier: None } => {
- Transformed::yes(Expr::WindowFunction(expr::WindowFunction
{
- fun: expr::WindowFunctionDefinition::AggregateFunction(
- aggregate_function::AggregateFunction::Count,
- ),
- args: vec![lit(COUNT_STAR_EXPANSION)],
- partition_by,
- order_by,
- window_frame,
- null_treatment,
- }))
- }
-
- _ => Transformed::no(old_expr),
- },
- Expr::AggregateFunction(AggregateFunction {
- func_def:
- AggregateFunctionDefinition::BuiltIn(
- aggregate_function::AggregateFunction::Count,
- ),
- args,
- distinct,
- filter,
- order_by,
- null_treatment,
- }) if args.len() == 1 => match args[0] {
- Expr::Wildcard { qualifier: None } => {
-
Transformed::yes(Expr::AggregateFunction(AggregateFunction::new(
- aggregate_function::AggregateFunction::Count,
- vec![lit(COUNT_STAR_EXPANSION)],
- distinct,
- filter,
- order_by,
- null_treatment,
- )))
- }
- _ => Transformed::no(old_expr),
- },
-
- ScalarSubquery(Subquery {
- subquery,
- outer_ref_columns,
- }) => subquery
- .as_ref()
- .clone()
- .transform_down(&analyze_internal)?
- .update_data(|new_plan| {
- ScalarSubquery(Subquery {
- subquery: Arc::new(new_plan),
- outer_ref_columns,
- })
- }),
- Expr::InSubquery(InSubquery {
- expr,
- subquery,
- negated,
- }) => subquery
- .subquery
- .as_ref()
- .clone()
- .transform_down(&analyze_internal)?
- .update_data(|new_plan| {
- Expr::InSubquery(InSubquery::new(
- expr,
- Subquery {
- subquery: Arc::new(new_plan),
- outer_ref_columns: subquery.outer_ref_columns,
- },
- negated,
- ))
- }),
- Expr::Exists(expr::Exists { subquery, negated }) => subquery
- .subquery
- .as_ref()
- .clone()
- .transform_down(&analyze_internal)?
- .update_data(|new_plan| {
- Expr::Exists(expr::Exists {
- subquery: Subquery {
- subquery: Arc::new(new_plan),
- outer_ref_columns: subquery.outer_ref_columns,
- },
- negated,
- })
- }),
- _ => Transformed::no(old_expr),
- })
- }
+fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+ let name_preserver = NamePreserver::new(&plan);
+ plan.map_expressions(|expr| {
+ let original_name = name_preserver.save(&expr)?;
+ let transformed_expr = expr.transform_up(&|expr| match expr {
+ Expr::WindowFunction(mut window_function)
+ if window_function.args.len() == 1
+ && is_wildcard(&window_function.args[0]) =>
Review Comment:
Do we need to match on the `Count` function here as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]