This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0749938b6109 [SPARK-45896][SQL][3.4] Construct `ValidateExternalType` 
with the correct expected type
0749938b6109 is described below

commit 0749938b610970d26d9ee65a17f500483230b0a2
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Sun Nov 12 18:23:04 2023 -0800

    [SPARK-45896][SQL][3.4] Construct `ValidateExternalType` with the correct 
expected type
    
    ### What changes were proposed in this pull request?
    
    This is a backport of #43770.
    
    When creating a serializer for a `Map` or `Seq` with an element of type 
`Option`, pass an expected type of `Option` to `ValidateExternalType` rather 
than the `Option`'s type argument.
    
    ### Why are the changes needed?
    
    In 3.4.1, 3.5.0, and master, the following code gets an error:
    ```
    scala> val df = Seq(Seq(Some(Seq(0)))).toDF("a")
    val df = Seq(Seq(Some(Seq(0)))).toDF("a")
    org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed 
to encode a value of the expressions: mapobjects(lambdavariable(MapObject, 
ObjectType(class java.lang.Object), true, -1), 
mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 
-2), assertnotnull(validateexternaltype(lambdavariable(MapObject, 
ObjectType(class java.lang.Object), true, -2), IntegerType, IntegerType)), 
unwrapoption(ObjectType(interface scala.collection.immutable.Seq), vali [...]
    ...
    Caused by: java.lang.RuntimeException: scala.Some is not a valid external 
type for schema of array<int>
      at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown
 Source)
    ...
    
    ```
    However, this code works in 3.3.3.
    
    Similarly, this code gets an error:
    ```
    scala> val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01 
00:00:00")))).toDF("a")
    val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01 
00:00:00")))).toDF("a")
    org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed 
to encode a value of the expressions: mapobjects(lambdavariable(MapObject, 
ObjectType(class java.lang.Object), true, -1), staticinvoke(class 
org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, 
fromJavaTimestamp, unwrapoption(ObjectType(class java.sql.Timestamp), 
validateexternaltype(lambdavariable(MapObject, ObjectType(class 
java.lang.Object), true, -1), TimestampType, ObjectType(class scala.Opti [...]
    ...
    Caused by: java.lang.RuntimeException: scala.Some is not a valid external 
type for schema of timestamp
    ...
    ```
    As with the first example, this code works in 3.3.3.
    
    `ScalaReflection#validateAndSerializeElement` will construct 
`ValidateExternalType` with an expected type of the `Option`'s type parameter. 
Therefore, for element types `Option[Seq/Date/Timestamp/BigDecimal]`, 
`ValidateExternalType` will try to validate that the element is of the 
contained type (e.g., `BigDecimal`) rather than of type `Option`. Since the 
element type is of type `Option`, the validation fails.
    
    Validation currently works by accident for element types 
`Option[Map/<primitive-type>]`, simply because in that case 
`ValidateExternalType` ignores the passed expected type and tries to validate 
based on the encoder's `clsTag` field (which, for the `OptionEncoder`, will be 
class `Option`).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Other than fixing the bug, no.
    
    ### How was this patch tested?
    
    New unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43775 from bersprockets/encoding_error_br34.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala      |  7 ++++++-
 .../spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala | 12 ++++++++++++
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala   |  9 +++++++++
 3 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index b18613bdad3a..4e4ca4ee0632 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -564,10 +564,15 @@ object ScalaReflection extends ScalaReflection {
   private def validateAndSerializeElement(
       enc: AgnosticEncoder[_],
       nullable: Boolean): Expression => Expression = { input =>
+    val expected = enc match {
+      case OptionEncoder(_) => lenientExternalDataTypeFor(enc)
+      case _ => enc.dataType
+    }
+
     expressionWithNullSafety(
       serializerFor(
         enc,
-        ValidateExternalType(input, enc.dataType, 
lenientExternalDataTypeFor(enc))),
+        ValidateExternalType(input, expected, 
lenientExternalDataTypeFor(enc))),
       nullable,
       WalkedTypePath())
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 79417c4ca1fe..9e19c3755b24 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -476,6 +476,18 @@ class ExpressionEncoderSuite extends 
CodegenInterpretedPlanTest with AnalysisTes
   encodeDecodeTest(Option.empty[Int], "empty option of int")
   encodeDecodeTest(Option("abc"), "option of string")
   encodeDecodeTest(Option.empty[String], "empty option of string")
+  encodeDecodeTest(Seq(Some(Seq(0))), "SPARK-45896: seq of option of seq")
+  encodeDecodeTest(Map(0 -> Some(Seq(0))), "SPARK-45896: map of option of seq")
+  encodeDecodeTest(Seq(Some(Timestamp.valueOf("2023-01-01 00:00:00"))),
+    "SPARK-45896: seq of option of timestamp")
+  encodeDecodeTest(Map(0 -> Some(Timestamp.valueOf("2023-01-01 00:00:00"))),
+    "SPARK-45896: map of option of timestamp")
+  encodeDecodeTest(Seq(Some(Date.valueOf("2023-01-01"))),
+    "SPARK-45896: seq of option of date")
+  encodeDecodeTest(Map(0 -> Some(Date.valueOf("2023-01-01"))),
+    "SPARK-45896: map of option of date")
+  encodeDecodeTest(Seq(Some(BigDecimal(200))), "SPARK-45896: seq of option of 
bigdecimal")
+  encodeDecodeTest(Map(0 -> Some(BigDecimal(200))), "SPARK-45896: map of 
option of bigdecimal")
 
   encodeDecodeTest(ScroogeLikeExample(1),
     "SPARK-40385 class with only a companion object constructor")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 6a1aa25c6e21..7dec558f8df3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -270,6 +270,13 @@ class DatasetSuite extends QueryTest
       (ClassData("one", 2), 1L), (ClassData("two", 3), 1L))
   }
 
+  test("SPARK-45896: seq of option of seq") {
+    val ds = Seq(DataSeqOptSeq(Seq(Some(Seq(0))))).toDS()
+    checkDataset(
+      ds,
+      DataSeqOptSeq(Seq(Some(List(0)))))
+  }
+
   test("select") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
     checkDataset(
@@ -2561,6 +2568,8 @@ case class ClassNullableData(a: String, b: Integer)
 case class NestedStruct(f: ClassData)
 case class DeepNestedStruct(f: NestedStruct)
 
+case class DataSeqOptSeq(a: Seq[Option[Seq[Int]]])
+
 /**
  * A class used to test serialization using encoders. This class throws 
exceptions when using
  * Java serialization -- so the only way it can be "serialized" is through our 
encoders.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to