Github user nsyca commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17520#discussion_r109522976
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -474,9 +478,42 @@ case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with
         }
       }
     
    +  private def buildNewJoinType(upperJoin: Join, lowerJoin: Join, 
otherTableOutput: AttributeSet):
    +    JoinType = { // Derives a (possibly stricter) join type for lowerJoin from upperJoin's null-filtering constraints.
    +    val conditions = upperJoin.constraints
    +    // Find the predicates that reference only the other table.
    +    val localConditions = 
conditions.filter(_.references.subsetOf(otherTableOutput))
    +    // Find the predicates that reference only the left table, or the join 
predicates
    +    // between the left table and the other table.
    +    val leftConditions = conditions.filter(_.references.
    +      subsetOf(lowerJoin.left.outputSet ++ 
otherTableOutput)).diff(localConditions)
    +    // Find the predicates that reference only the right table, or the join 
predicates
    +    // between the right table and the other table.
    +    val rightConditions = conditions.filter(_.references.
    +      subsetOf(lowerJoin.right.outputSet ++ 
otherTableOutput)).diff(localConditions)
    +
    +    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
    +    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
    +
    +    lowerJoin.joinType match { // NOTE(review): confirm upperJoin's constraints never come from a LeftAnti condition (an anti join keeps rows whose condition is NULL).
    +      case RightOuter if leftHasNonNullPredicate => Inner
    +      case LeftOuter if rightHasNonNullPredicate => Inner
    +      case FullOuter if leftHasNonNullPredicate && 
rightHasNonNullPredicate => Inner
    +      case FullOuter if leftHasNonNullPredicate => LeftOuter
    +      case FullOuter if rightHasNonNullPredicate => RightOuter
    +      case o => o
    +    }
    +  }
    +
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
         case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | 
FullOuter, _)) =>
           val newJoinType = buildNewJoinType(f, j)
           if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
    +    case j @ Join(child @ Join(_, _, RightOuter | LeftOuter | FullOuter, 
_),
    +      subquery, LeftSemiOrAnti(joinType), joinCond) =>
    +      val newJoinType = buildNewJoinType(j, child, subquery.outputSet)
    +      if (newJoinType == child.joinType) j else {
    +        Join(child.copy(joinType = newJoinType), subquery, joinType, 
joinCond)
    +      }
    --- End diff --
    
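    This is a new rewrite that converts an outer join below a LeftSemi/LeftAnti join into an inner join (or into a left or right outer join when the original is a full outer join). EXISTS/IN subqueries are null-filtering predicates if the correlated predicates in the subquery are null-filtering. NOT EXISTS is different: the anti join keeps an outer row whose correlated predicate evaluates to NULL, so it is not null-filtering in the same sense.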


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to