spark git commit: [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema
Repository: spark Updated Branches: refs/heads/branch-2.0 62765cbeb -> a5bec5b81 [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema ## What changes were proposed in this pull request? This PR makes sure the typed Filter doesn't change the Dataset schema. **Before the change:** ``` scala> val df = spark.range(0,9) scala> df.schema res12: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false)) scala> val afterFilter = df.filter(_=>true) scala> afterFilter.schema // !!! schema is CHANGED!!! Column name is changed from id to value, nullable is changed from false to true. res13: org.apache.spark.sql.types.StructType = StructType(StructField(value,LongType,true)) ``` SerializeFromObject and DeserializeToObject are inserted to wrap the Filter, and these two can possibly change the schema of Dataset. **After the change:** ``` scala> afterFilter.schema // schema is NOT changed. res47: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false)) ``` ## How was this patch tested? Unit test. Author: Sean Zhong. Closes #13529 from clockfly/spark-15632. 
(cherry picked from commit 0e0904a2fce3c4447c24f1752307b6d01ffbd0ad) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5bec5b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5bec5b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5bec5b8 Branch: refs/heads/branch-2.0 Commit: a5bec5b81d9e8ce17f1ce509731b030f0f3538e3 Parents: 62765cb Author: Sean Zhong Authored: Mon Jun 6 22:40:21 2016 -0700 Committer: Cheng Lian Committed: Mon Jun 6 22:40:29 2016 -0700 -- .../optimizer/TypedFilterOptimizationSuite.scala| 4 +++- .../main/scala/org/apache/spark/sql/Dataset.scala | 16 .../test/org/apache/spark/sql/JavaDatasetSuite.java | 13 + .../scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++ .../sql/execution/WholeStageCodegenSuite.scala | 2 +- 5 files changed, 31 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala index 289c16a..63d87bf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala @@ -57,7 +57,9 @@ class TypedFilterOptimizationSuite extends PlanTest { comparePlans(optimized, expected) } - test("embed deserializer in filter condition if there is only one filter") { + // TODO: Remove this after we completely fix SPARK-15632 by adding optimization rules + // for typed filters. 
+ ignore("embed deserializer in typed filter condition if there is only one filter") { val input = LocalRelation('_1.int, '_2.int) val f = (i: (Int, Int)) => i._1 > 0 http://git-wip-us.apache.org/repos/asf/spark/blob/a5bec5b8/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 96c871d..6cbc27d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1944,11 +1944,11 @@ class Dataset[T] private[sql]( */ @Experimental def filter(func: T => Boolean): Dataset[T] = { -val deserialized = CatalystSerde.deserialize[T](logicalPlan) +val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer) val function = Literal.create(func, ObjectType(classOf[T => Boolean])) -val condition = Invoke(function, "apply", BooleanType, deserialized.output) -val filter = Filter(condition, deserialized) -withTypedPlan(CatalystSerde.serialize[T](filter)) +val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil) +val filter = Filter(condition, logicalPlan) +withTypedPlan(filter) } /** @@ -1961,11 +1961,11 @@ class Dataset[T] private[sql]( */ @Experimental def filter(func: FilterFunction[T]): Dataset[T] = { -val deserialized = CatalystSerde.deserialize[T](logicalPlan) +val deserializer =
spark git commit: [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema
Repository: spark Updated Branches: refs/heads/master c409e23ab -> 0e0904a2f [SPARK-15632][SQL] Typed Filter should NOT change the Dataset schema ## What changes were proposed in this pull request? This PR makes sure the typed Filter doesn't change the Dataset schema. **Before the change:** ``` scala> val df = spark.range(0,9) scala> df.schema res12: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false)) scala> val afterFilter = df.filter(_=>true) scala> afterFilter.schema // !!! schema is CHANGED!!! Column name is changed from id to value, nullable is changed from false to true. res13: org.apache.spark.sql.types.StructType = StructType(StructField(value,LongType,true)) ``` SerializeFromObject and DeserializeToObject are inserted to wrap the Filter, and these two can possibly change the schema of Dataset. **After the change:** ``` scala> afterFilter.schema // schema is NOT changed. res47: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false)) ``` ## How was this patch tested? Unit test. Author: Sean Zhong. Closes #13529 from clockfly/spark-15632. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e0904a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e0904a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e0904a2 Branch: refs/heads/master Commit: 0e0904a2fce3c4447c24f1752307b6d01ffbd0ad Parents: c409e23 Author: Sean Zhong Authored: Mon Jun 6 22:40:21 2016 -0700 Committer: Cheng Lian Committed: Mon Jun 6 22:40:21 2016 -0700 -- .../optimizer/TypedFilterOptimizationSuite.scala| 4 +++- .../main/scala/org/apache/spark/sql/Dataset.scala | 16 .../test/org/apache/spark/sql/JavaDatasetSuite.java | 13 + .../scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++ .../sql/execution/WholeStageCodegenSuite.scala | 2 +- 5 files changed, 31 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e0904a2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala index 289c16a..63d87bf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala @@ -57,7 +57,9 @@ class TypedFilterOptimizationSuite extends PlanTest { comparePlans(optimized, expected) } - test("embed deserializer in filter condition if there is only one filter") { + // TODO: Remove this after we completely fix SPARK-15632 by adding optimization rules + // for typed filters. 
+ ignore("embed deserializer in typed filter condition if there is only one filter") { val input = LocalRelation('_1.int, '_2.int) val f = (i: (Int, Int)) => i._1 > 0 http://git-wip-us.apache.org/repos/asf/spark/blob/0e0904a2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 96c871d..6cbc27d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1944,11 +1944,11 @@ class Dataset[T] private[sql]( */ @Experimental def filter(func: T => Boolean): Dataset[T] = { -val deserialized = CatalystSerde.deserialize[T](logicalPlan) +val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer) val function = Literal.create(func, ObjectType(classOf[T => Boolean])) -val condition = Invoke(function, "apply", BooleanType, deserialized.output) -val filter = Filter(condition, deserialized) -withTypedPlan(CatalystSerde.serialize[T](filter)) +val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil) +val filter = Filter(condition, logicalPlan) +withTypedPlan(filter) } /** @@ -1961,11 +1961,11 @@ class Dataset[T] private[sql]( */ @Experimental def filter(func: FilterFunction[T]): Dataset[T] = { -val deserialized = CatalystSerde.deserialize[T](logicalPlan) +val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer) val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]])) -val condition =