This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e6618de [SPARK-27430][SQL] broadcast hint should be respected for broadcast nested loop join e6618de is described below commit e6618de809458d1a3ca3b6b5eaa83c0613018590 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Wed Apr 17 19:29:28 2019 +0800 [SPARK-27430][SQL] broadcast hint should be respected for broadcast nested loop join ## What changes were proposed in this pull request? A followup of https://github.com/apache/spark/pull/24164: the broadcast hint should be respected for broadcast nested loop join. This PR also refactors the related code a little bit, to remove duplicated code. ## How was this patch tested? new tests Closes #24376 from cloud-fan/join. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/SparkStrategies.scala | 77 +++++++++++----------- .../sql/execution/joins/BroadcastJoinSuite.scala | 26 +++++++- 2 files changed, 62 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index efd05a3..c0a28fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -115,6 +115,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * 1) broadcasting the left side in a right outer join; * 2) broadcasting the right side in a left outer, left semi, left anti or existence join; * 3) broadcasting either side in an inner-like join. + * For other cases, we need to scan the data multiple times, which can be rather slow. * * - Shuffle-and-replicate nested loop join (a.k.a. cartesian product join): * Supports both equi-joins and non-equi-joins. 
@@ -306,16 +307,43 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // If it is not an equi-join, we first look at the join hints w.r.t. the following order: // 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast - // hints, choose the smaller side (based on stats) to broadcast. + // hints, choose the smaller side (based on stats) to broadcast for inner and full joins, + // choose the left side for right join, and choose right side for left join. // 2. shuffle replicate NL hint: pick cartesian product if join type is inner like. // // If there is no hint or the hints are not applicable, we follow these rules one by one: - // 1. Pick cartesian product if join type is inner like, and both sides are too big to - // to broadcast. - // 2. Pick broadcast nested loop join. Pick the smaller side (based on stats) to broadcast. + // 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left + // side is broadcast-able and it's left join, or only right side is broadcast-able and + // it's right join, we skip this rule. If both sides are small, broadcasts the smaller + // side for inner and full joins, broadcasts the left side for right join, and broadcasts + // right side for left join. + // 2. Pick cartesian product if join type is inner like. + // 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have + // other choice. It broadcasts the smaller side for inner and full joins, broadcasts the + // left side for right join, and broadcasts right side for left join. case logical.Join(left, right, joinType, condition, hint) => + val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) { + getSmallerSide(left, right) + } else { + // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if + // it's a right join, and broadcast right side if it's a left join. + // TODO: revisit it. 
If left side is much smaller than the right side, it may be better + // to broadcast the left side even if it's a left join. + if (canBuildLeft(joinType)) BuildLeft else BuildRight + } + def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = { - getBuildSide(buildLeft, buildRight, left, right).map { buildSide => + val maybeBuildSide = if (buildLeft && buildRight) { + Some(desiredBuildSide) + } else if (buildLeft) { + Some(BuildLeft) + } else if (buildRight) { + Some(BuildRight) + } else { + None + } + + maybeBuildSide.map { buildSide => Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition)) } @@ -330,45 +358,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } def createJoinWithoutHint() = { - (if (!canBroadcast(left) && !canBroadcast(right)) createCartesianProduct() else None) + createBroadcastNLJoin(canBroadcast(left), canBroadcast(right)) + .orElse(createCartesianProduct()) .getOrElse { // This join could be very slow or OOM - val buildSide = getSmallerSide(left, right) Seq(joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition)) + planLater(left), planLater(right), desiredBuildSide, joinType, condition)) } } - if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) { - createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) - .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } - .getOrElse(createJoinWithoutHint()) - } else { - val smallerSide = getSmallerSide(left, right) - val buildSide = if (canBuildLeft(joinType)) { - // For RIGHT JOIN, we may broadcast left side even if the hint asks us to broadcast - // the right side. This is for history reasons. 
- if (hintToBroadcastLeft(hint) || canBroadcast(left)) { - BuildLeft - } else if (hintToBroadcastRight(hint)) { - BuildRight - } else { - smallerSide - } - } else { - // For LEFT JOIN, we may broadcast right side even if the hint asks us to broadcast - // the left side. This is for history reasons. - if (hintToBroadcastRight(hint) || canBroadcast(right)) { - BuildRight - } else if (hintToBroadcastLeft(hint)) { - BuildLeft - } else { - smallerSide - } - } - Seq(joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition)) - } + createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) + .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } + .getOrElse(createJoinWithoutHint()) // --- Cases where this strategy does not apply --------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index 05c583c..91cb919 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -281,13 +281,16 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes assert(t1Size < t2Size) + /* ######## test cases for equal join ######### */ // INNER JOIN && t1Size < t2Size => BuildLeft assertJoinBuildSide( "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildLeft) // LEFT JOIN => BuildRight + // broadcast hash join can not build left side for left join. assertJoinBuildSide( "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2 ON t1.key = t2.key", bh, BuildRight) // RIGHT JOIN => BuildLeft + // broadcast hash join can not build right side for right join. 
assertJoinBuildSide( "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key", bh, BuildLeft) // INNER JOIN && broadcast(t1) => BuildLeft @@ -297,16 +300,20 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { assertJoinBuildSide( "SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildRight) - + /* ######## test cases for non-equal join ######### */ withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // INNER JOIN && t1Size < t2Size => BuildLeft assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", bl, BuildLeft) // FULL JOIN && t1Size < t2Size => BuildLeft assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft) + // FULL OUTER && t1Size < t2Size => BuildLeft + assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) // LEFT JOIN => BuildRight assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", bl, BuildRight) // RIGHT JOIN => BuildLeft assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2", bl, BuildLeft) + + /* #### test with broadcast hint #### */ // INNER JOIN && broadcast(t1) => BuildLeft assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2", bl, BuildLeft) // INNER JOIN && broadcast(t2) => BuildRight @@ -316,8 +323,10 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { // FULL OUTER && broadcast(t2) => BuildRight assertJoinBuildSide( "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildRight) - // FULL OUTER && t1Size < t2Size => BuildLeft - assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) + // LEFT JOIN && broadcast(t1) => BuildLeft + assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 LEFT JOIN t2", bl, BuildLeft) + // RIGHT JOIN && broadcast(t2) => BuildRight + assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 RIGHT JOIN t2", bl, BuildRight) } } } @@ -332,6 +341,7 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils { val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes assert(t1Size < t2Size) + /* ######## test cases for equal join ######### */ assertJoinBuildSide("SELECT * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildLeft) assertJoinBuildSide("SELECT * FROM t2 JOIN t1 ON t1.key = t2.key", bh, BuildRight) @@ -341,13 +351,23 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { assertJoinBuildSide("SELECT * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key", bh, BuildLeft) assertJoinBuildSide("SELECT * FROM t2 RIGHT JOIN t1 ON t1.key = t2.key", bh, BuildLeft) + /* ######## test cases for non-equal join ######### */ withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { + // For full outer join, prefer to broadcast the smaller side. assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight) + // For inner join, prefer to broadcast the smaller side, if broadcast-able. + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) { + assertJoinBuildSide("SELECT * FROM t1 JOIN t2", bl, BuildLeft) + assertJoinBuildSide("SELECT * FROM t2 JOIN t1", bl, BuildRight) + } + + // For left join, prefer to broadcast the right side. assertJoinBuildSide("SELECT * FROM t1 LEFT JOIN t2", bl, BuildRight) assertJoinBuildSide("SELECT * FROM t2 LEFT JOIN t1", bl, BuildRight) + // For right join, prefer to broadcast the left side. assertJoinBuildSide("SELECT * FROM t1 RIGHT JOIN t2", bl, BuildLeft) assertJoinBuildSide("SELECT * FROM t2 RIGHT JOIN t1", bl, BuildLeft) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org