This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6618de  [SPARK-27430][SQL] broadcast hint should be respected for broadcast nested loop join
e6618de is described below

commit e6618de809458d1a3ca3b6b5eaa83c0613018590
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Apr 17 19:29:28 2019 +0800

    [SPARK-27430][SQL] broadcast hint should be respected for broadcast nested loop join
    
    ## What changes were proposed in this pull request?
    
    A follow-up to https://github.com/apache/spark/pull/24164
    
    The broadcast hint should be respected for broadcast nested loop join.
    This PR also refactors the related code slightly to remove duplication.
    
    ## How was this patch tested?
    
    New test cases were added to BroadcastJoinSuite.
    
    Closes #24376 from cloud-fan/join.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/SparkStrategies.scala      | 77 +++++++++++-----------
 .../sql/execution/joins/BroadcastJoinSuite.scala   | 26 +++++++-
 2 files changed, 62 insertions(+), 41 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index efd05a3..c0a28fa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -115,6 +115,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
    *       1) broadcasting the left side in a right outer join;
    *       2) broadcasting the right side in a left outer, left semi, left 
anti or existence join;
    *       3) broadcasting either side in an inner-like join.
+   *     For other cases, we need to scan the data multiple times, which can 
be rather slow.
    *
    * - Shuffle-and-replicate nested loop join (a.k.a. cartesian product join):
    *     Supports both equi-joins and non-equi-joins.
@@ -306,16 +307,43 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
       // If it is not an equi-join, we first look at the join hints w.r.t. the 
following order:
       //   1. broadcast hint: pick broadcast nested loop join. If both sides 
have the broadcast
-      //      hints, choose the smaller side (based on stats) to broadcast.
+      //      hints, choose the smaller side (based on stats) to broadcast for 
inner and full joins,
+      //      choose the left side for right join, and choose right side for 
left join.
       //   2. shuffle replicate NL hint: pick cartesian product if join type 
is inner like.
       //
       // If there is no hint or the hints are not applicable, we follow these 
rules one by one:
-      //   1. Pick cartesian product if join type is inner like, and both 
sides are too big to
-      //      to broadcast.
-      //   2. Pick broadcast nested loop join. Pick the smaller side (based on 
stats) to broadcast.
+      //   1. Pick broadcast nested loop join if one side is small enough to 
broadcast. If only left
+      //      side is broadcast-able and it's left join, or only right side is 
broadcast-able and
+      //      it's right join, we skip this rule. If both sides are small, 
broadcasts the smaller
+      //      side for inner and full joins, broadcasts the left side for 
right join, and broadcasts
+      //      right side for left join.
+      //   2. Pick cartesian product if join type is inner like.
+      //   3. Pick broadcast nested loop join as the final solution. It may 
OOM but we don't have
+      //      other choice. It broadcasts the smaller side for inner and full 
joins, broadcasts the
+      //      left side for right join, and broadcasts right side for left 
join.
       case logical.Join(left, right, joinType, condition, hint) =>
+        val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || 
joinType == FullOuter) {
+          getSmallerSide(left, right)
+        } else {
+          // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to 
broadcast left side if
+          // it's a right join, and broadcast right side if it's a left join.
+          // TODO: revisit it. If left side is much smaller than the right 
side, it may be better
+          // to broadcast the left side even if it's a left join.
+          if (canBuildLeft(joinType)) BuildLeft else BuildRight
+        }
+
         def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
-          getBuildSide(buildLeft, buildRight, left, right).map { buildSide =>
+          val maybeBuildSide = if (buildLeft && buildRight) {
+            Some(desiredBuildSide)
+          } else if (buildLeft) {
+            Some(BuildLeft)
+          } else if (buildRight) {
+            Some(BuildRight)
+          } else {
+            None
+          }
+
+          maybeBuildSide.map { buildSide =>
             Seq(joins.BroadcastNestedLoopJoinExec(
               planLater(left), planLater(right), buildSide, joinType, 
condition))
           }
@@ -330,45 +358,18 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         }
 
         def createJoinWithoutHint() = {
-          (if (!canBroadcast(left) && !canBroadcast(right)) 
createCartesianProduct() else None)
+          createBroadcastNLJoin(canBroadcast(left), canBroadcast(right))
+            .orElse(createCartesianProduct())
             .getOrElse {
               // This join could be very slow or OOM
-              val buildSide = getSmallerSide(left, right)
               Seq(joins.BroadcastNestedLoopJoinExec(
-                planLater(left), planLater(right), buildSide, joinType, 
condition))
+                planLater(left), planLater(right), desiredBuildSide, joinType, 
condition))
             }
         }
 
-        if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
-          createBroadcastNLJoin(hintToBroadcastLeft(hint), 
hintToBroadcastRight(hint))
-            .orElse { if (hintToShuffleReplicateNL(hint)) 
createCartesianProduct() else None }
-            .getOrElse(createJoinWithoutHint())
-        } else {
-          val smallerSide = getSmallerSide(left, right)
-          val buildSide = if (canBuildLeft(joinType)) {
-            // For RIGHT JOIN, we may broadcast left side even if the hint 
asks us to broadcast
-            // the right side. This is for history reasons.
-            if (hintToBroadcastLeft(hint) || canBroadcast(left)) {
-              BuildLeft
-            } else if (hintToBroadcastRight(hint)) {
-              BuildRight
-            } else {
-              smallerSide
-            }
-          } else {
-            // For LEFT JOIN, we may broadcast right side even if the hint 
asks us to broadcast
-            // the left side. This is for history reasons.
-            if (hintToBroadcastRight(hint) || canBroadcast(right)) {
-              BuildRight
-            } else if (hintToBroadcastLeft(hint)) {
-              BuildLeft
-            } else {
-              smallerSide
-            }
-          }
-          Seq(joins.BroadcastNestedLoopJoinExec(
-            planLater(left), planLater(right), buildSide, joinType, condition))
-        }
+        createBroadcastNLJoin(hintToBroadcastLeft(hint), 
hintToBroadcastRight(hint))
+          .orElse { if (hintToShuffleReplicateNL(hint)) 
createCartesianProduct() else None }
+          .getOrElse(createJoinWithoutHint())
 
 
       // --- Cases where this strategy does not apply 
---------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 05c583c..91cb919 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -281,13 +281,16 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
       val t2Size = 
spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes
       assert(t1Size < t2Size)
 
+      /* ######## test cases for equal join ######### */
       // INNER JOIN && t1Size < t2Size => BuildLeft
       assertJoinBuildSide(
         "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", 
bh, BuildLeft)
       // LEFT JOIN => BuildRight
+      // broadcast hash join can not build left side for left join.
       assertJoinBuildSide(
         "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2 ON t1.key = 
t2.key", bh, BuildRight)
       // RIGHT JOIN => BuildLeft
+      // broadcast hash join can not build right side for right join.
       assertJoinBuildSide(
         "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2 ON t1.key = 
t2.key", bh, BuildLeft)
       // INNER JOIN && broadcast(t1) => BuildLeft
@@ -297,16 +300,20 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
       assertJoinBuildSide(
         "SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, 
BuildRight)
 
-
+      /* ######## test cases for non-equal join ######### */
       withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
         // INNER JOIN && t1Size < t2Size => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", 
bl, BuildLeft)
         // FULL JOIN && t1Size < t2Size => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN 
t2", bl, BuildLeft)
+        // FULL OUTER && t1Size < t2Size => BuildLeft
+        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
         // LEFT JOIN => BuildRight
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN 
t2", bl, BuildRight)
         // RIGHT JOIN => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT 
JOIN t2", bl, BuildLeft)
+
+        /* #### test with broadcast hint #### */
         // INNER JOIN && broadcast(t1) => BuildLeft
         assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2", bl, 
BuildLeft)
         // INNER JOIN && broadcast(t2) => BuildRight
@@ -316,8 +323,10 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
         // FULL OUTER && broadcast(t2) => BuildRight
         assertJoinBuildSide(
           "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2", bl, 
BuildRight)
-        // FULL OUTER && t1Size < t2Size => BuildLeft
-        assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
+        // LEFT JOIN && broadcast(t1) => BuildLeft
+        assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 LEFT JOIN 
t2", bl, BuildLeft)
+        // RIGHT JOIN && broadcast(t2) => BuildRight
+        assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 RIGHT JOIN 
t2", bl, BuildRight)
       }
     }
   }
@@ -332,6 +341,7 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
       val t2Size = 
spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes
       assert(t1Size < t2Size)
 
+      /* ######## test cases for equal join ######### */
       assertJoinBuildSide("SELECT * FROM t1 JOIN t2 ON t1.key = t2.key", bh, 
BuildLeft)
       assertJoinBuildSide("SELECT * FROM t2 JOIN t1 ON t1.key = t2.key", bh, 
BuildRight)
 
@@ -341,13 +351,23 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
       assertJoinBuildSide("SELECT * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key", 
bh, BuildLeft)
       assertJoinBuildSide("SELECT * FROM t2 RIGHT JOIN t1 ON t1.key = t2.key", 
bh, BuildLeft)
 
+      /* ######## test cases for non-equal join ######### */
       withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+        // For full outer join, prefer to broadcast the smaller side.
         assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, 
BuildLeft)
         assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, 
BuildRight)
 
+        // For inner join, prefer to broadcast the smaller side, if 
broadcast-able.
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 
1).toString()) {
+          assertJoinBuildSide("SELECT * FROM t1 JOIN t2", bl, BuildLeft)
+          assertJoinBuildSide("SELECT * FROM t2 JOIN t1", bl, BuildRight)
+        }
+
+        // For left join, prefer to broadcast the right side.
         assertJoinBuildSide("SELECT * FROM t1 LEFT JOIN t2", bl, BuildRight)
         assertJoinBuildSide("SELECT * FROM t2 LEFT JOIN t1", bl, BuildRight)
 
+        // For right join, prefer to broadcast the left side.
         assertJoinBuildSide("SELECT * FROM t1 RIGHT JOIN t2", bl, BuildLeft)
         assertJoinBuildSide("SELECT * FROM t2 RIGHT JOIN t1", bl, BuildLeft)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to