This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 8cee32d9 [SPARK-38285][SQL] Avoid generator pruning for invalid extractor
8cee32d9 is described below

commit 8cee32d918aa7c8d127e1f17165d3b80410f7e2c
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Mon Mar 7 12:04:24 2022 -0800

    [SPARK-38285][SQL] Avoid generator pruning for invalid extractor
    
    ### What changes were proposed in this pull request?
    
    This fixes a bug in generator nested column pruning. The bug happens when
    the extractor pattern on the generator output is like
    `GetArrayStructFields(GetStructField(...), ...)`. When the input to the
    generator is an array, replacing the generator output with the extractor
    according to the pruning logic yields
    `GetArrayStructFields(GetArrayStructFields(...), ...)`, which is not a
    valid extractor.
    
    ### Why are the changes needed?
    
    To fix a bug in generator nested column pruning.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, fixing a user-facing bug.
    
    ### How was this patch tested?
    
    Added unit test.
    
    Closes #35749 from viirya/SPARK-38285.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
    (cherry picked from commit 71991f75ff441e80a52cb71f66f46bfebdb05671)
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala    | 11 +++++++++++
 .../scala/org/apache/spark/sql/DataFrameSuite.scala  | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 9d63f4e..7420ebf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -371,6 +371,17 @@ object GeneratorNestedColumnAliasing {
                 e.withNewChildren(Seq(extractor))
             }
 
+            // If after replacing generator expression with nested extractor, 
there
+            // is invalid extractor pattern like
+            // `GetArrayStructFields(GetArrayStructFields(...), ...), we 
cannot do
+            // pruning but fallback to original query plan.
+            val invalidExtractor = rewrittenG.generator.children.head.collect {
+              case GetArrayStructFields(_: GetArrayStructFields, _, _, _, _) 
=> true
+            }
+            if (invalidExtractor.nonEmpty) {
+              return Some(pushedThrough)
+            }
+
             // As we change the child of the generator, its output data type 
must be updated.
             val updatedGeneratorOutput = rewrittenG.generatorOutput
               .zip(rewrittenG.generator.elementSchema.toAttributes)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index e427c43..8c16c31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -3005,6 +3005,26 @@ class DataFrameSuite extends QueryTest
 
     assert(res.collect.length == 2)
   }
+
+  test("SPARK-38285: Fix ClassCastException: GenericArrayData cannot be cast 
to InternalRow") {
+    withTempView("v1") {
+      val sqlText =
+        """
+          |CREATE OR REPLACE TEMP VIEW v1 AS
+          |SELECT * FROM VALUES
+          |(array(
+          |  named_struct('s', 'string1', 'b', array(named_struct('e', 
'string2'))),
+          |  named_struct('s', 'string4', 'b', array(named_struct('e', 
'string5')))
+          |  )
+          |)
+          |v1(o);
+          |""".stripMargin
+      sql(sqlText)
+
+      val df = sql("SELECT eo.b.e FROM (SELECT explode(o) AS eo FROM v1)")
+      checkAnswer(df, Row(Seq("string2")) :: Row(Seq("string5")) :: Nil)
+    }
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to