This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 70c322a [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values 70c322a is described below commit 70c322ad041511ded6e531d92ffc64c11bfdc378 Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com> AuthorDate: Thu Jun 10 09:37:27 2021 -0700 [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values ### What changes were proposed in this pull request? Use the key/value LambdaFunction to convert the elements instead of using CatalystTypeConverters.createToScalaConverter. This is how it is done in MapObjects and that correctly handles Arrays with case classes. ### Why are the changes needed? Before these changes the added test cases would fail with the following: ``` [info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds) [info] Encoded/Decoded data does not match input data [info] [info] in: Map(1 -> IntAndString(1,a)) [info] out: Map(1 -> [1,a]) [info] types: scala.collection.immutable.Map$Map1 [info] [info] Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e] [info] Schema: value#823 [info] root [info] -- value: map (nullable = true) [info] |-- key: integer [info] |-- value: struct (valueContainsNull = true) [info] | |-- i: integer (nullable = false) [info] | |-- s: string (nullable = true) [info] [info] [info] fromRow Expressions: [info] catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class 
org.apache.spark.sql.catalyst.encoders [...] [info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178) [info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178) [info] :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179) [info] :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString) [info] : :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)) [info] : : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179) [info] : :- null [info] : +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString) [info] : :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i) [info] : : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i [info] : : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179) [info] : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString [info] : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s [info] : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179) [info] +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627) ``` So using a map with case classes for keys or values and using the interpreted path would incorrectly deserialize data from the catalyst 
representation. ### Does this PR introduce _any_ user-facing change? Yes, it fixes the bug. ### How was this patch tested? Existing and new unit tests in the ExpressionEncoderSuite Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes. Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> (cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87) Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../spark/sql/catalyst/expressions/objects/objects.scala | 14 ++++++-------- .../sql/catalyst/encoders/ExpressionEncoderSuite.scala | 5 +++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index f78e3f5..dc5bd02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -27,7 +27,7 @@ import scala.util.{Properties, Try} import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.serializer._ import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection} +import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -1117,11 +1117,6 @@ case class CatalystToExternalMap private( private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType] - private lazy val keyConverter = - CatalystTypeConverters.createToScalaConverter(inputMapType.keyType) - private lazy val valueConverter = - CatalystTypeConverters.createToScalaConverter(inputMapType.valueType) - private lazy val (newMapBuilderMethod, 
moduleField) = { val clazz = Utils.classForName(collClass.getCanonicalName + "$") (clazz.getMethod("newBuilder"), clazz.getField("MODULE$").get(null)) @@ -1138,10 +1133,13 @@ case class CatalystToExternalMap private( builder.sizeHint(result.numElements()) val keyArray = result.keyArray() val valueArray = result.valueArray() + val row = new GenericInternalRow(1) var i = 0 while (i < result.numElements()) { - val key = keyConverter(keyArray.get(i, inputMapType.keyType)) - val value = valueConverter(valueArray.get(i, inputMapType.valueType)) + row.update(0, keyArray.get(i, inputMapType.keyType)) + val key = keyLambdaFunction.eval(row) + row.update(0, valueArray.get(i, inputMapType.valueType)) + val value = valueLambdaFunction.eval(row) builder += Tuple2(key, value) i += 1 } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 7faab4e..bf4afac 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -114,6 +114,7 @@ case class ReferenceValueClass(wrapped: ReferenceValueClass.Container) extends A object ReferenceValueClass { case class Container(data: Int) } +case class IntAndString(i: Int, s: String) class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTest { OuterScopes.addOuterScope(this) @@ -174,6 +175,10 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes encodeDecodeTest(Map(1 -> "a", 2 -> "b"), "map") encodeDecodeTest(Map(1 -> "a", 2 -> null), "map with null") encodeDecodeTest(Map(1 -> Map("a" -> 1), 2 -> Map("b" -> 2)), "map of map") + encodeDecodeTest(Map(1 -> IntAndString(1, "a")), "map with case class as value") + encodeDecodeTest(Map(IntAndString(1, "a") -> 1), "map with case class as key") + 
encodeDecodeTest(Map(IntAndString(1, "a") -> IntAndString(2, "b")), + "map with case class as key and value") encodeDecodeTest(Tuple1[Seq[Int]](null), "null seq in tuple") encodeDecodeTest(Tuple1[Map[String, String]](null), "null map in tuple") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org