This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new a58b8a8 [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct a58b8a8 is described below commit a58b8a864bb03bcdfe69b157bc7aec39b68556fa Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Wed Jan 12 11:44:04 2022 -0800 [SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct This is a backport of #35170 for branch-3.2. ### What changes were proposed in this pull request? Skip aliasing any `ExtractValue` whose subtree contains a `NamedLambdaVariable`. ### Why are the changes needed? Since #32773, a `NamedLambdaVariable` can produce references; as a result, the rule `NestedColumnAliasing` tries to alias an `ExtractValue` that contains a `NamedLambdaVariable`. This fails because a `NamedLambdaVariable` cannot be matched to an actual attribute. In more detail: `NestedColumnAliasing#replaceWithAliases` uses the references of each nested field to match the output attributes of the grandchildren. However, a `NamedLambdaVariable` is created by the analyzer as a virtual attribute and is not resolved from the output of the children, so no attribute is found when the references of a `NamedLambdaVariable` are matched against the grandchildren's output. ### Does this PR introduce _any_ user-facing change? Yes, this is a bug fix. ### How was this patch tested? Added a new test. Closes #35175 from ulysses-you/SPARK-37855-branch-3.2. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../catalyst/optimizer/NestedColumnAliasing.scala | 4 +- .../org/apache/spark/sql/DataFrameSuite.scala | 54 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 77a25ec..9d63f4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -245,11 +245,13 @@ object NestedColumnAliasing { val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]() exprList.foreach { e => collectRootReferenceAndExtractValue(e).foreach { - case ev: ExtractValue => + // we can not alias the attr from lambda variable whose expr id is not available + case ev: ExtractValue if ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty => if (ev.references.size == 1) { nestedFieldReferences.append(ev) } case ar: AttributeReference => otherRootReferences.append(ar) + case _ => // ignore } } val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2fd5993..70ec052 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2943,6 +2943,60 @@ class DataFrameSuite extends QueryTest .withSequenceColumn("default_index").collect().map(_.getLong(0)) assert(ids.toSet === Range(0, 10).toSet) } + + test("SPARK-37855: IllegalStateException when transforming an array inside a nested struct") { + def makeInput(): DataFrame = { + val innerElement1 = Row(3, 3.12) + 
val innerElement2 = Row(4, 2.1) + val innerElement3 = Row(1, 985.2) + val innerElement4 = Row(10, 757548.0) + val innerElement5 = Row(1223, 0.665) + + val outerElement1 = Row(1, Row(List(innerElement1, innerElement2))) + val outerElement2 = Row(2, Row(List(innerElement3))) + val outerElement3 = Row(3, Row(List(innerElement4, innerElement5))) + + val data = Seq( + Row("row1", List(outerElement1)), + Row("row2", List(outerElement2, outerElement3)) + ) + + val schema = new StructType() + .add("name", StringType) + .add("outer_array", ArrayType(new StructType() + .add("id", IntegerType) + .add("inner_array_struct", new StructType() + .add("inner_array", ArrayType(new StructType() + .add("id", IntegerType) + .add("value", DoubleType) + )) + ) + )) + + spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + } + + val df = makeInput().limit(2) + + val res = df.withColumn("extracted", transform( + col("outer_array"), + c1 => { + struct( + c1.getField("id").alias("outer_id"), + transform( + c1.getField("inner_array_struct").getField("inner_array"), + c2 => { + struct( + c2.getField("value").alias("inner_value") + ) + } + ) + ) + } + )) + + assert(res.collect.length == 2) + } } case class GroupByKey(a: Int, b: Int) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org