Repository: spark
Updated Branches:
  refs/heads/branch-2.0 62765cbeb -> a5bec5b81


[SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema

## What changes were proposed in this pull request?

This PR makes sure the typed Filter doesn't change the Dataset schema.

**Before the change:**

```
scala> val df = spark.range(0,9)
scala> df.schema
res12: org.apache.spark.sql.types.StructType = 
StructType(StructField(id,LongType,false))
scala> val afterFilter = df.filter(_=>true)
scala> afterFilter.schema   // !!! schema is CHANGED!!! Column name is changed 
from id to value, nullable is changed from false to true.
res13: org.apache.spark.sql.types.StructType = 
StructType(StructField(value,LongType,true))

```

SerializeFromObject and DeserializeToObject are inserted to wrap the Filter, 
and these two can possibly change the schema of Dataset.

**After the change:**

```
scala> afterFilter.schema   // schema is NOT changed.
res47: org.apache.spark.sql.types.StructType = 
StructType(StructField(id,LongType,false))
```

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzh...@databricks.com>

Closes #13529 from clockfly/spark-15632.

(cherry picked from commit 0e0904a2fce3c4447c24f1752307b6d01ffbd0ad)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5bec5b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5bec5b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5bec5b8

Branch: refs/heads/branch-2.0
Commit: a5bec5b81d9e8ce17f1ce509731b030f0f3538e3
Parents: 62765cb
Author: Sean Zhong <seanzh...@databricks.com>
Authored: Mon Jun 6 22:40:21 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon Jun 6 22:40:29 2016 -0700

----------------------------------------------------------------------
 .../optimizer/TypedFilterOptimizationSuite.scala    |  4 +++-
 .../main/scala/org/apache/spark/sql/Dataset.scala   | 16 ++++++++--------
 .../test/org/apache/spark/sql/JavaDatasetSuite.java | 13 +++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala   |  6 ++++++
 .../sql/execution/WholeStageCodegenSuite.scala      |  2 +-
 5 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 289c16a..63d87bf 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -57,7 +57,9 @@ class TypedFilterOptimizationSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
-  test("embed deserializer in filter condition if there is only one filter") {
+  // TODO: Remove this after we completely fix SPARK-15632 by adding 
optimization rules
+  // for typed filters.
+  ignore("embed deserializer in typed filter condition if there is only one 
filter") {
     val input = LocalRelation('_1.int, '_2.int)
     val f = (i: (Int, Int)) => i._1 > 0
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 96c871d..6cbc27d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1944,11 +1944,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: T => Boolean): Dataset[T] = {
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
     val function = Literal.create(func, ObjectType(classOf[T => Boolean]))
-    val condition = Invoke(function, "apply", BooleanType, deserialized.output)
-    val filter = Filter(condition, deserialized)
-    withTypedPlan(CatalystSerde.serialize[T](filter))
+    val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**
@@ -1961,11 +1961,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
     val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]]))
-    val condition = Invoke(function, "call", BooleanType, deserialized.output)
-    val filter = Filter(condition, deserialized)
-    withTypedPlan(CatalystSerde.serialize[T](filter))
+    val condition = Invoke(function, "call", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 8354a5b..37577ac 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -92,6 +92,19 @@ public class JavaDatasetSuite implements Serializable {
     Assert.assertFalse(iter.hasNext());
   }
 
+  // SPARK-15632: typed filter should preserve the underlying logical schema
+  @Test
+  public void testTypedFilterPreservingSchema() {
+    Dataset<Long> ds = spark.range(10);
+    Dataset<Long> ds2 = ds.filter(new FilterFunction<Long>() {
+      @Override
+      public boolean call(Long value) throws Exception {
+        return value > 3;
+      }
+    });
+    Assert.assertEquals(ds.schema(), ds2.schema());
+  }
+
   @Test
   public void testCommonOperation() {
     List<String> data = Arrays.asList("hello", "world");

http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index bf2b0a2..11b52bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -225,6 +225,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
       "b")
   }
 
+  test("SPARK-15632: typed filter should preserve the underlying logical 
schema") {
+    val ds = spark.range(10)
+    val ds2 = ds.filter(_ > 3)
+    assert(ds.schema.equals(ds2.schema))
+  }
+
   test("foreach") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
     val acc = sparkContext.longAccumulator

http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 68f0ee8..f26e5e7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -97,7 +97,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val plan = ds.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
+      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
     assert(ds.collect() === Array(0, 6))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to