Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8da431473 -> e1bdf1e02


Revert "[SPARK-16134][SQL] optimizer rules for typed filter"

This reverts commit 8da4314735ed55f259642e2977d8d7bf2212474f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1bdf1e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1bdf1e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1bdf1e0

Branch: refs/heads/branch-2.0
Commit: e1bdf1e02483bf513b6e012e8921d440a5efbc11
Parents: 8da4314
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Jun 30 08:17:43 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jun 30 08:17:43 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/dsl/package.scala |  6 +-
 .../expressions/ReferenceToExpressions.scala    |  1 -
 .../sql/catalyst/optimizer/Optimizer.scala      | 98 +++++++++++---------
 .../sql/catalyst/plans/logical/object.scala     | 47 +---------
 .../TypedFilterOptimizationSuite.scala          | 86 ++++-------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 12 ++-
 .../spark/sql/execution/SparkStrategies.scala   |  2 -
 .../scala/org/apache/spark/sql/QueryTest.scala  |  1 -
 8 files changed, 91 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
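
For context, the API whose planning changes with this revert is Dataset's typed filter. A minimal
usage sketch (the SparkSession and sample data below are illustrative assumptions, not part of
this commit):

    // Illustrative only: assumes a running SparkSession named `spark`.
    import spark.implicits._

    val ds = Seq((1, 2), (-1, 3)).toDS()

    // After this revert, ds.filter(f) is planned as a plain Filter whose condition invokes
    // the lambda on a deserialized (Int, Int) object, rather than as a TypedFilter node.
    val positives = ds.filter(t => t._1 > 0)
    positives.explain(true)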


http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 84c9cc8..2ca990d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -293,7 +293,11 @@ package object dsl {
 
       def where(condition: Expression): LogicalPlan = Filter(condition, logicalPlan)
 
-      def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan)
+      def filter[T : Encoder](func: T => Boolean): LogicalPlan = {
+        val deserialized = logicalPlan.deserialize[T]
+        val condition = expressions.callFunction(func, BooleanType, deserialized.output.head)
+        Filter(condition, deserialized).serialize[T]
+      }
 
       def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index 127797c..502d791 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -45,7 +45,6 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression])
     var maxOrdinal = -1
     result foreach {
       case b: BoundReference if b.ordinal > maxOrdinal => maxOrdinal = b.ordinal
-      case _ =>
     }
     if (maxOrdinal > children.length) {
       return TypeCheckFailure(s"The result expression need $maxOrdinal input expressions, but " +

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa90735..f24f8b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -21,7 +21,6 @@ import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -110,7 +109,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) ::
     Batch("Typed Filter Optimization", fixedPoint,
-      CombineTypedFilters) ::
+      EmbedSerializerInFilter,
+      RemoveAliasOnlyProject) ::
     Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation) ::
     Batch("OptimizeCodegen", Once,
@@ -205,33 +205,15 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 object EliminateSerialization extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case d @ DeserializeToObject(_, _, s: SerializeFromObject)
-        if d.outputObjAttr.dataType == s.inputObjAttr.dataType =>
+        if d.outputObjectType == s.inputObjectType =>
       // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
       // We will remove it later in RemoveAliasOnlyProject rule.
-      val objAttr = Alias(s.inputObjAttr, s.inputObjAttr.name)(exprId = d.outputObjAttr.exprId)
+      val objAttr =
+        Alias(s.child.output.head, s.child.output.head.name)(exprId = d.output.head.exprId)
       Project(objAttr :: Nil, s.child)
-
     case a @ AppendColumns(_, _, _, s: SerializeFromObject)
-        if a.deserializer.dataType == s.inputObjAttr.dataType =>
+        if a.deserializer.dataType == s.inputObjectType =>
       AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
-
-    // If there is a `SerializeFromObject` under typed filter and its input object type is same with
-    // the typed filter's deserializer, we can convert typed filter to normal filter without
-    // deserialization in condition, and push it down through `SerializeFromObject`.
-    // e.g. `ds.map(...).filter(...)` can be optimized by this rule to save extra deserialization,
-    // but `ds.map(...).as[AnotherType].filter(...)` can not be optimized.
-    case f @ TypedFilter(_, _, s: SerializeFromObject)
-        if f.deserializer.dataType == s.inputObjAttr.dataType =>
-      s.copy(child = f.withObjectProducerChild(s.child))
-
-    // If there is a `DeserializeToObject` upon typed filter and its output object type is same with
-    // the typed filter's deserializer, we can convert typed filter to normal filter without
-    // deserialization in condition, and pull it up through `DeserializeToObject`.
-    // e.g. `ds.filter(...).map(...)` can be optimized by this rule to save extra deserialization,
-    // but `ds.filter(...).as[AnotherType].map(...)` can not be optimized.
-    case d @ DeserializeToObject(_, _, f: TypedFilter)
-        if d.outputObjAttr.dataType == f.deserializer.dataType =>
-      f.withObjectProducerChild(d.copy(child = f.child))
   }
 }
 
@@ -1624,30 +1606,54 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
 }
 
 /**
- * Combines two adjacent [[TypedFilter]]s, which operate on same type object in condition, into one,
- * mering the filter functions into one conjunctive function.
+ * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
+ * [[SerializeFromObject]] above it.  If these serializations can't be eliminated, we should embed
+ * the deserializer in filter condition to save the extra serialization at last.
  */
-object CombineTypedFilters extends Rule[LogicalPlan] {
+object EmbedSerializerInFilter extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case t1 @ TypedFilter(_, _, t2 @ TypedFilter(_, _, child))
-        if t1.deserializer.dataType == t2.deserializer.dataType =>
-      TypedFilter(combineFilterFunction(t2.func, t1.func), t1.deserializer, child)
-  }
-
-  private def combineFilterFunction(func1: AnyRef, func2: AnyRef): Any => Boolean = {
-    (func1, func2) match {
-      case (f1: FilterFunction[_], f2: FilterFunction[_]) =>
-        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
-          f2.asInstanceOf[FilterFunction[Any]].call(input)
-      case (f1: FilterFunction[_], f2) =>
-        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
-          f2.asInstanceOf[Any => Boolean](input)
-      case (f1, f2: FilterFunction[_]) =>
-        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
-          f2.asInstanceOf[FilterFunction[Any]].call(input)
-      case (f1, f2) =>
-        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
-          f2.asInstanceOf[Any => Boolean].apply(input)
+    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
+      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
+      // optimization rule also relies on this assumption. However, Dataset typed filter operator
+      // does introduce schema changes in some cases. Thus, we only enable this optimization when
+      //
+      //  1. either input and output schemata are exactly the same, or
+      //  2. both input and output schemata are single-field schema and share the same type.
+      //
+      // The 2nd case is included because encoders for primitive types always have only a single
+      // field with hard-coded field name "value".
+      // TODO Cleans this up after fixing SPARK-15632.
+      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
+
+      val numObjects = condition.collect {
+        case a: Attribute if a == d.output.head => a
+      }.length
+
+      if (numObjects > 1) {
+        // If the filter condition references the object more than one times, we should not embed
+        // deserializer in it as the deserialization will happen many times and slow down the
+        // execution.
+        // TODO: we can still embed it if we can make sure subexpression elimination works here.
+        s
+      } else {
+        val newCondition = condition transform {
+          case a: Attribute if a == d.output.head => d.deserializer
+        }
+        val filter = Filter(newCondition, d.child)
+
+        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
+        // We will remove it later in RemoveAliasOnlyProject rule.
+        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
+          Alias(fout, fout.name)(exprId = sout.exprId)
+        }
+        Project(objAttrs, filter)
+      }
+  }
+
+  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
+    (lhs, rhs) match {
+      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
+      case _ => false
     }
   }
 }
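
The plan shape targeted by the restored EmbedSerializerInFilter rule, and the shape it produces,
can be summarized roughly as follows (a hand-written sketch of the before/after logical plans;
`child`, `cond`, `obj`, and `deser` are placeholder names, not identifiers from this commit):

    // Before: what Dataset.filter writes out after this revert
    //   SerializeFromObject(serializer,
    //     Filter(cond(obj),
    //       DeserializeToObject(deser, obj, child)))
    //
    // After: when the input and output schemata match (or are the same single-field primitive
    // schema) and `obj` is referenced at most once, the deserializer is embedded in the
    // condition and the serde pair disappears
    //   Project(aliases preserving SerializeFromObject's expr ids,
    //     Filter(cond(deser),
    //       child))

If the condition references the object more than once, the rule leaves the plan unchanged to
avoid deserializing the same row repeatedly, as noted in the TODO above.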

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index e1890ed..7beeeb4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import scala.language.existentials
-
-import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Encoder, Row}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.types._
 
 object CatalystSerde {
@@ -49,11 +45,13 @@ object CatalystSerde {
  */
 trait ObjectProducer extends LogicalPlan {
   // The attribute that reference to the single object field this operator outputs.
-  def outputObjAttr: Attribute
+  protected def outputObjAttr: Attribute
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
 
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
+
+  def outputObjectType: DataType = outputObjAttr.dataType
 }
 
 /**
@@ -66,7 +64,7 @@ trait ObjectConsumer extends UnaryNode {
   // This operator always need all columns of its child, even it doesn't reference to.
   override def references: AttributeSet = child.outputSet
 
-  def inputObjAttr: Attribute = child.output.head
+  def inputObjectType: DataType = child.output.head.dataType
 }
 
 /**
@@ -169,43 +167,6 @@ case class MapElements(
     outputObjAttr: Attribute,
     child: LogicalPlan) extends ObjectConsumer with ObjectProducer
 
-object TypedFilter {
-  def apply[T : Encoder](func: AnyRef, child: LogicalPlan): TypedFilter = {
-    TypedFilter(func, UnresolvedDeserializer(encoderFor[T].deserializer), child)
-  }
-}
-
-/**
- * A relation produced by applying `func` to each element of the `child` and filter them by the
- * resulting boolean value.
- *
- * This is logically equal to a normal [[Filter]] operator whose condition expression is decoding
- * the input row to object and apply the given function with decoded object. However we need the
- * encapsulation of [[TypedFilter]] to make the concept more clear and make it easier to write
- * optimizer rules.
- */
-case class TypedFilter(
-    func: AnyRef,
-    deserializer: Expression,
-    child: LogicalPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-
-  def withObjectProducerChild(obj: LogicalPlan): Filter = {
-    assert(obj.output.length == 1)
-    Filter(typedCondition(obj.output.head), obj)
-  }
-
-  def typedCondition(input: Expression): Expression = {
-    val (funcClass, methodName) = func match {
-      case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call"
-      case _ => classOf[Any => Boolean] -> "apply"
-    }
-    val funcObj = Literal.create(func, ObjectType(funcClass))
-    Invoke(funcObj, methodName, BooleanType, input :: Nil)
-  }
-}
-
 /** Factory for constructing new `AppendColumn` nodes. */
 object AppendColumns {
   def apply[T : Encoder, U : Encoder](

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 56f096f..63d87bf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, TypedFilter}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.BooleanType
 
@@ -32,91 +33,44 @@ class TypedFilterOptimizationSuite extends PlanTest {
     val batches =
       Batch("EliminateSerialization", FixedPoint(50),
         EliminateSerialization) ::
-      Batch("CombineTypedFilters", FixedPoint(50),
-        CombineTypedFilters) :: Nil
+      Batch("EmbedSerializerInFilter", FixedPoint(50),
+        EmbedSerializerInFilter) :: Nil
   }
 
   implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
 
-  test("filter after serialize with the same object type") {
+  test("back to back filter") {
     val input = LocalRelation('_1.int, '_2.int)
-    val f = (i: (Int, Int)) => i._1 > 0
+    val f1 = (i: (Int, Int)) => i._1 > 0
+    val f2 = (i: (Int, Int)) => i._2 > 0
 
-    val query = input
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)]
-      .filter(f).analyze
+    val query = input.filter(f1).filter(f2).analyze
 
     val optimized = Optimize.execute(query)
 
-    val expected = input
-      .deserialize[(Int, Int)]
-      .where(callFunction(f, BooleanType, 'obj))
+    val expected = input.deserialize[(Int, Int)]
+      .where(callFunction(f1, BooleanType, 'obj))
+      .select('obj.as("obj"))
+      .where(callFunction(f2, BooleanType, 'obj))
       .serialize[(Int, Int)].analyze
 
     comparePlans(optimized, expected)
   }
 
-  test("filter after serialize with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f = (i: OtherTuple) => i._1 > 0
-
-    val query = input
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)]
-      .filter(f).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("filter before deserialize with the same object type") {
+  // TODO: Remove this after we completely fix SPARK-15632 by adding optimization rules
+  // for typed filters.
+  ignore("embed deserializer in typed filter condition if there is only one filter") {
     val input = LocalRelation('_1.int, '_2.int)
     val f = (i: (Int, Int)) => i._1 > 0
 
-    val query = input
-      .filter(f)
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)].analyze
+    val query = input.filter(f).analyze
 
     val optimized = Optimize.execute(query)
 
-    val expected = input
-      .deserialize[(Int, Int)]
-      .where(callFunction(f, BooleanType, 'obj))
-      .serialize[(Int, Int)].analyze
+    val deserializer = UnresolvedDeserializer(encoderFor[(Int, Int)].deserializer)
+    val condition = callFunction(f, BooleanType, deserializer)
+    val expected = input.where(condition).select('_1.as("_1"), '_2.as("_2")).analyze
 
     comparePlans(optimized, expected)
   }
-
-  test("filter before deserialize with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f = (i: OtherTuple) => i._1 > 0
-
-    val query = input
-      .filter(f)
-      .deserialize[(Int, Int)]
-      .serialize[(Int, Int)].analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("back to back filter with the same object type") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f1 = (i: (Int, Int)) => i._1 > 0
-    val f2 = (i: (Int, Int)) => i._2 > 0
-
-    val query = input.filter(f1).filter(f2).analyze
-    val optimized = Optimize.execute(query)
-    assert(optimized.collect { case t: TypedFilter => t }.length == 1)
-  }
-
-  test("back to back filter with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
-    val f1 = (i: (Int, Int)) => i._1 > 0
-    val f2 = (i: OtherTuple) => i._2 > 0
-
-    val query = input.filter(f1).filter(f2).analyze
-    val optimized = Optimize.execute(query)
-    assert(optimized.collect { case t: TypedFilter => t }.length == 2)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 8e914fc..067cbec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1964,7 +1964,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: T => Boolean): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, logicalPlan))
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
+    val function = Literal.create(func, ObjectType(classOf[T => Boolean]))
+    val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**
@@ -1977,7 +1981,11 @@ class Dataset[T] private[sql](
    */
   @Experimental
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, logicalPlan))
+    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
+    val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]]))
+    val condition = Invoke(function, "call", BooleanType, deserializer :: Nil)
+    val filter = Filter(condition, logicalPlan)
+    withTypedPlan(filter)
   }
 
   /**
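
Both filter overloads above now build an ordinary Filter whose condition invokes the user
function on the deserialized object ("apply" for a Scala function, "call" for a Java
FilterFunction). A short usage sketch (the SparkSession and dataset below are illustrative
assumptions, not part of this commit):

    // Illustrative only: assumes a running SparkSession named `spark`.
    import org.apache.spark.api.java.function.FilterFunction
    import spark.implicits._

    val ds = Seq((1, 2), (3, 4)).toDS()

    // Scala overload: condition is Invoke(funcLiteral, "apply", BooleanType, deserializer :: Nil)
    val a = ds.filter(t => t._1 > 1)

    // Java overload: condition is Invoke(funcLiteral, "call", BooleanType, deserializer :: Nil)
    val b = ds.filter(new FilterFunction[(Int, Int)] {
      override def call(t: (Int, Int)): Boolean = t._2 > 2
    })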

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e643ea..b619d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -385,8 +385,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.ProjectExec(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
         execution.FilterExec(condition, planLater(child)) :: Nil
-      case f: logical.TypedFilter =>
-        execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
         execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e1bdf1e0/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ab50513..b15f38c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -238,7 +238,6 @@ abstract class QueryTest extends PlanTest {
       case _: ObjectConsumer => return
       case _: ObjectProducer => return
       case _: AppendColumns => return
-      case _: TypedFilter => return
       case _: LogicalRelation => return
       case p if p.getClass.getSimpleName == "MetastoreRelation" => return
       case _: MemoryPlan => return


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
