This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 0749938b6109 [SPARK-45896][SQL][3.4] Construct `ValidateExternalType` with the correct expected type 0749938b6109 is described below commit 0749938b610970d26d9ee65a17f500483230b0a2 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Sun Nov 12 18:23:04 2023 -0800 [SPARK-45896][SQL][3.4] Construct `ValidateExternalType` with the correct expected type ### What changes were proposed in this pull request? This is a backport of #43770. When creating a serializer for a `Map` or `Seq` with an element of type `Option`, pass an expected type of `Option` to `ValidateExternalType` rather than the `Option`'s type argument. ### Why are the changes needed? In 3.4.1, 3.5.0, and master, the following code gets an error: ``` scala> val df = Seq(Seq(Some(Seq(0)))).toDF("a") val df = Seq(Seq(Some(Seq(0)))).toDF("a") org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed to encode a value of the expressions: mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), assertnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), IntegerType, IntegerType)), unwrapoption(ObjectType(interface scala.collection.immutable.Seq), vali [...] ... Caused by: java.lang.RuntimeException: scala.Some is not a valid external type for schema of array<int> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source) ... ``` However, this code works in 3.3.3. 
Similarly, this code gets an error: ``` scala> val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")))).toDF("a") val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")))).toDF("a") org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed to encode a value of the expressions: mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, unwrapoption(ObjectType(class java.sql.Timestamp), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), TimestampType, ObjectType(class scala.Opti [...] ... Caused by: java.lang.RuntimeException: scala.Some is not a valid external type for schema of timestamp ... ``` As with the first example, this code works in 3.3.3. `ScalaReflection#validateAndSerializeElement` will construct `ValidateExternalType` with an expected type of the `Option`'s type parameter. Therefore, for element types `Option[Seq/Date/Timestamp/BigDecimal]`, `ValidateExternalType` will try to validate that the element is of the contained type (e.g., `BigDecimal`) rather than of type `Option`. Since the element itself is of type `Option`, the validation fails. Validation currently works by accident for element types `Option[Map/<primitive-type>]`, simply because in that case `ValidateExternalType` ignores the passed expected type and tries to validate based on the encoder's `clsTag` field (which, for the `OptionEncoder`, will be class `Option`). ### Does this PR introduce _any_ user-facing change? Other than fixing the bug, no. ### How was this patch tested? New unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43775 from bersprockets/encoding_error_br34. 
Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 7 ++++++- .../spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala | 12 ++++++++++++ .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 9 +++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index b18613bdad3a..4e4ca4ee0632 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -564,10 +564,15 @@ object ScalaReflection extends ScalaReflection { private def validateAndSerializeElement( enc: AgnosticEncoder[_], nullable: Boolean): Expression => Expression = { input => + val expected = enc match { + case OptionEncoder(_) => lenientExternalDataTypeFor(enc) + case _ => enc.dataType + } + expressionWithNullSafety( serializerFor( enc, - ValidateExternalType(input, enc.dataType, lenientExternalDataTypeFor(enc))), + ValidateExternalType(input, expected, lenientExternalDataTypeFor(enc))), nullable, WalkedTypePath()) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 79417c4ca1fe..9e19c3755b24 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -476,6 +476,18 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes encodeDecodeTest(Option.empty[Int], "empty option of int") encodeDecodeTest(Option("abc"), "option of string") 
encodeDecodeTest(Option.empty[String], "empty option of string") + encodeDecodeTest(Seq(Some(Seq(0))), "SPARK-45896: seq of option of seq") + encodeDecodeTest(Map(0 -> Some(Seq(0))), "SPARK-45896: map of option of seq") + encodeDecodeTest(Seq(Some(Timestamp.valueOf("2023-01-01 00:00:00"))), + "SPARK-45896: seq of option of timestamp") + encodeDecodeTest(Map(0 -> Some(Timestamp.valueOf("2023-01-01 00:00:00"))), + "SPARK-45896: map of option of timestamp") + encodeDecodeTest(Seq(Some(Date.valueOf("2023-01-01"))), + "SPARK-45896: seq of option of date") + encodeDecodeTest(Map(0 -> Some(Date.valueOf("2023-01-01"))), + "SPARK-45896: map of option of date") + encodeDecodeTest(Seq(Some(BigDecimal(200))), "SPARK-45896: seq of option of bigdecimal") + encodeDecodeTest(Map(0 -> Some(BigDecimal(200))), "SPARK-45896: map of option of bigdecimal") encodeDecodeTest(ScroogeLikeExample(1), "SPARK-40385 class with only a companion object constructor") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 6a1aa25c6e21..7dec558f8df3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -270,6 +270,13 @@ class DatasetSuite extends QueryTest (ClassData("one", 2), 1L), (ClassData("two", 3), 1L)) } + test("SPARK-45896: seq of option of seq") { + val ds = Seq(DataSeqOptSeq(Seq(Some(Seq(0))))).toDS() + checkDataset( + ds, + DataSeqOptSeq(Seq(Some(List(0))))) + } + test("select") { val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS() checkDataset( @@ -2561,6 +2568,8 @@ case class ClassNullableData(a: String, b: Integer) case class NestedStruct(f: ClassData) case class DeepNestedStruct(f: NestedStruct) +case class DataSeqOptSeq(a: Seq[Option[Seq[Int]]]) + /** * A class used to test serialization using encoders. 
This class throws exceptions when using * Java serialization -- so the only way it can be "serialized" is through our encoders. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org