Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/13570#discussion_r66697830 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1715,31 +1715,52 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Filter the plan by applying left semi and left anti joins. withSubquery.foldLeft(newFilter) { case (p, PredicateSubquery(sub, conditions, _, _)) => - Join(p, sub, LeftSemi, conditions.reduceOption(And)) + val (joinCond, outerPlan) = rewriteExistentialExpr(conditions.reduceOption(And), p) + Join(outerPlan, sub, LeftSemi, joinCond) case (p, Not(PredicateSubquery(sub, conditions, false, _))) => - Join(p, sub, LeftAnti, conditions.reduceOption(And)) + val (joinCond, outerPlan) = rewriteExistentialExpr(conditions.reduceOption(And), p) + Join(outerPlan, sub, LeftAnti, joinCond) case (p, Not(PredicateSubquery(sub, conditions, true, _))) => - // This is a NULL-aware (left) anti join (NAAJ). + // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr // Construct the condition. A NULL in one of the conditions is regarded as a positive // result; such a row will be filtered out by the Anti-Join operator. - val anyNull = conditions.map(IsNull).reduceLeft(Or) - val condition = conditions.reduceLeft(And) - // Note that will almost certainly be planned as a Broadcast Nested Loop join. Use EXISTS - // if performance matters to you. - Join(p, sub, LeftAnti, Option(Or(anyNull, condition))) + // Note that will almost certainly be planned as a Broadcast Nested Loop join. + // Use EXISTS if performance matters to you. 
+ val (joinCond, outerPlan) = rewriteExistentialExpr(conditions.reduceLeftOption(And), p) + val anyNull = splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or) + Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get))) case (p, predicate) => - var joined = p - val replaced = predicate transformUp { - case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join - val exists = AttributeReference("exists", BooleanType, nullable = false)() - joined = Join(joined, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) - exists - } - Project(p.output, Filter(replaced, joined)) + val (newCond, inputPlan) = rewriteExistentialExpr(Option(predicate), p) + Project(p.output, Filter(newCond.get, inputPlan)) } } + + /** + * Given a predicate expression and an input plan, it rewrites + * any embedded existential sub-query into an existential join. + * It returns the rewritten expression together with the updated plan. + * Currently, it does not support null-aware joins. Embedded NOT IN predicates + * are blocked in the Analyzer. + */ + private def rewriteExistentialExpr( + expr: Option[Expression], + plan: LogicalPlan): (Option[Expression], LogicalPlan) = { + var newPlan = plan --- End diff -- Move this `var newPlan` declaration down into the `Some` case. A bit of mutability is not a problem, though.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org