This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d4ebb0c  [SPARK-38042][SQL] Ensure that ScalaReflection.dataTypeFor works on aliased array types
d4ebb0c is described below

commit d4ebb0c7531c63a9a902699a77a34736698c3236
Author: Johan Nystrom <jo...@monomorphic.org>
AuthorDate: Mon Feb 28 19:33:04 2022 +0800

    [SPARK-38042][SQL] Ensure that ScalaReflection.dataTypeFor works on aliased array types
    
    An aliased array type used in a product type, in a Dataset or DataFrame, causes an exception. For example:
    
    ```
    type Data = Array[Long]
    val xs: List[(Data, Int)] = List((Array(1), 1), (Array(2), 2))
    sc.parallelize(xs).toDF("a", "b")
    ```
    
    This fails with:
    
    ```
    scala.MatchError: Data (of class scala.reflect.internal.Types$AliasNoArgsTypeRef)
     at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$dataTypeFor$1(ScalaReflection.scala:104)
     at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
     at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
     at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
     at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
     at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:88)
     at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$6(ScalaReflection.scala:573)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
     at scala.collection.immutable.List.foreach(List.scala:392)
     at scala.collection.TraversableLike.map(TraversableLike.scala:238)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
     at scala.collection.immutable.List.map(List.scala:298)
     at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:562)
     at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
     at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
     at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
     at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
     at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:432)
     at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:421)
     at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
     at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
     at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
     at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
     at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:413)
     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
     at org.apache.spark.sql.Encoders$.product(Encoders.scala:285)
     at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:251)
     at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:251)
     at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
     ... 48 elided
    ```
    
    This can be fixed in ScalaReflection.dataTypeFor by changing:
    
    ```
    val TypeRef(_, _, Seq(elementType)) = tpe
    ```
    
    to
    
    ```
    val TypeRef(_, _, Seq(elementType)) = tpe.dealias
    ```
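
    For illustration, here is a minimal standalone sketch (plain Scala 2 runtime
    reflection, outside Spark; the object name is made up) of why the dealias
    matters: the alias is an AliasNoArgsTypeRef carrying no type arguments, so
    the TypeRef extractor only yields the element type after dealiasing.

    ```
    import scala.reflect.runtime.universe._

    object DealiasSketch {
      type Data = Array[Long]

      def main(args: Array[String]): Unit = {
        val tpe = typeOf[Data]
        println(tpe)         // prints the alias, not Array[Long]
        println(tpe.dealias) // Array[Long]

        // The pattern from ScalaReflection.dataTypeFor: without .dealias
        // it fails with the MatchError shown above, because the alias has
        // no type arguments to match against Seq(elementType).
        val TypeRef(_, _, Seq(elementType)) = tpe.dealias
        println(elementType) // Long
      }
    }
    ```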
    
    ### Why are the changes needed?
    
    Without this change, any attempt to create a Dataset or DataFrame over such types throws the exception above.
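
    For reference, after this change the reproduction above succeeds. A sketch
    for spark-shell (assuming the shell's predefined spark and sc, with
    implicits imported):

    ```
    import spark.implicits._

    type Data = Array[Long]
    val xs: List[(Data, Int)] = List((Array(1L), 1), (Array(2L), 2))
    // Previously this threw scala.MatchError while deriving the encoder;
    // now it yields a DataFrame with schema [a: array<bigint>, b: int].
    sc.parallelize(xs).toDF("a", "b").show()
    ```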
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, except for preventing this exception from being thrown.
    
    ### How was this patch tested?
    
    Added a test to DatasetSuite
    
    Closes #35370 from jtnystrom/spark-38042.
    
    Lead-authored-by: Johan Nystrom <jo...@monomorphic.org>
    Co-authored-by: Johan Nystrom-Persson <jo...@jnpersson.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 89799b867216ba2eb71e47049bbd6c92f5ee694e)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala     | 2 +-
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala   | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index b4761f6..86c2469 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -103,7 +103,7 @@ object ScalaReflection extends ScalaReflection {
         val className = getClassNameFromType(tpe)
         className match {
           case "scala.Array" =>
-            val TypeRef(_, _, Seq(elementType)) = tpe
+            val TypeRef(_, _, Seq(elementType)) = tpe.dealias
             arrayClassFor(elementType)
           case other =>
             val clazz = getClassFromType(tpe)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 6706a1b..347e9fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -46,10 +46,12 @@ object TestForTypeAlias {
   type TwoInt = (Int, Int)
   type ThreeInt = (TwoInt, Int)
   type SeqOfTwoInt = Seq[TwoInt]
+  type IntArray = Array[Int]
 
   def tupleTypeAlias: TwoInt = (1, 1)
   def nestedTupleTypeAlias: ThreeInt = ((1, 1), 2)
   def seqOfTupleTypeAlias: SeqOfTwoInt = Seq((1, 1), (2, 2))
+  def aliasedArrayInTuple: (Int, IntArray) = (1, Array(1))
 }
 
 class DatasetSuite extends QueryTest
@@ -1557,6 +1559,12 @@ class DatasetSuite extends QueryTest
       ("", Seq((1, 1), (2, 2))))
   }
 
+  test("SPARK-38042: Dataset should work with a product containing an aliased array type") {
+    checkDataset(
+      Seq(1).toDS().map(_ => ("", TestForTypeAlias.aliasedArrayInTuple)),
+      ("", (1, Array(1))))
+  }
+
   test("Check RelationalGroupedDataset toString: Single data") {
     val kvDataset = (1 to 3).toDF("id").groupBy("id")
     val expected = "RelationalGroupedDataset: [" +
