agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r467709017



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+        "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as 
long) as k3"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr(
+          "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as 
long) as k6"),
+        $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+    )
+    inputDFs.foreach { case (df1, df2, joinExprs) =>
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",

Review comment:
       You have used the phrase 'careful tuning' several times but I am not fully following: do you mean that the broadcast threshold should be chosen to be smaller than the build side size, so as to NOT trigger a BHJ?
   
   Perhaps add a comment above this line to explain to the reader why the configs are chosen the way they are: for example, why shuffle-partitions is 2, etc.
   
   Btw, is it worth testing with shuffle partitions = 1?
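   
   To make that suggestion concrete, here is a sketch (my wording, not the PR's; the body of the block is elided) of the kind of explanatory comment I have in mind above these configs:
   
       // AUTO_BROADCASTJOIN_THRESHOLD (80 bytes) is set below the size of
       // both sides, so the planner cannot pick a broadcast hash join and
       // falls back to a shuffled hash join for this full outer join.
       // SHUFFLE_PARTITIONS = 2 keeps the test fast while still exercising
       // more than one shuffle partition.
       withSQLConf(
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
         SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
         // assertions on the join result go here
       }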

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,42 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {

Review comment:
       Thanks for noticing the gap in the test coverage and explaining it to me! I understand this better now.
   
   But you see where I am coming from :-) I am simply concerned about data corruption (more precisely, wrong result computation).
   
   Ideally, we should first fix the test, land that fix, and then retest this PR with the tests fixed. It seems like it shouldn't be too hard, nor cause any merge conflicts, but it would delay landing this PR.
   
   As a compromise, may I suggest the following:
   - Make the test change mentioned in SPARK-32577 locally and run the SQL tests locally to verify that they pass.
   - Run JoinSuite with code coverage enabled and ensure that all of the code paths changed in this PR are covered.
   
   I recently worked on a join-related feature and was humbled by how many bugs I managed to introduce around nulls, strings, and their relationship with downstream union queries :-D
   
   I think the tests in JoinSuite are good, but I am more concerned about the unknown unknowns and any regressions, and about interactions of the join with other operators like aggregations and unions. A sketch of what I mean follows.
   
   This is not a blocker at all! Just due diligence and paranoia.
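   
   For example (placeholder DataFrames and assertions, not the PR's code), the kind of interaction test I have in mind:
   
       // Exercise the full outer shuffled hash join under downstream
       // operators, since that is where I have been bitten before.
       val joined = df1.join(df2, $"k1" === $"k2", "full_outer")
       // Join followed by an aggregation.
       val aggregated = joined.groupBy($"k1").count()
       // Join followed by a union with itself.
       val unioned = joined.union(joined)
       // Compare both against the same queries with the shuffled hash join
       // disabled (e.g. with a large broadcast threshold).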

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(

Review comment:
       How about uncorrelated nulls too: 'i % 2 == 0' on the left and, say, 'i % 3 == 0' on the right?
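   
   i.e., an extra entry along these lines (a sketch following the style of the existing cases):
   
       // Test uncorrelated NULL join keys, so nulls land on different rows per side
       (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
         spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr("value as k2"),
         $"k1" === $"k2"),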



