spark git commit: [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema

2016-06-06 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 62765cbeb -> a5bec5b81


[SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema

## What changes were proposed in this pull request?

This PR makes sure the typed Filter doesn't change the Dataset schema.

**Before the change:**

```
scala> val df = spark.range(0,9)
scala> df.schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
scala> val afterFilter = df.filter(_ => true)
scala> afterFilter.schema   // !!! schema is CHANGED: column name changed from id to value, nullable changed from false to true
res13: org.apache.spark.sql.types.StructType = StructType(StructField(value,LongType,true))

```

A DeserializeToObject/SerializeFromObject pair was inserted to wrap the Filter, and these two operators can change the schema of the Dataset (here, renaming the column to value and marking it nullable).
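The failure mode can be sketched outside Spark with a toy model (all names below are hypothetical, not Spark APIs): a filter that round-trips rows through domain objects forces the schema to be re-derived from the object type alone, so the original column name and nullability are lost, while filtering the existing plan in place keeps the schema untouched.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple

Schema = List[Tuple[str, str, bool]]  # (column name, type, nullable)

@dataclass
class Dataset:
    schema: Schema
    rows: List[tuple]

def filter_via_roundtrip(ds: Dataset, pred: Callable) -> Dataset:
    # "DeserializeToObject": rows become bare objects, dropping column metadata.
    objs = [row[0] for row in ds.rows]
    kept = [o for o in objs if pred(o)]
    # "SerializeFromObject": the schema is re-derived from the object type only,
    # so the column is renamed to "value" and conservatively marked nullable.
    return Dataset(schema=[("value", "long", True)], rows=[(o,) for o in kept])

def filter_in_place(ds: Dataset, pred: Callable) -> Dataset:
    # Filtering directly against the existing plan preserves the schema.
    return Dataset(schema=ds.schema, rows=[r for r in ds.rows if pred(r[0])])

df = Dataset(schema=[("id", "long", False)], rows=[(i,) for i in range(9)])
print(filter_via_roundtrip(df, lambda _: True).schema)  # [('value', 'long', True)]
print(filter_in_place(df, lambda _: True).schema)       # [('id', 'long', False)]
```

This mirrors the fix below: build the filter condition from a deserializer expression over the existing plan instead of wrapping the Filter in a deserialize/serialize pair.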

**After the change:**

```
scala> afterFilter.schema   // schema is NOT changed.
res47: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false))
```

## How was this patch tested?

Unit test.

Author: Sean Zhong 

Closes #13529 from clockfly/spark-15632.

(cherry picked from commit 0e0904a2fce3c4447c24f1752307b6d01ffbd0ad)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5bec5b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5bec5b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5bec5b8

Branch: refs/heads/branch-2.0
Commit: a5bec5b81d9e8ce17f1ce509731b030f0f3538e3
Parents: 62765cb
Author: Sean Zhong 
Authored: Mon Jun 6 22:40:21 2016 -0700
Committer: Cheng Lian 
Committed: Mon Jun 6 22:40:29 2016 -0700

--
 .../optimizer/TypedFilterOptimizationSuite.scala    |  4 +++-
 .../main/scala/org/apache/spark/sql/Dataset.scala   | 16 ++++++++--------
 .../test/org/apache/spark/sql/JavaDatasetSuite.java | 13 +++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala   |  6 ++++++
 .../sql/execution/WholeStageCodegenSuite.scala      |  2 +-
 5 files changed, 31 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 289c16a..63d87bf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -57,7 +57,9 @@ class TypedFilterOptimizationSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
-  test("embed deserializer in filter condition if there is only one filter") {
+  // TODO: Remove this after we completely fix SPARK-15632 by adding optimization rules
+  // for typed filters.
+  ignore("embed deserializer in typed filter condition if there is only one filter") {
     val input = LocalRelation('_1.int, '_2.int)
     val f = (i: (Int, Int)) => i._1 > 0
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 96c871d..6cbc27d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1944,11 +1944,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: T => Boolean): Dataset[T] = {
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
     val function = Literal.create(func, ObjectType(classOf[T => Boolean]))
-    val condition = Invoke(function, "apply", BooleanType, deserialized.output)
-    val filter = Filter(condition, deserialized)
-    withTypedPlan(CatalystSerde.serialize[T](filter))
+    val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**
@@ -1961,11 +1961,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
     val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]]))
-    val condition =
spark git commit: [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema

2016-06-06 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master c409e23ab -> 0e0904a2f



Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e0904a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e0904a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e0904a2

Branch: refs/heads/master
Commit: 0e0904a2fce3c4447c24f1752307b6d01ffbd0ad
Parents: c409e23
Author: Sean Zhong 
Authored: Mon Jun 6 22:40:21 2016 -0700
Committer: Cheng Lian 
Committed: Mon Jun 6 22:40:21 2016 -0700
