This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9e61043  [SPARK-34681][SQL] Fix bug for full outer shuffled hash join when building left side with non-equal condition
9e61043 is described below

commit 9e610438a6211aa8629637644c512a41332d12a5
Author: Cheng Su <chen...@fb.com>
AuthorDate: Tue Mar 9 22:55:27 2021 -0800

    [SPARK-34681][SQL] Fix bug for full outer shuffled hash join when building left side with non-equal condition
    
    ### What changes were proposed in this pull request?
    
    For a full outer shuffled hash join that builds the hash map on the left side and has a non-equal join condition, the join can produce wrong results.
    
    The root cause is that `boundCondition` in `HashJoin.scala` always assumes the left-side row comes from `streamedPlan` and the right-side row from `buildPlan` ([streamedPlan.output ++ buildPlan.output](https://github.com/apache/spark/blob/branch-3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L141)). This is a valid assumption, except for the full outer + build left case.
    
    The fix is to correct `boundCondition` in `HashJoin.scala` to handle the full outer + build left case properly. See the reproduction in https://issues.apache.org/jira/browse/SPARK-32399?focusedCommentId=17298414&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17298414 .
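    
    As a minimal sketch of the affected query shape (not the exact reproduction from the linked JIRA comment; the table names, column names, and data below are illustrative assumptions), a full outer join that builds the hash map on the left side via the `SHUFFLE_HASH` hint and carries a non-equal predicate looks like this:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Illustrative local session; in spark-shell a `spark` session already exists.
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-34681-sketch").getOrCreate()
    import spark.implicits._
    
    val left = Seq((1, 10), (2, 20)).toDF("k", "v1")
    val right = Seq((1, 100), (3, 300)).toDF("k", "v2")
    
    // The SHUFFLE_HASH hint on the left relation asks the planner to build the
    // hash map on the left side; `v1 < v2` is the non-equal condition that
    // `boundCondition` used to evaluate against the wrong attribute order.
    val joined = left.hint("SHUFFLE_HASH")
      .join(right, left("k") === right("k") && left("v1") < right("v2"), "full_outer")
    
    joined.show()
    ```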
    
    ### Why are the changes needed?
    
    Fix data correctness bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Changed the test in `OuterJoinSuite.scala` to cover full outer shuffled hash join.
    Before this change, the unit test `basic full outer join using ShuffledHashJoin` in `OuterJoinSuite.scala` failed.
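    The modified suite can be re-run locally with the usual Spark sbt invocation (assuming a standard development checkout), e.g. `build/sbt "sql/testOnly *OuterJoinSuite"`.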
    
    Closes #31792 from c21/join-bugfix.
    
    Authored-by: Cheng Su <chen...@fb.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit a916690dd9aac40df38922dbea233785354a2f2a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/execution/joins/HashJoin.scala       |  8 +++++++-
 .../spark/sql/execution/joins/OuterJoinSuite.scala | 22 ++++++++++------------
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 53bd591..42219ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -138,7 +138,13 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     UnsafeProjection.create(streamedBoundKeys)
 
   @transient protected[this] lazy val boundCondition = if (condition.isDefined) {
-    Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _
+    if (joinType == FullOuter && buildSide == BuildLeft) {
+      // Put join left side before right side. This is to be consistent with
+      // `ShuffledHashJoinExec.fullOuterJoin`.
+      Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _
+    } else {
+      Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _
+    }
   } else {
     (r: InternalRow) => true
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 9f7e0a1..238d37a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -104,18 +104,16 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
       ExtractEquiJoinKeys.unapply(join)
     }
 
-    if (joinType != FullOuter) {
-      test(s"$testName using ShuffledHashJoin") {
-        extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) =>
-          withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-            val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft
-            checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
-              EnsureRequirements.apply(
-                ShuffledHashJoinExec(
-                  leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right)),
-              expectedAnswer.map(Row.fromTuple),
-              sortAnswers = true)
-          }
+    test(s"$testName using ShuffledHashJoin") {
+      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) =>
+        withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+          val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft
+          checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
+            EnsureRequirements.apply(
+              ShuffledHashJoinExec(
+                leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right)),
+            expectedAnswer.map(Row.fromTuple),
+            sortAnswers = true)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
