This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c7ef5600a75 [SPARK-40963][SQL] Set nullable correctly in project created by `ExtractGenerator`
c7ef5600a75 is described below

commit c7ef5600a75789152af40804f539a3d075cf2c0c
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Oct 31 10:45:17 2022 +0900

[SPARK-40963][SQL] Set nullable correctly in project created by `ExtractGenerator`

### What changes were proposed in this pull request?

When creating the project list for the new projection in `ExtractGenerator`, take into account whether the generator is outer when setting nullable on generator-related output attributes.

### Why are the changes needed?

This PR fixes an issue that can produce either incorrect results or a `NullPointerException`. It's a bit of an obscure issue in that I am hard-pressed to reproduce it without using a subquery that has an inline table.

Example:
```
select c1, explode(c4) as c5 from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(1, 2)),
    (2, array(2, 3)),
    (3, null)
    as data(c1, c2)
  )
);

+---+---+
|c1 |c5 |
+---+---+
|1  |1  |
|1  |2  |
|2  |2  |
|2  |3  |
|3  |0  |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.

Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(1, 2)),
    (2, array(2, 3)),
    (3, array())
    as data(c1, c2)
  )
);

+---+-----+
|c1 |c5   |
+---+-----+
|1  |false|
|1  |false|
|2  |false|
|2  |false|
|3  |false|
+---+-----+
```
In the last row, `false` should be `true`.

In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s nullability is incorrect because the new projection created by `ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a projection list. `generatorOutput` doesn't take into account that `explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.

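The nullability rule described above can be sketched outside Spark as follows. This is only a minimal illustration, not Catalyst code: `Attr`, `Gen`, `ArrayTypeInfo`, and `createArray` are hypothetical stand-ins, and the sketch assumes that an array's `containsNull` flag is derived from the nullability of its element expressions at the time the array expression is built.

```scala
// Hypothetical, simplified stand-ins for Catalyst classes; not Spark's real API.
final case class Attr(name: String, nullable: Boolean) {
  def withNullability(n: Boolean): Attr = copy(nullable = n)
}

final case class ArrayTypeInfo(containsNull: Boolean)

final case class Gen(outer: Boolean, generatorOutput: Seq[Attr]) {
  // Same rule as the patch: an outer generator can emit a row of nulls,
  // so every generated attribute is treated as nullable.
  def nullableOutput: Seq[Attr] =
    generatorOutput.map(a => a.withNullability(outer || a.nullable))
}

object NullabilitySketch {
  // Assumption: the array may contain nulls if any element expression is nullable.
  def createArray(children: Seq[Attr]): ArrayTypeInfo =
    ArrayTypeInfo(containsNull = children.exists(_.nullable))

  def main(args: Array[String]): Unit = {
    val g = Gen(outer = true, generatorOutput = Seq(Attr("c3", nullable = false)))
    // Built from the raw generator output: the outer flag is ignored.
    println(createArray(g.generatorOutput))
    // Built from the outer-aware output: the array is marked as possibly holding nulls.
    println(createArray(g.nullableOutput))
  }
}
```

In this sketch the first array is marked as never containing nulls even though the outer generator can produce one, which is the kind of stale flag that a later correction of `c3` alone would not repair.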
`UpdateAttributeNullability` will eventually fix the nullable setting for attributes referring to `c3`, but it doesn't fix the `containsNull` setting for `c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` (from the second example).

This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(named_struct('a', 1, 'b', 2))),
    (2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
    (3, array())
    as data(c1, c2)
  )
);

22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 14)
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes #38440 from bersprockets/SPARK-40963.

Authored-by: Bruce Robbins <bersprock...@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
(cherry picked from commit 90d31541fb0313d762cc36067060e6445c04a9b6)
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../plans/logical/basicLogicalOperators.scala         | 17 +++++++++--------
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 19 +++++++++++++++++++
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3a3997ff9c7..ad40f924ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2826,7 +2826,7 @@ class Analyzer(override val catalogManager: CatalogManager)
                 generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
                 child)
 
-              (Some(g), res._2 ++ g.generatorOutput)
+              (Some(g), res._2 ++ g.nullableOutput)
             case other =>
               (res._1, res._2 :+ other)
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b52ce468390..bdc6e48d08a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -143,16 +143,17 @@ case class Generate(
 
   override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
 
+  def nullableOutput: Seq[Attribute] = {
+    generatorOutput.map { a =>
+      a.withNullability(outer || a.nullable)
+    }
+  }
+
   def qualifiedGeneratorOutput: Seq[Attribute] = {
-    val qualifiedOutput = qualifier.map { q =>
+    qualifier.map { q =>
       // prepend the new qualifier to the existed one
-      generatorOutput.map(a => a.withQualifier(Seq(q)))
-    }.getOrElse(generatorOutput)
-    val nullableOutput = qualifiedOutput.map {
-      // if outer, make all attributes nullable, otherwise keep existing nullability
-      a => a.withNullability(outer || a.nullable)
-    }
-    nullableOutput
+      nullableOutput.map(a => a.withQualifier(Seq(q)))
+    }.getOrElse(nullableOutput)
   }
 
   def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 08280c08cd2..49cdc802410 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -425,6 +425,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
       testNullStruct
     }
   }
+
+  test("SPARK-40963: generator output has correct nullability") {
+    // This test does not check nullability directly. Before SPARK-40963,
+    // the below query got wrong results due to incorrect nullability.
+    val df = sql(
+      """select c1, explode(c4) as c5 from (
+        |  select c1, array(c3) as c4 from (
+        |    select c1, explode_outer(c2) as c3
+        |    from values
+        |    (1, array(1, 2)),
+        |    (2, array(2, 3)),
+        |    (3, null)
+        |    as data(c1, c2)
+        |  )
+        |)
+        |""".stripMargin)
+    checkAnswer(df,
+      Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
+  }
 }
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org