Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13846#discussion_r68750055
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1637,55 +1654,31 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
     }
     
     /**
    - * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
    - * [[SerializeFromObject]] above it.  If these serializations can't be eliminated, we should embed
    - * the deserializer in filter condition to save the extra serialization at last.
    + * Combines all adjacent [[TypedFilter]]s, which operate on same type object in condition, into a
    + * single [[Filter]].
      */
    -object EmbedSerializerInFilter extends Rule[LogicalPlan] {
    +object CombineTypedFilters extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    -    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
    -      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
    -      // optimization rule also relies on this assumption. However, Dataset typed filter operator
    -      // does introduce schema changes in some cases. Thus, we only enable this optimization when
    -      //
    -      //  1. either input and output schemata are exactly the same, or
    -      //  2. both input and output schemata are single-field schema and share the same type.
    -      //
    -      // The 2nd case is included because encoders for primitive types always have only a single
    -      // field with hard-coded field name "value".
    -      // TODO Cleans this up after fixing SPARK-15632.
    -      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
    -
    -      val numObjects = condition.collect {
    -        case a: Attribute if a == d.output.head => a
    -      }.length
    -
    -      if (numObjects > 1) {
    -        // If the filter condition references the object more than one times, we should not embed
    -        // deserializer in it as the deserialization will happen many times and slow down the
    -        // execution.
    -        // TODO: we can still embed it if we can make sure subexpression elimination works here.
    -        s
    +    case t @ TypedFilter(_, deserializer, child) =>
    +      val filters = collectTypedFiltersOnSameTypeObj(child, deserializer.dataType, ArrayBuffer(t))
    +      if (filters.length > 1) {
    +        val objHolder = BoundReference(0, deserializer.dataType, nullable = false)
    +        val condition = filters.map(_.getCondition(objHolder)).reduce(And)
    +        Filter(ReferenceToExpressions(condition, deserializer :: Nil), filters.last.child)
           } else {
    -        val newCondition = condition transform {
    -          case a: Attribute if a == d.output.head => d.deserializer
    -        }
    -        val filter = Filter(newCondition, d.child)
    -
    -        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
    -        // We will remove it later in RemoveAliasOnlyProject rule.
    -        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
    -          Alias(fout, fout.name)(exprId = sout.exprId)
    -        }
    -        Project(objAttrs, filter)
    +        t
           }
       }
     
    -  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
    -    (lhs, rhs) match {
    -      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
    -      case _ => false
    -    }
    +  @tailrec
    +  private def collectTypedFiltersOnSameTypeObj(
    +      plan: LogicalPlan,
    +      objType: DataType,
    +      filters: ArrayBuffer[TypedFilter]): Array[TypedFilter] = plan match {
    +    case t: TypedFilter if t.deserializer.dataType == objType =>
    +      filters += t
    --- End diff ---
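    
    For context, the new rule targets chains of adjacent typed filters over the
    same object type. A minimal, illustrative Dataset snippet that produces such
    a chain (the names and data are hypothetical, not taken from the PR):
    
        import org.apache.spark.sql.SparkSession
        
        // Top-level case class so a product encoder can be derived for it.
        case class Person(age: Long, name: String)
        
        object AdjacentTypedFilters {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .master("local[*]")
              .appName("adjacent-typed-filters")
              .getOrCreate()
            import spark.implicits._
        
            val people = Seq(Person(30, "alice"), Person(15, "bob")).toDS()
            // Both predicates take a Person, so with this rule their conditions
            // should be merged into a single Filter, deserializing each row once
            // rather than once per predicate.
            val adults = people.filter(_.age > 21).filter(_.name.startsWith("a"))
            adults.explain(true)
            spark.stop()
          }
        }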
    
    Shall we prepend rather than append the found filters here? Otherwise the
    filter predicates will be evaluated in reverse order after being combined.
    It would also be nice to add a comment explaining this.
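    
    To make the ordering concern concrete, here is a minimal sketch with plain
    Scala predicates standing in for Catalyst's `TypedFilter` conditions and
    `And`; the names below are illustrative, not from the PR:
    
        import scala.collection.mutable.ArrayBuffer
        
        object FilterOrderSketch extends App {
          val evaluated = ArrayBuffer.empty[String]
          def named(name: String): Int => Boolean = { x =>
            evaluated += name
            x > 0
          }
        
          // For a plan fragment Filter(top, Filter(bottom, data)), the bottom
          // (innermost) predicate runs first, so the original order is
          // bottom, then top.
          val top = named("top")       // the filter the rule matches first
          val bottom = named("bottom") // collected afterwards while walking down
        
          // Appending while collecting (as in the diff) yields Seq(top, bottom);
          // a left-to-right conjunction -- the analogue of
          // filters.map(_.getCondition(objHolder)).reduce(And) -- then evaluates
          // `top` before `bottom`, the reverse of the plan's original order.
          val combined: Int => Boolean = x => Seq(top, bottom).forall(p => p(x))
          combined(1)
          println(evaluated.mkString(", ")) // prints: top, bottom
        
          // Prepending instead (e.g. `t +=: filters`) would yield
          // Seq(bottom, top) and preserve the original evaluation order.
        }
    
    Order can matter for more than performance: with a short-circuiting
    conjunction, an earlier predicate may guard a later one (e.g. a null check),
    so keeping the plan's original order is the safer default.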

