This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 90d31541fb0 [SPARK-40963][SQL] Set nullable correctly in project 
created by `ExtractGenerator`
90d31541fb0 is described below

commit 90d31541fb0313d762cc36067060e6445c04a9b6
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Oct 31 10:45:17 2022 +0900

    [SPARK-40963][SQL] Set nullable correctly in project created by 
`ExtractGenerator`
    
    ### What changes were proposed in this pull request?
    
    When creating the project list for the new projection in 
`ExtractGenerator`, take into account whether the generator is outer when 
setting nullable on generator-related output attributes.
    
    ### Why are the changes needed?
    
    This PR fixes an issue that can produce either incorrect results or a 
`NullPointerException`. It's a bit of an obscure issue in that I am 
hard-pressed to reproduce it without using a subquery that has an inline table.
    
    Example:
    ```
    select c1, explode(c4) as c5 from (
      select c1, array(c3) as c4 from (
        select c1, explode_outer(c2) as c3
        from values
        (1, array(1, 2)),
        (2, array(2, 3)),
        (3, null)
        as data(c1, c2)
      )
    );
    
    +---+---+
    |c1 |c5 |
    +---+---+
    |1  |1  |
    |1  |2  |
    |2  |2  |
    |2  |3  |
    |3  |0  |
    +---+---+
    ```
    In the last row, `c5` is 0, but should be `NULL`.
    
    Another example:
    ```
    select c1, exists(c4, x -> x is null) as c5 from (
      select c1, array(c3) as c4 from (
        select c1, explode_outer(c2) as c3
        from values
        (1, array(1, 2)),
        (2, array(2, 3)),
        (3, array())
        as data(c1, c2)
      )
    );
    
    +---+-----+
    |c1 |c5   |
    +---+-----+
    |1  |false|
    |1  |false|
    |2  |false|
    |2  |false|
    |3  |false|
    +---+-----+
    ```
    In the last row, `false` should be `true`.
    
    In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s 
nullability is incorrect because the new projection created by 
`ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a 
projection list. `generatorOutput` doesn't take into account that 
`explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.
    
    `UpdateAttributeNullability` will eventually fix the nullable setting for 
attributes referring to `c3`, but it doesn't fix the `containsNull` setting for 
`c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` 
(from the second example).
    
    This example fails with a `NullPointerException`:
    ```
    select c1, inline_outer(c4) from (
      select c1, array(c3) as c4 from (
        select c1, explode_outer(c2) as c3
        from values
        (1, array(named_struct('a', 1, 'b', 2))),
        (2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
        (3, array())
        as data(c1, c2)
      )
    );
    22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 
14)
    java.lang.NullPointerException
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
            at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #38440 from bersprockets/SPARK-40963.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../plans/logical/basicLogicalOperators.scala         | 17 +++++++++--------
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 19 +++++++++++++++++++
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc12b6522b4..c7b84405412 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2910,7 +2910,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
                   generatorOutput = 
ResolveGenerate.makeGeneratorOutput(generator, names),
                   child)
 
-                (Some(g), res._2 ++ g.generatorOutput)
+                (Some(g), res._2 ++ g.nullableOutput)
               case other =>
                 (res._1, res._2 :+ other)
             }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 793fecd5a5b..2b687b10312 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -273,16 +273,17 @@ case class Generate(
 
   override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
 
+  def nullableOutput: Seq[Attribute] = {
+    generatorOutput.map { a =>
+      a.withNullability(outer || a.nullable)
+    }
+  }
+
   def qualifiedGeneratorOutput: Seq[Attribute] = {
-    val qualifiedOutput = qualifier.map { q =>
+    qualifier.map { q =>
       // prepend the new qualifier to the existed one
-      generatorOutput.map(a => a.withQualifier(Seq(q)))
-    }.getOrElse(generatorOutput)
-    val nullableOutput = qualifiedOutput.map {
-      // if outer, make all attributes nullable, otherwise keep existing 
nullability
-      a => a.withNullability(outer || a.nullable)
-    }
-    nullableOutput
+      nullableOutput.map(a => a.withQualifier(Seq(q)))
+    }.getOrElse(nullableOutput)
   }
 
   def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 25231fdecba..3fb66f08cea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -425,6 +425,25 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSparkSession {
       testNullStruct
     }
   }
+
+  test("SPARK-40963: generator output has correct nullability") {
+    // This test does not check nullability directly. Before SPARK-40963,
+    // the below query got wrong results due to incorrect nullability.
+    val df = sql(
+      """select c1, explode(c4) as c5 from (
+        |  select c1, array(c3) as c4 from (
+        |    select c1, explode_outer(c2) as c3
+        |    from values
+        |    (1, array(1, 2)),
+        |    (2, array(2, 3)),
+        |    (3, null)
+        |    as data(c1, c2)
+        |  )
+        |)
+        |""".stripMargin)
+    checkAnswer(df,
+      Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
+  }
 }
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to