Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/9055#discussion_r42581800 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -270,6 +273,146 @@ class Analyzer( } /** + * Rewrite the [[Exists]] [[In]] with left semi join or anti join. + */ + object RewriteFilterSubQuery extends Rule[LogicalPlan] with PredicateHelper { + def unapply(condition: Expression): Option[(Expression, Seq[Expression])] = { + if (condition.resolved == false) { + return None + } + + val conjuctions = splitConjunctivePredicates(condition).map(_ transformDown { + // Remove the Cast expression for SubQueryExpression. + case Cast(f: SubQueryExpression, BooleanType) => f + } + ) + + val (subqueries, others) = conjuctions.partition(c => c.isInstanceOf[SubQueryExpression]) + if (subqueries.isEmpty) { + None + } else if (subqueries.length > 1) { + throw new AnalysisException( + s"Only 1 SubQuery expression is supported, but we got $subqueries") + } else { + val subQueryExpr = subqueries(0).asInstanceOf[SubQueryExpression] + // try to resolve the subquery + + val subquery = Analyzer.this.execute(subQueryExpr.subquery) match { + case Distinct(child) => child // Distinct is useless for semi join, ignore it. + case other => other + } + Some((subQueryExpr.withNewSubQuery(subquery), others)) + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case f if f.childrenResolved == false => f + + case f @ Filter(RewriteFilterSubQuery(subquery, others), left) => + subquery match { + case Exists(Project(_, Filter(condition, right)), positive) => + checkAnalysis(right) + if (condition.resolved) { + // Apparently, it should be not resolved here, since EXIST should be correlated. + throw new AnalysisException( + s"Exist clause should be correlated, but we got $condition") + } + Join(others.reduceOption(And).map(Filter(_, left)).getOrElse(left), right, + if (positive) LeftSemi else LeftAnti, + Some(ResolveReferences.tryResolveAttributes(condition, right))) + + case Exists(right, positive) => + throw new AnalysisException(s"Exist clause should be correlated, but we got $right") + + case InSubquery(key, Project(projectList, Filter(condition, right)), positive) => + checkAnalysis(right) + if (projectList.length != 1) { + throw new AnalysisException( + s"Expect only 1 projection in In Subquery Expression, but we got $projectList") + } else { + val rightKey = ResolveReferences.tryResolveAttributes(projectList(0), right) + + if (!rightKey.resolved) { + throw new AnalysisException( + s"Outer query expression should be only presented at the filter clause, " + + s"but we got $rightKey") + } + Join(others.reduceOption(And).map(Filter(_, left)).getOrElse(left), right, + if (positive) LeftSemi else LeftAnti, + Some( + And( + ResolveReferences.tryResolveAttributes(condition, right), + EqualTo(rightKey, key)))) --- End diff -- It will be better to split it to multiple lines.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org