This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a5dcb82 [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
a5dcb82 is described below

commit a5dcb82b5a6b08ebfe168e735f6edb40b80420fd
Author: Ivan Vergiliev <ivan.vergil...@gmail.com>
AuthorDate: Wed Jun 19 10:44:58 2019 +0800

[SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion

## What changes were proposed in this pull request?

`OrcFilters.createBuilder` has exponential complexity in the height of the filter tree due to the way the check-and-build pattern is implemented. We've hit this in production by passing a `Column` filter to Spark directly, with a job taking multiple hours for a simple set of ~30 filters. This PR changes the checking logic so that the conversion has linear complexity in the size of the tree instead of exponential in its height.

Right now, due to the way ORC `SearchArgument` works, the code is forced to do two separate phases when converting a given Spark filter to an ORC filter:
1. Check if the filter is convertible.
2. Only if the check in 1. succeeds, perform the actual conversion into the resulting ORC filter.

However, there's one detail which is the culprit in the exponential complexity: phases 1. and 2. are both done using the exact same method. The resulting exponential complexity is easiest to see in the `NOT` case - consider the following code:

```
val f1 = col("id") === lit(5)
val f2 = !f1
val f3 = !f2
val f4 = !f3
val f5 = !f4
```

Now, when we run `createBuilder` on `f5`, we get the following behaviour:
1. call `createBuilder(f4)` to check if the child `f4` is convertible
2. call `createBuilder(f4)` to actually convert it

This seems fine when looking at a single level, but what actually ends up happening is:
- `createBuilder(f3)` will then recursively be called 4 times - 2 times in step 1., and two times in step 2.
- `createBuilder(f2)` will be called 8 times - 4 times in each top-level step, 2 times in each sub-step.
- `createBuilder(f1)` will be called 16 times.

As a result, having a tree of height > 30 leads to billions of calls to `createBuilder`, heap allocations, and so on and can take multiple hours.

The way this PR solves this problem is by separating the `check` and `convert` functionalities into separate functions. This way, the call to `createBuilder` on `f5` above would look like this:
1. call `isConvertible(f4)` to check if the child `f4` is convertible - amortized constant complexity
2. call `createBuilder(f4)` to actually convert it - linear complexity in the size of the subtree.

This way, we get an overall complexity that's linear in the size of the filter tree, allowing us to convert trees with tens of thousands of nodes in milliseconds.

The reason this split (`check` and `build`) is possible is that the checking never actually depends on the actual building of the filter. The `check` part of `createBuilder` depends mainly on:
- `isSearchableType` for leaf nodes, and
- `check`-ing the child filters for composite nodes like NOT, AND and OR.

Situations like the `SearchArgumentBuilder` throwing an exception while building the resulting ORC filter are not handled right now - they just get thrown out of the class, and this change preserves this behaviour.

This PR extracts this part of the code to a separate class which allows the conversion to make very efficient checks to confirm that a given child is convertible before actually converting it.

Results:

Before:
- converting a skewed tree with a height of ~35 took about 6-7 hours.
- converting a skewed tree with hundreds or thousands of nodes would be completely impossible.

Now:
- filtering against a skewed tree with a height of 1500 in the benchmark suite finishes in less than 10 seconds.
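The call counts in the `NOT` example can be reproduced with a small toy model. This is only a sketch under stated assumptions: `Pred`, `Leaf`, `Neg`, `checkWhileBuilding`, `isConvertible`, `build`, `chain`, `oldCalls`, and `newCalls` are hypothetical names invented for illustration. They are not Spark's `Filter` classes or the ORC `SearchArgument` builder, and the model counts visits only, without building anything.

```scala
// Toy model only: every name in this object is a hypothetical stand-in,
// not part of Spark or ORC.
object ConversionCost {
  sealed trait Pred
  case object Leaf extends Pred
  final case class Neg(child: Pred) extends Pred

  // Old pattern: a single method both checks and builds, so every Neg node
  // visits its child twice (once to check, once to convert).
  def checkWhileBuilding(p: Pred, calls: Array[Long]): Boolean = {
    calls(0) += 1
    p match {
      case Leaf => true
      case Neg(child) =>
        checkWhileBuilding(child, calls) && checkWhileBuilding(child, calls)
    }
  }

  // New pattern, pass 1: decide convertibility with one visit per node.
  def isConvertible(p: Pred, calls: Array[Long]): Boolean = {
    calls(0) += 1
    p match {
      case Leaf => true
      case Neg(child) => isConvertible(child, calls)
    }
  }

  // New pattern, pass 2: build with one visit per node and no re-checking.
  def build(p: Pred, calls: Array[Long]): Unit = {
    calls(0) += 1
    p match {
      case Leaf => ()
      case Neg(child) => build(child, calls)
    }
  }

  // A chain of `height` negations wrapped around a single leaf.
  def chain(height: Int): Pred =
    (1 to height).foldLeft[Pred](Leaf)((p, _) => Neg(p))

  // Total visits made by the combined check-and-build method.
  def oldCalls(height: Int): Long = {
    val calls = Array(0L)
    checkWhileBuilding(chain(height), calls)
    calls(0)
  }

  // Total visits made by the separate check and build passes.
  def newCalls(height: Int): Long = {
    val calls = Array(0L)
    val p = chain(height)
    if (isConvertible(p, calls)) build(p, calls)
    calls(0)
  }
}
```

With `chain(4)` playing the role of `f5`, `ConversionCost.oldCalls(4)` counts 1 + 2 + 4 + 8 + 16 = 31 visits, matching the per-level counts listed above, while `ConversionCost.newCalls(4)` counts 10 (five nodes, two passes). In this model the old count grows as 2^(height + 1) - 1 and the new one as 2 * (height + 1).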
## Steps to reproduce

```scala
val schema = StructType.fromDDL("col INT")
(20 to 30).foreach { width =>
  val whereFilter = (1 to width).map(i => EqualTo("col", i)).reduceLeft(Or)
  val start = System.currentTimeMillis()
  OrcFilters.createFilter(schema, Seq(whereFilter))
  println(s"With $width filters, conversion takes ${System.currentTimeMillis() - start} ms")
}
```

### Before this PR

```
With 20 filters, conversion takes 363 ms
With 21 filters, conversion takes 496 ms
With 22 filters, conversion takes 939 ms
With 23 filters, conversion takes 1871 ms
With 24 filters, conversion takes 3756 ms
With 25 filters, conversion takes 7452 ms
With 26 filters, conversion takes 14978 ms
With 27 filters, conversion takes 30519 ms
With 28 filters, conversion takes 60361 ms // 1 minute
With 29 filters, conversion takes 126575 ms // 2 minutes 6 seconds
With 30 filters, conversion takes 257369 ms // 4 minutes 17 seconds
```

### After this PR

```
With 20 filters, conversion takes 12 ms
With 21 filters, conversion takes 0 ms
With 22 filters, conversion takes 1 ms
With 23 filters, conversion takes 0 ms
With 24 filters, conversion takes 1 ms
With 25 filters, conversion takes 1 ms
With 26 filters, conversion takes 0 ms
With 27 filters, conversion takes 1 ms
With 28 filters, conversion takes 0 ms
With 29 filters, conversion takes 1 ms
With 30 filters, conversion takes 0 ms
```

## How was this patch tested?

There are no changes in behaviour, and the existing tests pass. Added new benchmarks that expose the problematic behaviour and they finish quickly with the changes applied.

Closes #24068 from IvanVergiliev/optimize-orc-filters.
Authored-by: Ivan Vergiliev <ivan.vergil...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/orc/OrcFiltersBase.scala |   4 +-
 .../sql/execution/datasources/orc/OrcFilters.scala | 366 ++++++++++++---------
 .../sql/execution/datasources/orc/OrcFilters.scala | 364 +++++++++++---------
 .../org/apache/spark/sql/hive/orc/OrcFilters.scala | 348 ++++++++++++--------
 4 files changed, 639 insertions(+), 443 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index 8d4898a..0b56587 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -38,7 +38,7 @@ trait OrcFiltersBase {

   // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
   // in order to distinguish predicate pushdown for nested columns.
-  protected def quoteAttributeNameIfNeeded(name: String) : String = {
+  protected[sql] def quoteAttributeNameIfNeeded(name: String) : String = {
     if (!name.contains("`") && name.contains(".")) {
       s"`$name`"
     } else {
@@ -50,7 +50,7 @@ trait OrcFiltersBase {
    * Return true if this is a searchable type in ORC.
    * Both CharType and VarcharType are cleaned at AstBuilder.
*/ - protected def isSearchableType(dataType: DataType) = dataType match { + protected[sql] def isSearchableType(dataType: DataType) = dataType match { case BinaryType => false case _: AtomicType => true case _ => false diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 9e4bf22..e87f7d8 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -29,8 +29,9 @@ import org.apache.spark.sql.types._ /** * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. * - * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- - * checking pattern when converting `And`/`Or`/`Not` filters. + * Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and + * conversion passes through the Filter to make sure we only convert predicates that are known + * to be convertible. * * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite @@ -39,18 +40,18 @@ import org.apache.spark.sql.types._ * * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and * `startNot()` mutate internal state of the builder instance. This forces us to translate all - * convertible filters with a single builder instance. However, before actually converting a filter, - * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is - * found, we may already end up with a builder whose internal state is inconsistent. + * convertible filters with a single builder instance. 
However, if we try to translate a filter + * before checking whether it can be converted or not, we may end up with a builder whose internal + * state is inconsistent in the case of an inconvertible filter. * * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then * try to convert its children. Say we convert `left` child successfully, but find that `right` * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent * now. * - * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their - * children with brand new builders, and only do the actual conversion with the right builder - * instance when the children are proven to be convertible. + * The workaround employed here is to trim the Spark filters before trying to convert them. This + * way, we can only do the actual conversion on the part of the Filter that is known to be + * convertible. * * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. 
Usage of * builder methods mentioned above can only be found in test code, where all tested filters are @@ -63,11 +64,12 @@ private[sql] object OrcFilters extends OrcFiltersBase { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Combines all filters using `And` to produce a single conjunction + conjunction <- buildTree(filters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) } yield builder.build() } @@ -75,43 +77,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - import org.apache.spark.sql.sources._ + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) + filters.flatMap(orcFilterConverter.trimUnconvertibleFilters) + } - def convertibleFiltersHelper( - filter: Filter, - canPartialPushDown: Boolean): Option[Filter] = filter match { - case And(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - (leftResultOptional, rightResultOptional) match { - case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) - case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) - case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) - case _ => None - } +} - case Or(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val 
rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { - None - } else { - Some(Or(leftResultOptional.get, rightResultOptional.get)) - } - case Not(pred) => - val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) - resultOptional.map(Not) - case other => - if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { - Some(other) - } else { - None - } - } - filters.flatMap { filter => - convertibleFiltersHelper(filter, true) - } - } +private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { /** * Get PredicateLeafType which is corresponding to the given DataType. @@ -143,145 +115,229 @@ private[sql] object OrcFilters extends OrcFiltersBase { case _ => value } + import org.apache.spark.sql.sources._ + import OrcFilters._ + /** - * Build a SearchArgument and return the builder so far. + * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then + * only building the remaining convertible nodes. + * + * Doing the conversion in this way avoids the computational complexity problems introduced by + * checking whether a node is convertible while building it. The approach implemented here has + * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run + * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert + * the remaining nodes. + * + * The alternative approach of checking-while-building can (and did) result + * in exponential complexity in the height of the tree, causing perf problems with Filters with + * as few as ~35 nodes if they were skewed. 
*/ - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], + private[sql] def buildSearchArgument( expression: Filter, builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + trimUnconvertibleFilters(expression).map { filter => + updateBuilder(filter, builder) + builder + } } /** - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed - * down safely. Pushing ONLY one side of AND down is safe to - * do at the top level or none of its ancestors is NOT and OR. - * @return the builder so far. + * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. */ - private def createBuilder( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder, - canPartialPushDownConjuncts: Boolean): Option[Builder] = { + private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { + performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) + } + + /** + * Builds a SearchArgument for the given Filter. This method should only be called on Filters + * that have previously been trimmed to remove unsupported sub-Filters! + */ + private def updateBuilder(expression: Filter, builder: Builder): Unit = + performAction(BuildSearchArgument(builder), expression) + + sealed trait ActionType[ReturnType] + case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) + extends ActionType[Option[Filter]] + case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] + + // The performAction method can run both the filtering and building operations for a given + // node - we signify which one we want with the `actionType` parameter. 
+ // + // There are a couple of benefits to coupling the two operations like this: + // 1. All the logic for a given predicate is grouped logically in the same place. You don't + // have to scroll across the whole file to see what the filter action for an And is while + // you're looking at the build action. + // 2. It's much easier to keep the implementations of the two operations up-to-date with + // each other. If the `filter` and `build` operations are implemented as separate case-matches + // in different methods, it's very easy to change one without appropriately updating the + // other. For example, if we add a new supported node type to `filter`, it would be very + // easy to forget to update `build` to support it too, thus leading to conversion errors. + private def performAction[ReturnType]( + actionType: ActionType[ReturnType], + expression: Filter): ReturnType = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) - import org.apache.spark.sql.sources._ - expression match { case And(left, right) => - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. 
- val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - (leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => - for { - lhs <- createBuilder(dataTypeMap, left, - builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - - case _ => None + actionType match { + case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => + // At here, it is not safe to just keep one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported + // predicate can be safely removed. + val lhs = performAction(t, left) + val rhs = performAction(t, right) + (lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs + case _ => None + } + case b @ BuildSearchArgument(builder) => + builder.startAnd() + performAction(b, left) + performAction(b, right) + builder.end() + () } case Or(left, right) => - // The Or predicate is convertible when both of its children can be pushed down. 
- // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + actionType match { + case t: TrimUnconvertibleFilters => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). 
+ for { + lhs: Filter <- performAction(t, left) + rhs: Filter <- performAction(t, right) + } yield Or(lhs, rhs) + case b @ BuildSearchArgument(builder) => + builder.startOr() + performAction(b, left) + performAction(b, right) + builder.end() + () + } case Not(child) => - for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, - child, builder.startNot(), canPartialPushDownConjuncts = false) - } yield negate.end() + actionType match { + case t: TrimUnconvertibleFilters => + performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) + case b @ BuildSearchArgument(builder) => + builder.startNot() + performAction(b, child) + builder.end() + () + } - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional + // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf + // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().equals(quotedName, getType(attribute), castedValue).end() + () + } case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end() + () + } case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = 
quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startAnd().isNull(quotedName, 
getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startAnd().isNull(quotedName, getType(attribute)).end() + () + } case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startNot().isNull(quotedName, getType(attribute)).end() + () + } case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - Some(builder.startAnd().in(quotedName, getType(attribute), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) + builder.startAnd().in(quotedName, getType(attribute), + castedValues.map(_.asInstanceOf[AnyRef]): _*).end() + () + } - case _ => None + case _ => + actionType match { + case _: TrimUnconvertibleFilters => None + case BuildSearchArgument(builder) => + throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") + } } } } + diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 632a72a..ebcd356 100644 --- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ 
b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -29,8 +29,9 @@ import org.apache.spark.sql.types._ /** * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. * - * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- - * checking pattern when converting `And`/`Or`/`Not` filters. + * Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and + * conversion passes through the Filter to make sure we only convert predicates that are known + * to be convertible. * * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite @@ -39,18 +40,18 @@ import org.apache.spark.sql.types._ * * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and * `startNot()` mutate internal state of the builder instance. This forces us to translate all - * convertible filters with a single builder instance. However, before actually converting a filter, - * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is - * found, we may already end up with a builder whose internal state is inconsistent. + * convertible filters with a single builder instance. However, if we try to translate a filter + * before checking whether it can be converted or not, we may end up with a builder whose internal + * state is inconsistent in the case of an inconvertible filter. * * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then * try to convert its children. Say we convert `left` child successfully, but find that `right` * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent * now. 
* - * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their - * children with brand new builders, and only do the actual conversion with the right builder - * instance when the children are proven to be convertible. + * The workaround employed here is to trim the Spark filters before trying to convert them. This + * way, we can only do the actual conversion on the part of the Filter that is known to be + * convertible. * * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of * builder methods mentioned above can only be found in test code, where all tested filters are @@ -63,11 +64,12 @@ private[sql] object OrcFilters extends OrcFiltersBase { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Combines all filters using `And` to produce a single conjunction + conjunction <- buildTree(filters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) } yield builder.build() } @@ -75,43 +77,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - import org.apache.spark.sql.sources._ + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) + filters.flatMap(orcFilterConverter.trimUnconvertibleFilters) + } - def convertibleFiltersHelper( - filter: Filter, - canPartialPushDown: Boolean): Option[Filter] = filter match { - case And(left, right) => - val leftResultOptional = 
convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - (leftResultOptional, rightResultOptional) match { - case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) - case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) - case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) - case _ => None - } +} - case Or(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { - None - } else { - Some(Or(leftResultOptional.get, rightResultOptional.get)) - } - case Not(pred) => - val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) - resultOptional.map(Not) - case other => - if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { - Some(other) - } else { - None - } - } - filters.flatMap { filter => - convertibleFiltersHelper(filter, true) - } - } +private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { /** * Get PredicateLeafType which is corresponding to the given DataType. @@ -143,144 +115,228 @@ private[sql] object OrcFilters extends OrcFiltersBase { case _ => value } + import org.apache.spark.sql.sources._ + import OrcFilters._ + /** - * Build a SearchArgument and return the builder so far. + * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then + * only building the remaining convertible nodes. + * + * Doing the conversion in this way avoids the computational complexity problems introduced by + * checking whether a node is convertible while building it. 
The approach implemented here has + * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run + * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert + * the remaining nodes. + * + * The alternative approach of checking-while-building can (and did) result + * in exponential complexity in the height of the tree, causing perf problems with Filters with + * as few as ~35 nodes if they were skewed. */ - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], + private[sql] def buildSearchArgument( expression: Filter, builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + trimUnconvertibleFilters(expression).map { filter => + updateBuilder(filter, builder) + builder + } } /** - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed - * down safely. Pushing ONLY one side of AND down is safe to - * do at the top level or none of its ancestors is NOT and OR. - * @return the builder so far. + * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. */ - private def createBuilder( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder, - canPartialPushDownConjuncts: Boolean): Option[Builder] = { + private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { + performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) + } + + /** + * Builds a SearchArgument for the given Filter. This method should only be called on Filters + * that have previously been trimmed to remove unsupported sub-Filters! 
+ */ + private def updateBuilder(expression: Filter, builder: Builder): Unit = + performAction(BuildSearchArgument(builder), expression) + + sealed trait ActionType[ReturnType] + case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) + extends ActionType[Option[Filter]] + case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] + + // The performAction method can run both the filtering and building operations for a given + // node - we signify which one we want with the `actionType` parameter. + // + // There are a couple of benefits to coupling the two operations like this: + // 1. All the logic for a given predicate is grouped logically in the same place. You don't + // have to scroll across the whole file to see what the filter action for an And is while + // you're looking at the build action. + // 2. It's much easier to keep the implementations of the two operations up-to-date with + // each other. If the `filter` and `build` operations are implemented as separate case-matches + // in different methods, it's very easy to change one without appropriately updating the + // other. For example, if we add a new supported node type to `filter`, it would be very + // easy to forget to update `build` to support it too, thus leading to conversion errors. + private def performAction[ReturnType]( + actionType: ActionType[ReturnType], + expression: Filter): ReturnType = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) - import org.apache.spark.sql.sources._ - expression match { case And(left, right) => - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). 
If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. - val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - (leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => - for { - lhs <- createBuilder(dataTypeMap, left, - builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - - case _ => None + actionType match { + case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => + // At here, it is not safe to just keep one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported + // predicate can be safely removed. 
+ val lhs = performAction(t, left) + val rhs = performAction(t, right) + (lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs + case _ => None + } + case b @ BuildSearchArgument(builder) => + builder.startAnd() + performAction(b, left) + performAction(b, right) + builder.end() + () } case Or(left, right) => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + actionType match { + case t: TrimUnconvertibleFilters => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). 
+ for { + lhs: Filter <- performAction(t, left) + rhs: Filter <- performAction(t, right) + } yield Or(lhs, rhs) + case b @ BuildSearchArgument(builder) => + builder.startOr() + performAction(b, left) + performAction(b, right) + builder.end() + () + } case Not(child) => - for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, - child, builder.startNot(), canPartialPushDownConjuncts = false) - } yield negate.end() + actionType match { + case t: TrimUnconvertibleFilters => + performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) + case b @ BuildSearchArgument(builder) => + builder.startNot() + performAction(b, child) + builder.end() + () + } - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional + // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf + // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().equals(quotedName, getType(attribute), castedValue).end() + () + } case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end() + () + } case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = 
quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startAnd().isNull(quotedName, 
getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startAnd().isNull(quotedName, getType(attribute)).end() + () + } case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startNot().isNull(quotedName, getType(attribute)).end() + () + } case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - Some(builder.startAnd().in(quotedName, getType(attribute), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) + builder.startAnd().in(quotedName, getType(attribute), + castedValues.map(_.asInstanceOf[AnyRef]): _*).end() + () + } - case _ => None + case _ => + actionType match { + case _: TrimUnconvertibleFilters => None + case BuildSearchArgument(builder) => + throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 3bfe157..cee8fdb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala 
@@ -62,176 +62,260 @@ import org.apache.spark.sql.types._ */ private[orc] object OrcFilters extends Logging { - private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { - val method = klass.getMethod(name, args: _*) - method.setAccessible(true) - method - } - def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { if (HiveUtils.isHive23) { DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]] } else { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - - // First, tries to convert each filter individually to see whether it's convertible, and then - // collect all convertible ones to build the final `SearchArgument`. - val convertibleFilters = for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) - } yield filter - + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters) + // Combines all filters using `And` to produce a single conjunction + conjunction <- buildTree(filters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) } yield builder.build() } } +} - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) +private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { + + def isSearchableType(dataType: DataType): Boolean = dataType match { + // Only the values in the Spark types below can be recognized by + // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. 
+ case ByteType | ShortType | FloatType | DoubleType => true + case IntegerType | LongType | StringType | BooleanType => true + case TimestampType | _: DecimalType => true + case _ => false } + private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + val method = klass.getMethod(name, args: _*) + method.setAccessible(true) + method + } + + import org.apache.spark.sql.sources._ + /** - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed - * down safely. Pushing ONLY one side of AND down is safe to - * do at the top level or none of its ancestors is NOT and OR. - * @return the builder so far. + * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then + * only building the remaining convertible nodes. + * + * Doing the conversion in this way avoids the computational complexity problems introduced by + * checking whether a node is convertible while building it. The approach implemented here has + * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run + * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert + * the remaining nodes. + * + * The alternative approach of checking-while-building can (and did) result + * in exponential complexity in the height of the tree, causing perf problems with Filters with + * as few as ~35 nodes if they were skewed. 
*/ - private def createBuilder( - dataTypeMap: Map[String, DataType], + private[sql] def buildSearchArgument( expression: Filter, - builder: Builder, - canPartialPushDownConjuncts: Boolean): Option[Builder] = { - def isSearchableType(dataType: DataType): Boolean = dataType match { - // Only the values in the Spark types below can be recognized by - // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. - case ByteType | ShortType | FloatType | DoubleType => true - case IntegerType | LongType | StringType | BooleanType => true - case TimestampType | _: DecimalType => true - case _ => false + builder: Builder): Option[Builder] = { + trimUnconvertibleFilters(expression).map { filter => + updateBuilder(filter, builder) + builder } + } - expression match { - case And(left, right) => - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. - val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - (leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => - for { - lhs <- createBuilder(dataTypeMap, left, - builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + /** + * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. 
+ */ + private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { + performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) + } - case (Some(_), None) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) + /** + * Builds a SearchArgument for the given Filter. This method should only be called on Filters + * that have previously been trimmed to remove unsupported sub-Filters! + */ + private def updateBuilder(expression: Filter, builder: Builder): Unit = + performAction(BuildSearchArgument(builder), expression) - case (None, Some(_)) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) + sealed trait ActionType[ReturnType] + case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) + extends ActionType[Option[Filter]] + case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] - case _ => None + // The performAction method can run both the filtering and building operations for a given + // node - we signify which one we want with the `actionType` parameter. + // + // There are a couple of benefits to coupling the two operations like this: + // 1. All the logic for a given predicate is grouped logically in the same place. You don't + // have to scroll across the whole file to see what the filter action for an And is while + // you're looking at the build action. + // 2. It's much easier to keep the implementations of the two operations up-to-date with + // each other. If the `filter` and `build` operations are implemented as separate case-matches + // in different methods, it's very easy to change one without appropriately updating the + // other. For example, if we add a new supported node type to `filter`, it would be very + // easy to forget to update `build` to support it too, thus leading to conversion errors. 
+ private def performAction[ReturnType]( + actionType: ActionType[ReturnType], + expression: Filter): ReturnType = { + + expression match { + case And(left, right) => + actionType match { + case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => + // At here, it is not safe to just keep one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported + // predicate can be safely removed. + val lhs = performAction(t, left) + val rhs = performAction(t, right) + (lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs + case _ => None + } + case b @ BuildSearchArgument(builder) => + builder.startAnd() + performAction(b, left) + performAction(b, right) + builder.end() + () } case Or(left, right) => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). 
- for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + actionType match { + case t: TrimUnconvertibleFilters => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). + for { + lhs: Filter <- performAction(t, left) + rhs: Filter <- performAction(t, right) + } yield Or(lhs, rhs) + case b @ BuildSearchArgument(builder) => + builder.startOr() + performAction(b, left) + performAction(b, right) + builder.end() + () + } case Not(child) => - for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, - child, builder.startNot(), canPartialPushDownConjuncts = false) - } yield negate.end() + actionType match { + case t: TrimUnconvertibleFilters => + performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) + case b @ BuildSearchArgument(builder) => + builder.startNot() + performAction(b, child) + builder.end() + () + } - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
+ // NOTE: For all case branches dealing with leaf predicates below, the additional + // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf + // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThan", classOf[String], 
classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + 
} case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "isNull", classOf[String]) - Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + method.invoke(bd, attribute).asInstanceOf[Builder].end() + () + } case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "isNull", classOf[String]) - Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startNot() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + method.invoke(bd, attribute).asInstanceOf[Builder].end() + () + } case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) - Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) - .asInstanceOf[Builder].end()) + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) + method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) + .asInstanceOf[Builder].end() + () + } - case _ => None + case _ => + actionType match { + case _: TrimUnconvertibleFilters => None + case BuildSearchArgument(builder) => + throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") + } } } }
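The trim-then-build idea in the diff above can be illustrated with a small standalone sketch. This is not the Spark code: `Leaf`, `TrimThenBuild`, `trim`, `build` and `convert` are hypothetical names invented for illustration, the toy `Filter` types only mirror the shape of `org.apache.spark.sql.sources`, and a `StringBuilder` stands in for the mutable ORC `SearchArgument` builder. It assumes, as the commit does, that partial push-down of an `And` is disabled only below a `Not`.

```scala
// Toy filter tree. All names are hypothetical stand-ins, not Spark classes.
sealed trait Filter
case class Leaf(name: String, supported: Boolean) extends Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class Not(child: Filter) extends Filter

object TrimThenBuild {
  // Pass 1: remove unsupported sub-filters. Each node is visited once.
  // Keeping only one side of an And is allowed while canPartial is true;
  // a Not ancestor switches it off, an Or requires both sides to survive.
  def trim(f: Filter, canPartial: Boolean = true): Option[Filter] = f match {
    case And(l, r) =>
      (trim(l, canPartial), trim(r, canPartial)) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (Some(a), None) if canPartial => Some(a)
        case (None, Some(b)) if canPartial => Some(b)
        case _ => None
      }
    case Or(l, r) =>
      for {
        a <- trim(l, canPartial)
        b <- trim(r, canPartial)
      } yield Or(a, b)
    case Not(child) =>
      trim(child, canPartial = false).map(c => Not(c))
    case leaf @ Leaf(_, supported) =>
      if (supported) Some(leaf) else None
  }

  // Pass 2: write the already-trimmed tree into one mutable builder.
  // No convertibility checks are needed here, so there is no re-traversal.
  def build(f: Filter, out: StringBuilder): Unit = f match {
    case And(l, r) =>
      out.append("and(")
      build(l, out)
      out.append(",")
      build(r, out)
      out.append(")")
      ()
    case Or(l, r) =>
      out.append("or(")
      build(l, out)
      out.append(",")
      build(r, out)
      out.append(")")
      ()
    case Not(child) =>
      out.append("not(")
      build(child, out)
      out.append(")")
      ()
    case Leaf(name, true) =>
      out.append(name)
      ()
    case Leaf(name, false) =>
      // Only reachable if build is called on an untrimmed tree.
      throw new IllegalArgumentException(s"Can't build unsupported filter $name")
  }

  // Two linear passes: trim first, then build whatever survived.
  def convert(f: Filter): Option[String] =
    trim(f).map { trimmed =>
      val out = new StringBuilder
      build(trimmed, out)
      out.toString
    }
}
```

Because `trim` and `build` each visit a node exactly once, a chain of nested `Not` filters costs time proportional to its length rather than doubling at every level, which is the behaviour the commit message describes for the real converter.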