Repository: spark Updated Branches: refs/heads/branch-2.0 be2d23dfd -> 6dae027a6
[SPARK-15112][SQL] Disables EmbedSerializerInFilter for plan fragments that change schema ## What changes were proposed in this pull request? `EmbedSerializerInFilter` implicitly assumes that the plan fragment being optimized doesn't change plan schema, which is reasonable because `Dataset.filter` should never change the schema. However, due to another issue involving `DeserializeToObject` and `SerializeFromObject`, typed filter *does* change plan schema (see [SPARK-15632][1]). This breaks `EmbedSerializerInFilter` and causes corrupted data. This PR disables `EmbedSerializerInFilter` when there's a schema change to avoid data corruption. The schema change issue should be addressed in follow-up PRs. ## How was this patch tested? New test case added in `DatasetSuite`. [1]: https://issues.apache.org/jira/browse/SPARK-15632 Author: Cheng Lian <l...@databricks.com> Closes #13362 from liancheng/spark-15112-corrupted-filter. (cherry picked from commit 1360a6d636dd812a27955fc85df8e0255db60dfa) Signed-off-by: Cheng Lian <l...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dae027a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dae027a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dae027a Branch: refs/heads/branch-2.0 Commit: 6dae027a6cdd7c862963f71e1ea08f7f1b4b3506 Parents: be2d23d Author: Cheng Lian <l...@databricks.com> Authored: Sun May 29 23:19:12 2016 -0700 Committer: Cheng Lian <l...@databricks.com> Committed: Sun May 29 23:19:29 2016 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 21 +++++++++++++++++++- .../org/apache/spark/sql/DatasetSuite.scala | 16 ++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/6dae027a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 48d7009..688c77d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1597,7 +1597,19 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic */ object EmbedSerializerInFilter extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) => + case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) + // SPARK-15632: Conceptually, filter operator should never introduce schema change. This + // optimization rule also relies on this assumption. However, Dataset typed filter operator + // does introduce schema changes in some cases. Thus, we only enable this optimization when + // + // 1. either input and output schemata are exactly the same, or + // 2. both input and output schemata are single-field schemata and share the same type. + // + // The 2nd case is included because encoders for primitive types always have only a single + // field with hard-coded field name "value". + // TODO: Clean this up after fixing SPARK-15632. 
+ if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) => + val numObjects = condition.collect { case a: Attribute if a == d.output.head => a }.length @@ -1622,6 +1634,13 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] { Project(objAttrs, filter) } } + + def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = { + (lhs, rhs) match { + case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType + case _ => false + } + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/6dae027a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index e395007..8fc4dc9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -706,7 +706,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val dataset = Seq(1, 2, 3).toDS() dataset.createOrReplaceTempView("tempView") - // Overrrides the existing temporary view with same name + // Overrides the existing temporary view with same name // No exception should be thrown here. 
dataset.createOrReplaceTempView("tempView") @@ -769,6 +769,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkShowString(ds, expected) } + + test( + "SPARK-15112: EmbedDeserializerInFilter should not optimize plan fragment that changes schema" + ) { + val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData] + + assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) { + ds.collect().toSeq + } + + assertResult(Seq(ClassData("bar", 2))) { + ds.filter(_.b > 1).collect().toSeq + } + } } case class Generic[T](id: T, value: Double) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org