Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9055#discussion_r42581800
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -270,6 +273,146 @@ class Analyzer(
       }
     
       /**
    +   * Rewrite the [[Exists]] [[In]] with left semi join or anti join.
    +   */
    +  object RewriteFilterSubQuery extends Rule[LogicalPlan] with 
PredicateHelper {
    +    def unapply(condition: Expression): Option[(Expression, 
Seq[Expression])] = {
    +      if (condition.resolved == false) {
    +        return None
    +      }
    +
    +      val conjuctions = splitConjunctivePredicates(condition).map(_ 
transformDown {
    +          // Remove the Cast expression for SubQueryExpression.
    +          case Cast(f: SubQueryExpression, BooleanType) => f
    +        }
    +      )
    +
    +      val (subqueries, others) = conjuctions.partition(c => 
c.isInstanceOf[SubQueryExpression])
    +      if (subqueries.isEmpty) {
    +        None
    +      } else if (subqueries.length > 1) {
    +        throw new AnalysisException(
    +          s"Only 1 SubQuery expression is supported, but we got 
$subqueries")
    +      } else {
    +        val subQueryExpr = subqueries(0).asInstanceOf[SubQueryExpression]
    +        // try to resolve the subquery
    +
    +        val subquery = Analyzer.this.execute(subQueryExpr.subquery) match {
    +          case Distinct(child) => child // Distinct is useless for semi 
join, ignore it.
    +          case other => other
    +        }
    +        Some((subQueryExpr.withNewSubQuery(subquery), others))
    +      }
    +    }
    +
    +    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +      case f if f.childrenResolved == false => f
    +
    +      case f @ Filter(RewriteFilterSubQuery(subquery, others), left) =>
    +        subquery match {
    +          case Exists(Project(_, Filter(condition, right)), positive) =>
    +            checkAnalysis(right)
    +            if (condition.resolved) {
    +              // Apparently, it should be not resolved here, since EXIST 
should be correlated.
    +              throw new AnalysisException(
    +                s"Exist clause should be correlated, but we got 
$condition")
    +            }
    +            Join(others.reduceOption(And).map(Filter(_, 
left)).getOrElse(left), right,
    +              if (positive) LeftSemi else LeftAnti,
    +              Some(ResolveReferences.tryResolveAttributes(condition, 
right)))
    +
    +          case Exists(right, positive) =>
    +            throw new AnalysisException(s"Exist clause should be 
correlated, but we got $right")
    +
    +          case InSubquery(key, Project(projectList, Filter(condition, 
right)), positive) =>
    +            checkAnalysis(right)
    +            if (projectList.length != 1) {
    +              throw new AnalysisException(
    +                s"Expect only 1 projection in In Subquery Expression, but 
we got $projectList")
    +            } else {
    +              val rightKey = 
ResolveReferences.tryResolveAttributes(projectList(0), right)
    +
    +              if (!rightKey.resolved) {
    +                throw new AnalysisException(
    +                  s"Outer query expression should be only presented at the 
filter clause, " +
    +                    s"but we got $rightKey")
    +              }
    +              Join(others.reduceOption(And).map(Filter(_, 
left)).getOrElse(left), right,
    +                if (positive) LeftSemi else LeftAnti,
    +                Some(
    +                  And(
    +                    ResolveReferences.tryResolveAttributes(condition, 
right),
    +                    EqualTo(rightKey, key))))
    --- End diff --
    
    It would be better to split this into multiple lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to