This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 8c690c412dd [SPARK-44251][SQL] Set nullable correctly on coalesced join key in full outer USING join
8c690c412dd is described below

commit 8c690c412dd30a03ca932547ded01104f37f9965
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Tue Jul 11 11:18:44 2023 +0800

    [SPARK-44251][SQL] Set nullable correctly on coalesced join key in full outer USING join
    
    ### What changes were proposed in this pull request?
    
    For full outer joins employing USING, set the nullability of the coalesced join columns to true.
    
    ### Why are the changes needed?
    
    The following query produces incorrect results:
    ```
    create or replace temp view v1 as values (1, 2), (null, 7) as (c1, c2);
    create or replace temp view v2 as values (2, 3) as (c1, c2);
    
    select explode(array(c1)) as x
    from v1
    full outer join v2
    using (c1);
    
    -1   <== should be null
    1
    2
    ```
    The following query fails with a `NullPointerException`:
    ```
    create or replace temp view v1 as values ('1', 2), (null, 7) as (c1, c2);
    create or replace temp view v2 as values ('2', 3) as (c1, c2);
    
    select explode(array(c1)) as x
    from v1
    full outer join v2
    using (c1);
    
    23/06/25 17:06:39 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 11)
    java.lang.NullPointerException
            at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.generate_doConsume_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.smj_consumeFullOuterJoinRow_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.wholestagecodegen_findNextJoinRows_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    ...
    ```
    The above full outer joins implicitly add an aliased coalesce to the parent projection of the join: `coalesce(v1.c1, v2.c1) as c1`. In the case where only one side's key is nullable, the coalesce's nullability is false. As a result, the generator's output has nullable set as false. But this is incorrect: If one side has a row with explicit null key values, the other side's row will also have null key values (because the other side's row will be "made up"), and both the `coalesce` and the `explode` will return a null value.
    
    While `UpdateNullability` actually repairs the nullability of the `coalesce` before execution, it doesn't recreate the generator output, so the nullability remains incorrect in `Generate#output`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #41809 from bersprockets/using_oddity2.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
    (cherry picked from commit 7a27bc68c849041837e521285e33227c3d1f9853)
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  | 10 ++++++++--
 .../test/scala/org/apache/spark/sql/JoinSuite.scala    | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c7455a4a8f2..b7d174089bc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3469,8 +3469,14 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         (rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ 
rUniqueOutput,
           leftKeys.map(_.withNullability(true)))
       case FullOuter =>
-        // in full outer join, joinCols should be non-null if there is.
-        val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, 
r)), l.name)() }
+        // In full outer join, we should return non-null values for the join 
columns
+        // if either side has non-null values for those columns. Therefore, 
for each
+        // join column pair, add a coalesce to return the non-null value, if 
it exists.
+        val joinedCols = joinPairs.map { case (l, r) =>
+          // Since this is a full outer join, either side could be null, so we 
explicitly
+          // set the nullability to true for both sides.
+          Alias(Coalesce(Seq(l.withNullability(true), 
r.withNullability(true))), l.name)()
+        }
         (joinedCols ++
           lUniqueOutput.map(_.withNullability(true)) ++
           rUniqueOutput.map(_.withNullability(true)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 062814e58b9..c5ecc1bc841 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1544,4 +1544,22 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
       checkAnswer(sql(query), expected)
     }
   }
+
+  test("SPARK-44251: Full outer USING join with null key value") {
+    withTempView("v1", "v2") {
+      sql("create or replace temp view v1 as values (1, 2), (null, 7) as (c1, 
c2)")
+      sql("create or replace temp view v2 as values (2, 3) as (c1, c2)")
+
+      val query =
+        """select explode(array(c1)) as x
+          |from v1
+          |full outer join v2
+          |using (c1)
+          |""".stripMargin
+
+      val expected = Seq(Row(null), Row(1), Row(2))
+
+      checkAnswer(sql(query), expected)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to