spark git commit: [SPARK-15140][SQL] make the semantics of null input object for encoder clear

2016-06-03 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 52376e067 -> 7315acf89


[SPARK-15140][SQL] make the semantics of null input object for encoder clear

## What changes were proposed in this pull request?

For input object of non-flat type, we can't encode it to row if it's null, as 
Spark SQL doesn't allow row to be null, only its columns can be null.

This PR explicitly adds this constraint and throws an exception if users break it.

## How was this patch tested?

several new tests

Author: Wenchen Fan 

Closes #13469 from cloud-fan/null-object.

(cherry picked from commit 11c83f83d5172167cb64513d5311b4178797d40e)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7315acf8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7315acf8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7315acf8

Branch: refs/heads/branch-2.0
Commit: 7315acf896b2474a4b7513434f5ba2faf468abd9
Parents: 52376e0
Author: Wenchen Fan 
Authored: Fri Jun 3 14:28:19 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 3 14:28:26 2016 -0700

--
 .../sql/catalyst/encoders/ExpressionEncoder.scala  | 13 ++---
 .../spark/sql/catalyst/encoders/RowEncoder.scala   |  7 +++
 .../sql/catalyst/expressions/objects/objects.scala |  4 ++--
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala  |  8 
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++
 5 files changed, 33 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7315acf8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index cc59d06..688082d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, 
JavaTypeInference, ScalaRefle
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, GetColumnByOrdinal, 
SimpleAnalyzer, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, 
GenerateUnsafeProjection}
-import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, NewInstance}
+import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, 
Invoke, NewInstance}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, 
DeserializeToObject, LocalRelation}
 import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
@@ -50,8 +50,15 @@ object ExpressionEncoder {
 val cls = mirror.runtimeClass(tpe)
 val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
-val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], 
nullable = false)
-val serializer = ScalaReflection.serializerFor[T](inputObject)
+val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], 
nullable = true)
+val nullSafeInput = if (flat) {
+  inputObject
+} else {
+  // For input object of non-flat type, we can't encode it to row if it's 
null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can be null.
+  AssertNotNull(inputObject, Seq("top level non-flat input object"))
+}
+val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
 val deserializer = ScalaReflection.deserializerFor[T]
 
 val schema = ScalaReflection.schemaFor[T] match {

http://git-wip-us.apache.org/repos/asf/spark/blob/7315acf8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 3c6ae1c..6cd7b34 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -57,8 +57,8 @@ import org.apache.spark.unsafe.types.UTF8String
 object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
 val cls = classOf[Row]
-val inputObject = BoundReference(0, ObjectType(cls), nullable = false)
-val serializer = serializerFor(inputObject, schema)
spark git commit: [SPARK-15140][SQL] make the semantics of null input object for encoder clear

2016-06-03 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 28ad0f7b0 -> 11c83f83d


[SPARK-15140][SQL] make the semantics of null input object for encoder clear

## What changes were proposed in this pull request?

For input object of non-flat type, we can't encode it to row if it's null, as 
Spark SQL doesn't allow row to be null, only its columns can be null.

This PR explicitly adds this constraint and throws an exception if users break it.

## How was this patch tested?

several new tests

Author: Wenchen Fan 

Closes #13469 from cloud-fan/null-object.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11c83f83
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11c83f83
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11c83f83

Branch: refs/heads/master
Commit: 11c83f83d5172167cb64513d5311b4178797d40e
Parents: 28ad0f7
Author: Wenchen Fan 
Authored: Fri Jun 3 14:28:19 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 3 14:28:19 2016 -0700

--
 .../sql/catalyst/encoders/ExpressionEncoder.scala  | 13 ++---
 .../spark/sql/catalyst/encoders/RowEncoder.scala   |  7 +++
 .../sql/catalyst/expressions/objects/objects.scala |  4 ++--
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala  |  8 
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++
 5 files changed, 33 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/11c83f83/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index cc59d06..688082d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, 
JavaTypeInference, ScalaRefle
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, GetColumnByOrdinal, 
SimpleAnalyzer, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, 
GenerateUnsafeProjection}
-import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, NewInstance}
+import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, 
Invoke, NewInstance}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, 
DeserializeToObject, LocalRelation}
 import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
@@ -50,8 +50,15 @@ object ExpressionEncoder {
 val cls = mirror.runtimeClass(tpe)
 val flat = !ScalaReflection.definedByConstructorParams(tpe)
 
-val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], 
nullable = false)
-val serializer = ScalaReflection.serializerFor[T](inputObject)
+val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], 
nullable = true)
+val nullSafeInput = if (flat) {
+  inputObject
+} else {
+  // For input object of non-flat type, we can't encode it to row if it's 
null, as Spark SQL
+  // doesn't allow top-level row to be null, only its columns can be null.
+  AssertNotNull(inputObject, Seq("top level non-flat input object"))
+}
+val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
 val deserializer = ScalaReflection.deserializerFor[T]
 
 val schema = ScalaReflection.schemaFor[T] match {

http://git-wip-us.apache.org/repos/asf/spark/blob/11c83f83/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 3c6ae1c..6cd7b34 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -57,8 +57,8 @@ import org.apache.spark.unsafe.types.UTF8String
 object RowEncoder {
   def apply(schema: StructType): ExpressionEncoder[Row] = {
 val cls = classOf[Row]
-val inputObject = BoundReference(0, ObjectType(cls), nullable = false)
-val serializer = serializerFor(inputObject, schema)
+val inputObject = BoundReference(0,