Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/13846#discussion_r68750055 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1637,55 +1654,31 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic } /** - * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a - * [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed - * the deserializer in filter condition to save the extra serialization at last. + * Combines all adjacent [[TypedFilter]]s, which operate on same type object in condition, into a + * single [[Filter]]. */ -object EmbedSerializerInFilter extends Rule[LogicalPlan] { +object CombineTypedFilters extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) - // SPARK-15632: Conceptually, filter operator should never introduce schema change. This - // optimization rule also relies on this assumption. However, Dataset typed filter operator - // does introduce schema changes in some cases. Thus, we only enable this optimization when - // - // 1. either input and output schemata are exactly the same, or - // 2. both input and output schemata are single-field schema and share the same type. - // - // The 2nd case is included because encoders for primitive types always have only a single - // field with hard-coded field name "value". - // TODO Cleans this up after fixing SPARK-15632. 
- if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) => - - val numObjects = condition.collect { - case a: Attribute if a == d.output.head => a - }.length - - if (numObjects > 1) { - // If the filter condition references the object more than one times, we should not embed - // deserializer in it as the deserialization will happen many times and slow down the - // execution. - // TODO: we can still embed it if we can make sure subexpression elimination works here. - s + case t @ TypedFilter(_, deserializer, child) => + val filters = collectTypedFiltersOnSameTypeObj(child, deserializer.dataType, ArrayBuffer(t)) + if (filters.length > 1) { + val objHolder = BoundReference(0, deserializer.dataType, nullable = false) + val condition = filters.map(_.getCondition(objHolder)).reduce(And) + Filter(ReferenceToExpressions(condition, deserializer :: Nil), filters.last.child) } else { - val newCondition = condition transform { - case a: Attribute if a == d.output.head => d.deserializer - } - val filter = Filter(newCondition, d.child) - - // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`. - // We will remove it later in RemoveAliasOnlyProject rule. - val objAttrs = filter.output.zip(s.output).map { case (fout, sout) => - Alias(fout, fout.name)(exprId = sout.exprId) - } - Project(objAttrs, filter) + t } } - def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = { - (lhs, rhs) match { - case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType - case _ => false - } + @tailrec + private def collectTypedFiltersOnSameTypeObj( + plan: LogicalPlan, + objType: DataType, + filters: ArrayBuffer[TypedFilter]): Array[TypedFilter] = plan match { + case t: TypedFilter if t.deserializer.dataType == objType => + filters += t --- End diff -- Shall we prepend rather than append found filters here? Otherwise filter predicates will be evaluated in reverse order after being combined. 
It would also be nice to add a comment explaining this.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org