Repository: spark Updated Branches: refs/heads/master 60022bfd6 -> f135b70fd
[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type ## What changes were proposed in this pull request? For an input object of a non-flat type, we can't encode it to a row if it's null, as Spark SQL doesn't allow the entire row to be null, only its columns can be null. That's the reason we forbid users to use top-level null objects in https://github.com/apache/spark/pull/13469 . However, if users wrap a non-flat type with `Option`, then we may still encode a top-level null object to a row, which is not allowed. This PR fixes this case, and suggests that users wrap their type with `Tuple1` if they do want top-level null objects. ## How was this patch tested? new test Author: Wenchen Fan <wenc...@databricks.com> Closes #15979 from cloud-fan/option. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f135b70f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f135b70f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f135b70f Branch: refs/heads/master Commit: f135b70fd590438bebb2a54012a6f73074219758 Parents: 60022bf Author: Wenchen Fan <wenc...@databricks.com> Authored: Wed Nov 30 13:36:17 2016 -0800 Committer: Cheng Lian <l...@databricks.com> Committed: Wed Nov 30 13:36:17 2016 -0800 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/ScalaReflection.scala | 13 +++++++++++++ .../sql/catalyst/encoders/ExpressionEncoder.scala | 14 ++++++++++++-- .../scala/org/apache/spark/sql/DatasetSuite.scala | 13 +++++++++++-- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 4 files changed, 37 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f135b70f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 7bcaea7..0aa21b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -606,6 +606,19 @@ object ScalaReflection extends ScalaReflection { } /** + * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that, + * we also treat [[DefinedByConstructorParams]] as product type. + */ + def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized { + tpe match { + case t if t <:< localTypeOf[Option[_]] => + val TypeRef(_, _, Seq(optType)) = t + definedByConstructorParams(optType) + case _ => false + } + } + + /** * Returns the parameter names and types for the primary constructor of this class. * * Note that it only works for scala classes with primary constructor, and currently doesn't http://git-wip-us.apache.org/repos/asf/spark/blob/f135b70f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 82e1a8a..9c4818d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -47,6 +47,16 @@ object ExpressionEncoder { // We convert the not-serializable TypeTag into StructType and ClassTag. 
val mirror = typeTag[T].mirror val tpe = typeTag[T].tpe + + if (ScalaReflection.optionOfProductType(tpe)) { + throw new UnsupportedOperationException( + "Cannot create encoder for Option of Product type, because Product type is represented " + + "as a row, and the entire row can not be null in Spark SQL like normal databases. " + + "You can wrap your type with Tuple1 if you do want top level null Product objects, " + + "e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " + + "`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`") + } + val cls = mirror.runtimeClass(tpe) val flat = !ScalaReflection.definedByConstructorParams(tpe) @@ -54,9 +64,9 @@ object ExpressionEncoder { val nullSafeInput = if (flat) { inputObject } else { - // For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL + // For input object of Product type, we can't encode it to row if it's null, as Spark SQL // doesn't allow top-level row to be null, only its columns can be null. 
- AssertNotNull(inputObject, Seq("top level non-flat input object")) + AssertNotNull(inputObject, Seq("top level Product input object")) } val serializer = ScalaReflection.serializerFor[T](nullSafeInput) val deserializer = ScalaReflection.deserializerFor[T] http://git-wip-us.apache.org/repos/asf/spark/blob/f135b70f/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 81fa8cb..1174d73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -867,10 +867,10 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkDataset(Seq("a", null).toDS(), "a", null) } - test("Dataset should throw RuntimeException if non-flat input object is null") { + test("Dataset should throw RuntimeException if top-level product input object is null") { val e = intercept[RuntimeException](Seq(ClassData("a", 1), null).toDS()) assert(e.getMessage.contains("Null value appeared in non-nullable field")) - assert(e.getMessage.contains("top level non-flat input object")) + assert(e.getMessage.contains("top level Product input object")) } test("dropDuplicates") { @@ -1051,6 +1051,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkDataset(dsDouble, arrayDouble) checkDataset(dsString, arrayString) } + + test("SPARK-18251: the type of Dataset can't be Option of Product type") { + checkDataset(Seq(Some(1), None).toDS(), Some(1), None) + + val e = intercept[UnsupportedOperationException] { + Seq(Some(1 -> "a"), None).toDS() + } + assert(e.getMessage.contains("Cannot create encoder for Option of Product type")) + } } case class Generic[T](id: T, value: Double) 
http://git-wip-us.apache.org/repos/asf/spark/blob/f135b70f/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 7d63d31..890cc5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -143,7 +143,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { } test("roundtrip in to_json and from_json") { - val dfOne = Seq(Some(Tuple1(Tuple1(1))), None).toDF("struct") + val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct") val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType] val readBackOne = dfOne.select(to_json($"struct").as("json")) .select(from_json($"json", schemaOne).as("struct")) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org