This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 9e61043 [SPARK-34681][SQL] Fix bug for full outer shuffled hash join when building left side with non-equal condition 9e61043 is described below commit 9e610438a6211aa8629637644c512a41332d12a5 Author: Cheng Su <chen...@fb.com> AuthorDate: Tue Mar 9 22:55:27 2021 -0800 [SPARK-34681][SQL] Fix bug for full outer shuffled hash join when building left side with non-equal condition ### What changes were proposed in this pull request? For a full outer shuffled hash join that builds the hash map on the left side and has a non-equal condition, the join can produce wrong results. The root cause is that `boundCondition` in `HashJoin.scala` always assumes the left-side row comes from `streamedPlan` and the right-side row comes from `buildPlan` ([streamedPlan.output ++ buildPlan.output](https://github.com/apache/spark/blob/branch-3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L141)). This is a valid assumption, except for the full outer + build left case. The fix is to correct `boundCondition` in `HashJoin.scala` to handle the full outer + build left case properly. See the reproduction in https://issues.apache.org/jira/browse/SPARK-32399?focusedCommentId=17298414&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17298414 . ### Why are the changes needed? Fix a data correctness bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Changed the test in `OuterJoinSuite.scala` to cover full outer shuffled hash join. Before this change, the unit test `basic full outer join using ShuffledHashJoin` in `OuterJoinSuite.scala` failed. Closes #31792 from c21/join-bugfix. 
Authored-by: Cheng Su <chen...@fb.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit a916690dd9aac40df38922dbea233785354a2f2a) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/execution/joins/HashJoin.scala | 8 +++++++- .../spark/sql/execution/joins/OuterJoinSuite.scala | 22 ++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 53bd591..42219ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -138,7 +138,13 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { UnsafeProjection.create(streamedBoundKeys) @transient protected[this] lazy val boundCondition = if (condition.isDefined) { - Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _ + if (joinType == FullOuter && buildSide == BuildLeft) { + // Put join left side before right side. This is to be consistent with + // `ShuffledHashJoinExec.fullOuterJoin`. 
+ Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _ + } else { + Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _ + } } else { (r: InternalRow) => true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 9f7e0a1..238d37a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -104,18 +104,16 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { ExtractEquiJoinKeys.unapply(join) } - if (joinType != FullOuter) { - test(s"$testName using ShuffledHashJoin") { - extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => - withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { - val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - EnsureRequirements.apply( - ShuffledHashJoinExec( - leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } + test(s"$testName using ShuffledHashJoin") { + extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + EnsureRequirements.apply( + ShuffledHashJoinExec( + leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org