ygf11 commented on code in PR #4826: URL: https://github.com/apache/arrow-datafusion/pull/4826#discussion_r1064001139
########## datafusion/optimizer/src/decorrelate_where_in.rs: ########## @@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn { } } +/// Optimize the where in subquery to left-anti/left-semi join. +/// If the subquery is a correlated subquery, we need extract the join predicate from the subquery. +/// +/// For example, given a query like: +/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = t2.b and t1.c > t2.c)` +/// +/// The optimized plan will be: +/// +/// Projection: t1.a, t1.b | +/// LeftSemi Join: Filter: t1.a = __correlated_sq_1.a AND t1.b = __correlated_sq_1.b AND t1.c > __correlated_sq_1.c | +/// TableScan: t1 | +/// SubqueryAlias: __correlated_sq_1 | +/// Projection: t2.a AS a, t2.b, t2.c | +/// TableScan: t2 | +/// fn optimize_where_in( query_info: &SubqueryInfo, - outer_input: &LogicalPlan, + left: &LogicalPlan, outer_other_exprs: &[Expr], alias: &AliasGenerator, ) -> Result<LogicalPlan> { let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("a projection is required", e))?; - let mut subqry_input = proj.input.clone(); - let proj = only_or_err(proj.expr.as_slice()) + let subquery_input = proj.input.clone(); + let subquery_expr = only_or_err(proj.expr.as_slice()) .map_err(|e| context!("single expression projection required", e))?; - let subquery_col = proj - .try_into_col() - .map_err(|e| context!("single column projection required", e))?; - let outer_col = query_info - .where_in_expr - .try_into_col() - .map_err(|e| context!("column comparison required", e))?; - - // If subquery is correlated, grab necessary information - let mut subqry_cols = vec![]; - let mut outer_cols = vec![]; - let mut join_filters = None; - let mut other_subqry_exprs = vec![]; - if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() { - // split into filters - let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate); - verify_not_disjunction(&subqry_filter_exprs)?; - - // Grab column 
names to join on - let (col_exprs, other_exprs) = - find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema()) - .map_err(|e| context!("column correlation not found", e))?; - if !col_exprs.is_empty() { - // it's correlated - subqry_input = subqry_filter.input.clone(); - (outer_cols, subqry_cols, join_filters) = - exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false) - .map_err(|e| context!("column correlation not found", e))?; - other_subqry_exprs = other_exprs; - } - } - let (subqry_cols, outer_cols) = - merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], &outer_cols)); - - // build subquery side of join - the thing the subquery was querying - let subqry_alias = alias.next("__correlated_sq"); - let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone()); - if let Some(expr) = conjunction(other_subqry_exprs) { - // if the subquery had additional expressions, restore them - subqry_plan = subqry_plan.filter(expr)? + // extract join filters + let (join_filters, subquery_input) = extract_join_filters(subquery_input.as_ref())?; + + // in_predicate may be also include in the join filters, remove it from the join filters. + let in_predicate = Expr::eq(query_info.where_in_expr.clone(), subquery_expr.clone()); + let join_filters = remove_duplicate_filter(join_filters, in_predicate); + + // replace qualified name with subquery alias. + let subquery_alias = alias.next("__correlated_sq"); + let input_schema = subquery_input.schema(); + let mut subquery_cols = + join_filters + .iter() + .try_fold(BTreeSet::<Column>::new(), |mut cols, expr| { + let using_cols: Vec<Column> = expr + .to_columns()? 
+ .into_iter() + .filter(|col| input_schema.field_from_column(col).is_ok()) + .collect::<_>(); + + cols.extend(using_cols); + Result::Ok(cols) + })?; + let join_filter = conjunction(join_filters).map_or(Ok(None), |filter| { + replace_qualify_name(filter, &subquery_cols, &subquery_alias).map(Option::Some) + })?; + + // add projection + if let Expr::Column(col) = subquery_expr { + subquery_cols.remove(col); } - let projection = alias_cols(&subqry_cols); - let subqry_plan = subqry_plan - .project(projection)? - .alias(&subqry_alias)? + let subquery_expr_name = format!("{:?}", unnormalize_col(subquery_expr.clone())); + let first_expr = subquery_expr.clone().alias(subquery_expr_name.clone()); Review Comment: For the right side of the IN predicate, the original qualified name looks odd, so the qualifier is stripped. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org