This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 815f32fdf4f [SPARK-40963][SQL] Set nullable correctly in project
created by `ExtractGenerator`
815f32fdf4f is described below
commit 815f32fdf4f6ea61d1eddb5f2d4d05b26a30f671
Author: Bruce Robbins <[email protected]>
AuthorDate: Mon Oct 31 10:45:17 2022 +0900
[SPARK-40963][SQL] Set nullable correctly in project created by
`ExtractGenerator`
### What changes were proposed in this pull request?
When creating the project list for the new projection in
`ExtractGenerator`, take into account whether the generator is outer when
setting nullable on generator-related output attributes.
### Why are the changes needed?
This PR fixes an issue that can produce either incorrect results or a
`NullPointerException`. It's a somewhat obscure issue: I am hard-pressed to
reproduce it without using a subquery that contains an inline table.
Example:
```
select c1, explode(c4) as c5 from (
select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, null)
as data(c1, c2)
)
);
+---+---+
|c1 |c5 |
+---+---+
|1 |1 |
|1 |2 |
|2 |2 |
|2 |3 |
|3 |0 |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.
Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, array())
as data(c1, c2)
)
);
+---+-----+
|c1 |c5 |
+---+-----+
|1 |false|
|1 |false|
|2 |false|
|2 |false|
|3 |false|
+---+-----+
```
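In the last row, `c5` is `false` but should be `true`: `explode_outer` on the
empty array produces a `NULL` `c3`, so `c4` is an array containing a null element.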
In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s
nullability is incorrect because the new projection created by
`ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a
projection list. `generatorOutput` doesn't take into account that
`explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.
`UpdateAttributeNullability` will eventually fix the nullable setting for
attributes referring to `c3`, but it doesn't fix the `containsNull` setting for
`c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)`
(from the second example).
This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(named_struct('a', 1, 'b', 2))),
(2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
(3, array())
as data(c1, c2)
)
);
22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 14)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test.
Closes #38440 from bersprockets/SPARK-40963.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 90d31541fb0313d762cc36067060e6445c04a9b6)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../plans/logical/basicLogicalOperators.scala | 17 +++++++++--------
.../org/apache/spark/sql/GeneratorFunctionSuite.scala | 19 +++++++++++++++++++
3 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8d6261a7847..c5b2229db31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2834,7 +2834,7 @@ class Analyzer(override val catalogManager: CatalogManager)
generatorOutput =
ResolveGenerate.makeGeneratorOutput(generator, names),
child)
- (Some(g), res._2 ++ g.generatorOutput)
+ (Some(g), res._2 ++ g.nullableOutput)
case other =>
(res._1, res._2 :+ other)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 53c95a4ffd3..f6d337aef26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -143,16 +143,17 @@ case class Generate(
override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
+ def nullableOutput: Seq[Attribute] = {
+ generatorOutput.map { a =>
+ a.withNullability(outer || a.nullable)
+ }
+ }
+
def qualifiedGeneratorOutput: Seq[Attribute] = {
- val qualifiedOutput = qualifier.map { q =>
+ qualifier.map { q =>
// prepend the new qualifier to the existed one
- generatorOutput.map(a => a.withQualifier(Seq(q)))
- }.getOrElse(generatorOutput)
- val nullableOutput = qualifiedOutput.map {
- // if outer, make all attributes nullable, otherwise keep existing
nullability
- a => a.withNullability(outer || a.nullable)
- }
- nullableOutput
+ nullableOutput.map(a => a.withQualifier(Seq(q)))
+ }.getOrElse(nullableOutput)
}
def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 4ce9fc3a17b..0ba746b1b19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -400,6 +400,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
testNullStruct
}
}
+
+ test("SPARK-40963: generator output has correct nullability") {
+ // This test does not check nullability directly. Before SPARK-40963,
+ // the below query got wrong results due to incorrect nullability.
+ val df = sql(
+ """select c1, explode(c4) as c5 from (
+ | select c1, array(c3) as c4 from (
+ | select c1, explode_outer(c2) as c3
+ | from values
+ | (1, array(1, 2)),
+ | (2, array(2, 3)),
+ | (3, null)
+ | as data(c1, c2)
+ | )
+ |)
+ |""".stripMargin)
+ checkAnswer(df,
+ Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
+ }
}
case class EmptyGenerator() extends Generator with LeafLike[Expression] {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]