Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12668#discussion_r61089605
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -131,76 +116,80 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
         }
     
    -    /**
    -     * Returns whether we should use shuffle hash join or not.
    -     *
    -     * We should only use shuffle hash join when:
    -     *  1) any single partition of a small table could fit in memory.
    -     *  2) the smaller table is much smaller (3X) than the other one.
    -     */
    -    private def shouldShuffleHashJoin(left: LogicalPlan, right: 
LogicalPlan): Boolean = {
    -      canBuildHashMap(left) && muchSmaller(left, right) ||
    -        canBuildHashMap(right) && muchSmaller(right, left)
    +    private def canBuildRight(joinType: JoinType): Boolean = joinType 
match {
    +      case Inner | LeftOuter | LeftSemi | LeftAnti => true
    +      case _ => false
         }
     
    -    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    +    private def canBuildLeft(joinType: JoinType): Boolean = joinType match 
{
    +      case Inner | RightOuter => true
    +      case _ => false
    +    }
     
    -      // --- Inner joins 
--------------------------------------------------------------------------
    +    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     
    -      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
left, CanBroadcast(right)) =>
    -        Seq(joins.BroadcastHashJoinExec(
    -          leftKeys, rightKeys, Inner, BuildRight, condition, 
planLater(left), planLater(right)))
    +      // --- BroadcastHashJoin 
--------------------------------------------------------------------
     
    -      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
CanBroadcast(left), right) =>
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
    +        if canBuildRight(joinType) && canBroadcast(right) =>
             Seq(joins.BroadcastHashJoinExec(
    -          leftKeys, rightKeys, Inner, BuildLeft, condition, 
planLater(left), planLater(right)))
    -
    -      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
left, right)
    -        if !conf.preferSortMergeJoin && shouldShuffleHashJoin(left, right) 
||
    -          !RowOrdering.isOrderable(leftKeys) =>
    -        val buildSide =
    -          if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) 
{
    -            BuildRight
    -          } else {
    -            BuildLeft
    -          }
    -        Seq(joins.ShuffledHashJoinExec(
    -          leftKeys, rightKeys, Inner, buildSide, condition, 
planLater(left), planLater(right)))
    -
    -      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
left, right)
    -        if RowOrdering.isOrderable(leftKeys) =>
    -        joins.SortMergeJoinExec(
    -          leftKeys, rightKeys, Inner, condition, planLater(left), 
planLater(right)) :: Nil
    -
    -      // --- Outer joins 
--------------------------------------------------------------------------
    +          leftKeys, rightKeys, joinType, BuildRight, condition, 
planLater(left), planLater(right)))
     
    -      case ExtractEquiJoinKeys(
    -          LeftOuter, leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
    +        if canBuildLeft(joinType) && canBroadcast(left) =>
             Seq(joins.BroadcastHashJoinExec(
    -          leftKeys, rightKeys, LeftOuter, BuildRight, condition, 
planLater(left), planLater(right)))
    +          leftKeys, rightKeys, joinType, BuildLeft, condition, 
planLater(left), planLater(right)))
     
    -      case ExtractEquiJoinKeys(
    -          RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), 
right) =>
    -        Seq(joins.BroadcastHashJoinExec(
    -          leftKeys, rightKeys, RightOuter, BuildLeft, condition, 
planLater(left), planLater(right)))
    +      // --- ShuffledHashJoin 
---------------------------------------------------------------------
     
    -      case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, 
left, right)
    -         if !conf.preferSortMergeJoin && canBuildHashMap(right) && 
muchSmaller(right, left) ||
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
    +         if !conf.preferSortMergeJoin && canBuildRight(joinType) && 
canBuildHashMap(right)
    +           && muchSmaller(right, left) ||
                !RowOrdering.isOrderable(leftKeys) =>
             Seq(joins.ShuffledHashJoinExec(
    -          leftKeys, rightKeys, LeftOuter, BuildRight, condition, 
planLater(left), planLater(right)))
    +          leftKeys, rightKeys, joinType, BuildRight, condition, 
planLater(left), planLater(right)))
     
    -      case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, 
left, right)
    -         if !conf.preferSortMergeJoin && canBuildHashMap(left) && 
muchSmaller(left, right) ||
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
    +         if !conf.preferSortMergeJoin && canBuildRight(joinType) && 
canBuildHashMap(left)
    --- End diff --
    
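    Should this be `canBuildLeft(joinType)`? This case checks `canBuildHashMap(left)`, so it looks like the guard should test whether the left side can be built rather than the right.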


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to