This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a5dcb82  [SPARK-27105][SQL] Optimize away exponential complexity in 
ORC predicate conversion
a5dcb82 is described below

commit a5dcb82b5a6b08ebfe168e735f6edb40b80420fd
Author: Ivan Vergiliev <ivan.vergil...@gmail.com>
AuthorDate: Wed Jun 19 10:44:58 2019 +0800

    [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate 
conversion
    
    ## What changes were proposed in this pull request?
    
    `OrcFilters.createBuilder` has exponential complexity in the height of the 
filter tree due to the way the check-and-build pattern is implemented. We've 
hit this in production by passing a `Column` filter to Spark directly, with a 
job taking multiple hours for a simple set of ~30 filters. This PR changes the 
checking logic so that the conversion has linear complexity in the size of the 
tree instead of exponential in its height.
    
    Right now, due to the way ORC `SearchArgument` works, the code is forced to 
do two separate phases when converting a given Spark filter to an ORC filter:
    1. Check if the filter is convertible.
    2. Only if the check in 1. succeeds, perform the actual conversion into the 
resulting ORC filter.
    
    However, there's one detail which is the culprit in the exponential 
complexity: phases 1. and 2. are both done using the exact same method. The 
resulting exponential complexity is easiest to see in the `NOT` case - consider 
the following code:
    
    ```
    val f1 = col("id") === lit(5)
    val f2 = !f1
    val f3 = !f2
    val f4 = !f3
    val f5 = !f4
    ```
    
    Now, when we run `createBuilder` on `f5`, we get the following behaviour:
    1. call `createBuilder(f4)` to check if the child `f4` is convertible
    2. call `createBuilder(f4)` to actually convert it
    
    This seems fine when looking at a single level, but what actually ends up 
happening is:
    - `createBuilder(f3)` will then recursively be called 4 times - 2 times in 
step 1., and two times in step 2.
    - `createBuilder(f2)` will be called 8 times - 4 times in each top-level 
step, 2 times in each sub-step.
    - `createBuilder(f1)` will be called 16 times.
    
    As a result, having a tree of height > 30 leads to billions of calls to 
`createBuilder`, heap allocations, and so on and can take multiple hours.
    
    The way this PR solves this problem is by separating the `check` and 
`convert` functionalities into separate functions. This way, the call to 
`createBuilder` on `f5` above would look like this:
    1. call `isConvertible(f4)` to check if the child `f4` is convertible - 
amortized constant complexity
    2. call `createBuilder(f4)` to actually convert it - linear complexity in 
the size of the subtree.
    
    This way, we get an overall complexity that's linear in the size of the 
filter tree, allowing us to convert tree with 10s of thousands of nodes in 
milliseconds.
    
    The reason this split (`check` and `build`) is possible is that the 
checking never actually depends on the actual building of the filter. The 
`check` part of `createBuilder` depends mainly on:
    - `isSearchableType` for leaf nodes, and
    - `check`-ing the child filters for composite nodes like NOT, AND and OR.
    Situations like the `SearchArgumentBuilder` throwing an exception while 
building the resulting ORC filter are not handled right now - they just get 
thrown out of the class, and this change preserves this behaviour.
    
    This PR extracts this part of the code to a separate class which allows the 
conversion to make very efficient checks to confirm that a given child is 
convertible before actually converting it.
    
    Results:
    Before:
    - converting a skewed tree with a height of ~35 took about 6-7 hours.
    - converting a skewed tree with hundreds or thousands of nodes would be 
completely impossible.
    
    Now:
    - filtering against a skewed tree with a height of 1500 in the benchmark 
suite finishes in less than 10 seconds.
    
    ## Steps to reproduce
    ```scala
    val schema = StructType.fromDDL("col INT")
    (20 to 30).foreach { width =>
      val whereFilter = (1 to width).map(i => EqualTo("col", i)).reduceLeft(Or)
      val start = System.currentTimeMillis()
      OrcFilters.createFilter(schema, Seq(whereFilter))
      println(s"With $width filters, conversion takes 
${System.currentTimeMillis() - start} ms")
    }
    ```
    
    ### Before this PR
    ```
    With 20 filters, conversion takes 363 ms
    With 21 filters, conversion takes 496 ms
    With 22 filters, conversion takes 939 ms
    With 23 filters, conversion takes 1871 ms
    With 24 filters, conversion takes 3756 ms
    With 25 filters, conversion takes 7452 ms
    With 26 filters, conversion takes 14978 ms
    With 27 filters, conversion takes 30519 ms
    With 28 filters, conversion takes 60361 ms // 1 minute
    With 29 filters, conversion takes 126575 ms // 2 minutes 6 seconds
    With 30 filters, conversion takes 257369 ms // 4 minutes 17 seconds
    ```
    
    ### After this PR
    ```
    With 20 filters, conversion takes 12 ms
    With 21 filters, conversion takes 0 ms
    With 22 filters, conversion takes 1 ms
    With 23 filters, conversion takes 0 ms
    With 24 filters, conversion takes 1 ms
    With 25 filters, conversion takes 1 ms
    With 26 filters, conversion takes 0 ms
    With 27 filters, conversion takes 1 ms
    With 28 filters, conversion takes 0 ms
    With 29 filters, conversion takes 1 ms
    With 30 filters, conversion takes 0 ms
    ```
    
    ## How was this patch tested?
    
    There are no changes in behaviour, and the existing tests pass. Added new 
benchmarks that expose the problematic behaviour and they finish quickly with 
the changes applied.
    
    Closes #24068 from IvanVergiliev/optimize-orc-filters.
    
    Authored-by: Ivan Vergiliev <ivan.vergil...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/orc/OrcFiltersBase.scala |   4 +-
 .../sql/execution/datasources/orc/OrcFilters.scala | 366 ++++++++++++---------
 .../sql/execution/datasources/orc/OrcFilters.scala | 364 +++++++++++---------
 .../org/apache/spark/sql/hive/orc/OrcFilters.scala | 348 ++++++++++++--------
 4 files changed, 639 insertions(+), 443 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index 8d4898a..0b56587 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -38,7 +38,7 @@ trait OrcFiltersBase {
 
   // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` 
characters
   // in order to distinguish predicate pushdown for nested columns.
-  protected def quoteAttributeNameIfNeeded(name: String) : String = {
+  protected[sql] def quoteAttributeNameIfNeeded(name: String) : String = {
     if (!name.contains("`") && name.contains(".")) {
       s"`$name`"
     } else {
@@ -50,7 +50,7 @@ trait OrcFiltersBase {
    * Return true if this is a searchable type in ORC.
    * Both CharType and VarcharType are cleaned at AstBuilder.
    */
-  protected def isSearchableType(dataType: DataType) = dataType match {
+  protected[sql] def isSearchableType(dataType: DataType) = dataType match {
     case BinaryType => false
     case _: AtomicType => true
     case _ => false
diff --git 
a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
 
b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 9e4bf22..e87f7d8 100644
--- 
a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ 
b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -29,8 +29,9 @@ import org.apache.spark.sql.types._
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC 
predicate push-down.
  *
- * Due to limitation of ORC `SearchArgument` builder, we had to end up with a 
pretty weird double-
- * checking pattern when converting `And`/`Or`/`Not` filters.
+ * Due to limitation of ORC `SearchArgument` builder, we had to implement 
separate checking and
+ * conversion passes through the Filter to make sure we only convert 
predicates that are known
+ * to be convertible.
  *
  * An ORC `SearchArgument` must be built in one pass using a single builder.  
For example, you can't
  * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 
2`.  This is quite
@@ -39,18 +40,18 @@ import org.apache.spark.sql.types._
  *
  * The annoying part is that, `SearchArgument` builder methods like 
`startAnd()`, `startOr()`, and
  * `startNot()` mutate internal state of the builder instance.  This forces us 
to translate all
- * convertible filters with a single builder instance. However, before 
actually converting a filter,
- * we've no idea whether it can be recognized by ORC or not. Thus, when an 
inconvertible filter is
- * found, we may already end up with a builder whose internal state is 
inconsistent.
+ * convertible filters with a single builder instance. However, if we try to 
translate a filter
+ * before checking whether it can be converted or not, we may end up with a 
builder whose internal
+ * state is inconsistent in the case of an inconvertible filter.
  *
  * For example, to convert an `And` filter with builder `b`, we call 
`b.startAnd()` first, and then
  * try to convert its children.  Say we convert `left` child successfully, but 
find that `right`
  * child is inconvertible.  Alas, `b.startAnd()` call can't be rolled back, 
and `b` is inconsistent
  * now.
  *
- * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to 
convert their
- * children with brand new builders, and only do the actual conversion with 
the right builder
- * instance when the children are proven to be convertible.
+ * The workaround employed here is to trim the Spark filters before trying to 
convert them. This
+ * way, we can only do the actual conversion on the part of the Filter that is 
known to be
+ * convertible.
  *
  * P.S.: Hive seems to use `SearchArgument` together with 
`ExprNodeGenericFuncDesc` only.  Usage of
  * builder methods mentioned above can only be found in test code, where all 
tested filters are
@@ -63,11 +64,12 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): 
Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
-      // Combines all convertible filters using `And` to produce a single 
conjunction
-      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, 
filters))
+      // Combines all filters using `And` to produce a single conjunction
+      conjunction <- buildTree(filters)
       // Then tries to build a single ORC `SearchArgument` for the conjunction 
predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      builder <- orcFilterConverter.buildSearchArgument(conjunction, 
newBuilder)
     } yield builder.build()
   }
 
@@ -75,43 +77,13 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       schema: StructType,
       dataTypeMap: Map[String, DataType],
       filters: Seq[Filter]): Seq[Filter] = {
-    import org.apache.spark.sql.sources._
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
+    filters.flatMap(orcFilterConverter.trimUnconvertibleFilters)
+  }
 
-    def convertibleFiltersHelper(
-        filter: Filter,
-        canPartialPushDown: Boolean): Option[Filter] = filter match {
-      case And(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, 
canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, 
canPartialPushDown)
-        (leftResultOptional, rightResultOptional) match {
-          case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, 
rightResult))
-          case (Some(leftResult), None) if canPartialPushDown => 
Some(leftResult)
-          case (None, Some(rightResult)) if canPartialPushDown => 
Some(rightResult)
-          case _ => None
-        }
+}
 
-      case Or(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, 
canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, 
canPartialPushDown)
-        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
-          None
-        } else {
-          Some(Or(leftResultOptional.get, rightResultOptional.get))
-        }
-      case Not(pred) =>
-        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown 
= false)
-        resultOptional.map(Not)
-      case other =>
-        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
-          Some(other)
-        } else {
-          None
-        }
-    }
-    filters.flatMap { filter =>
-      convertibleFiltersHelper(filter, true)
-    }
-  }
+private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
 
   /**
    * Get PredicateLeafType which is corresponding to the given DataType.
@@ -143,145 +115,229 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }
 
+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the 
non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity 
problems introduced by
+   * checking whether a node is convertible while building it. The approach 
implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of 
Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the 
trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf 
problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, 
canPartialPushDownConjuncts = true)
+    trimUnconvertibleFilters(expression).map { filter =>
+      updateBuilder(filter, builder)
+      builder
+    }
   }
 
   /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
-   *                                    do at the top level or none of its 
ancestors is NOT and OR.
-   * @return the builder so far.
+   * Removes all sub-Filters from a given Filter that are not convertible to 
an ORC SearchArgument.
    */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+  private[sql] def trimUnconvertibleFilters(expression: Filter): 
Option[Filter] = {
+    performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = 
true), expression)
+  }
+
+  /**
+   * Builds a SearchArgument for the given Filter. This method should only be 
called on Filters
+   * that have previously been trimmed to remove unsupported sub-Filters!
+   */
+  private def updateBuilder(expression: Filter, builder: Builder): Unit =
+    performAction(BuildSearchArgument(builder), expression)
+
+  sealed trait ActionType[ReturnType]
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+      extends ActionType[Option[Filter]]
+  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
+
+  // The performAction method can run both the filtering and building 
operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same 
place. You don't
+  //   have to scroll across the whole file to see what the filter action for 
an And is while
+  //   you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations 
up-to-date with
+  //   each other. If the `filter` and `build` operations are implemented as 
separate case-matches
+  //   in different methods, it's very easy to change one without 
appropriately updating the
+  //   other. For example, if we add a new supported node type to `filter`, it 
would be very
+  //   easy to forget to update `build` to support it too, thus leading to 
conversion errors.
+  private def performAction[ReturnType](
+      actionType: ActionType[ReturnType],
+      expression: Filter): ReturnType = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
-    import org.apache.spark.sql.sources._
-
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the 
other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or 
in the child
-        // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
-        // can be safely removed.
-        val leftBuilderOption =
-          createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-        val rightBuilderOption =
-          createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-        (leftBuilderOption, rightBuilderOption) match {
-          case (Some(_), Some(_)) =>
-            for {
-              lhs <- createBuilder(dataTypeMap, left,
-                builder.startAnd(), canPartialPushDownConjuncts)
-              rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-            } yield rhs.end()
-
-          case (Some(_), None) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, left, builder, 
canPartialPushDownConjuncts)
-
-          case (None, Some(_)) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, right, builder, 
canPartialPushDownConjuncts)
-
-          case _ => None
+        actionType match {
+          case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
+            // At here, it is not safe to just keep one side and remove the 
other side
+            // if we do not understand what the parent filters are.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
+            // convert b in ('1'). If we only convert a = 2, we will end up 
with a filter
+            // NOT(a = 2), which will generate wrong results.
+            //
+            // Pushing one side of AND down is only safe to do at the top 
level or in the child
+            // AND before hitting NOT or OR conditions, and in this case, the 
unsupported
+            // predicate can be safely removed.
+            val lhs = performAction(t, left)
+            val rhs = performAction(t, right)
+            (lhs, rhs) match {
+              case (Some(l), Some(r)) => Some(And(l, r))
+              case (Some(_), None) if canPartialPushDownConjuncts => lhs
+              case (None, Some(_)) if canPartialPushDownConjuncts => rhs
+              case _ => None
+            }
+          case b @ BuildSearchArgument(builder) =>
+            builder.startAnd()
+            performAction(b, left)
+            performAction(b, right)
+            builder.end()
+            ()
         }
 
       case Or(left, right) =>
-        // The Or predicate is convertible when both of its children can be 
pushed down.
-        // That is to say, if one/both of the children can be partially pushed 
down, the Or
-        // predicate can be partially pushed down as well.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have
-        // (a1 AND a2) OR (b1 AND b2),
-        // a1 and b1 is convertible, while a2 and b2 is not.
-        // The predicate can be converted as
-        // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
-        // As per the logical in And predicate, we can push down (a1 OR b1).
-        for {
-          _ <- createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-          _ <- createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-          lhs <- createBuilder(dataTypeMap, left,
-            builder.startOr(), canPartialPushDownConjuncts)
-          rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-        } yield rhs.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            // The Or predicate is convertible when both of its children can 
be pushed down.
+            // That is to say, if one/both of the children can be partially 
pushed down, the Or
+            // predicate can be partially pushed down as well.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have
+            // (a1 AND a2) OR (b1 AND b2),
+            // a1 and b1 is convertible, while a2 and b2 is not.
+            // The predicate can be converted as
+            // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+            // As per the logical in And predicate, we can push down (a1 OR 
b1).
+            for {
+              lhs: Filter <- performAction(t, left)
+              rhs: Filter <- performAction(t, right)
+            } yield Or(lhs, rhs)
+          case b @ BuildSearchArgument(builder) =>
+            builder.startOr()
+            performAction(b, left)
+            performAction(b, right)
+            builder.end()
+            ()
+        }
 
       case Not(child) =>
-        for {
-          _ <- createBuilder(dataTypeMap, child, newBuilder, 
canPartialPushDownConjuncts = false)
-          negate <- createBuilder(dataTypeMap,
-            child, builder.startNot(), canPartialPushDownConjuncts = false)
-        } yield negate.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            performAction(t.copy(canPartialPushDownConjuncts = false), 
child).map(Not)
+          case b @ BuildSearchArgument(builder) =>
+            builder.startNot()
+            performAction(b, child)
+            builder.end()
+            ()
+        }
 
-      // NOTE: For all case branches dealing with leaf predicates below, the 
additional `startAnd()`
-      // call is mandatory.  ORC `SearchArgument` builder requires that all 
leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      // NOTE: For all case branches dealing with leaf predicates below, the 
additional
+      // `startAnd()` call is mandatory.  ORC `SearchArgument` builder 
requires that all leaf
+      // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or 
`Not`).
 
       case EqualTo(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().equals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case EqualNullSafe(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().nullSafeEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case LessThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().lessThan(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case LessThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().lessThanEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case GreaterThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startNot().lessThanEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case GreaterThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThan(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startNot().lessThan(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            builder.startAnd().isNull(quotedName, getType(attribute)).end()
+            ()
+        }
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            builder.startNot().isNull(quotedName, getType(attribute)).end()
+            ()
+        }
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, 
dataTypeMap(attribute)))
-        Some(builder.startAnd().in(quotedName, getType(attribute),
-          castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValues = values.map(v => castLiteralValue(v, 
dataTypeMap(attribute)))
+            builder.startAnd().in(quotedName, getType(attribute),
+              castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
+            ()
+        }
 
-      case _ => None
+      case _ =>
+        actionType match {
+          case _: TrimUnconvertibleFilters => None
+          case BuildSearchArgument(builder) =>
+            throw new IllegalArgumentException(s"Can't build unsupported 
filter ${expression}")
+        }
     }
   }
 }
+
diff --git 
a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
 
b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 632a72a..ebcd356 100644
--- 
a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ 
b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -29,8 +29,9 @@ import org.apache.spark.sql.types._
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC 
predicate push-down.
  *
- * Due to limitation of ORC `SearchArgument` builder, we had to end up with a 
pretty weird double-
- * checking pattern when converting `And`/`Or`/`Not` filters.
+ * Due to limitation of ORC `SearchArgument` builder, we had to implement 
separate checking and
+ * conversion passes through the Filter to make sure we only convert 
predicates that are known
+ * to be convertible.
  *
  * An ORC `SearchArgument` must be built in one pass using a single builder.  
For example, you can't
  * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 
2`.  This is quite
@@ -39,18 +40,18 @@ import org.apache.spark.sql.types._
  *
  * The annoying part is that, `SearchArgument` builder methods like 
`startAnd()`, `startOr()`, and
  * `startNot()` mutate internal state of the builder instance.  This forces us 
to translate all
- * convertible filters with a single builder instance. However, before 
actually converting a filter,
- * we've no idea whether it can be recognized by ORC or not. Thus, when an 
inconvertible filter is
- * found, we may already end up with a builder whose internal state is 
inconsistent.
+ * convertible filters with a single builder instance. However, if we try to 
translate a filter
+ * before checking whether it can be converted or not, we may end up with a 
builder whose internal
+ * state is inconsistent in the case of an inconvertible filter.
  *
  * For example, to convert an `And` filter with builder `b`, we call 
`b.startAnd()` first, and then
  * try to convert its children.  Say we convert `left` child successfully, but 
find that `right`
  * child is inconvertible.  Alas, `b.startAnd()` call can't be rolled back, 
and `b` is inconsistent
  * now.
  *
- * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to 
convert their
- * children with brand new builders, and only do the actual conversion with 
the right builder
- * instance when the children are proven to be convertible.
+ * The workaround employed here is to trim the Spark filters before trying to 
convert them. This
+ * way, we can only do the actual conversion on the part of the Filter that is 
known to be
+ * convertible.
  *
  * P.S.: Hive seems to use `SearchArgument` together with 
`ExprNodeGenericFuncDesc` only.  Usage of
  * builder methods mentioned above can only be found in test code, where all 
tested filters are
@@ -63,11 +64,12 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): 
Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
-      // Combines all convertible filters using `And` to produce a single 
conjunction
-      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, 
filters))
+      // Combines all filters using `And` to produce a single conjunction
+      conjunction <- buildTree(filters)
       // Then tries to build a single ORC `SearchArgument` for the conjunction 
predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      builder <- orcFilterConverter.buildSearchArgument(conjunction, 
newBuilder)
     } yield builder.build()
   }
 
@@ -75,43 +77,13 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       schema: StructType,
       dataTypeMap: Map[String, DataType],
       filters: Seq[Filter]): Seq[Filter] = {
-    import org.apache.spark.sql.sources._
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
+    filters.flatMap(orcFilterConverter.trimUnconvertibleFilters)
+  }
 
-    def convertibleFiltersHelper(
-        filter: Filter,
-        canPartialPushDown: Boolean): Option[Filter] = filter match {
-      case And(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, 
canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, 
canPartialPushDown)
-        (leftResultOptional, rightResultOptional) match {
-          case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, 
rightResult))
-          case (Some(leftResult), None) if canPartialPushDown => 
Some(leftResult)
-          case (None, Some(rightResult)) if canPartialPushDown => 
Some(rightResult)
-          case _ => None
-        }
+}
 
-      case Or(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, 
canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, 
canPartialPushDown)
-        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
-          None
-        } else {
-          Some(Or(leftResultOptional.get, rightResultOptional.get))
-        }
-      case Not(pred) =>
-        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown 
= false)
-        resultOptional.map(Not)
-      case other =>
-        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
-          Some(other)
-        } else {
-          None
-        }
-    }
-    filters.flatMap { filter =>
-      convertibleFiltersHelper(filter, true)
-    }
-  }
+private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
 
   /**
    * Get PredicateLeafType which is corresponding to the given DataType.
@@ -143,144 +115,228 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }
 
+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the 
non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity 
problems introduced by
+   * checking whether a node is convertible while building it. The approach 
implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of 
Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the 
trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf 
problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, 
canPartialPushDownConjuncts = true)
+    trimUnconvertibleFilters(expression).map { filter =>
+      updateBuilder(filter, builder)
+      builder
+    }
   }
 
   /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
-   *                                    do at the top level or none of its 
ancestors is NOT and OR.
-   * @return the builder so far.
+   * Removes all sub-Filters from a given Filter that are not convertible to 
an ORC SearchArgument.
    */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+  private[sql] def trimUnconvertibleFilters(expression: Filter): 
Option[Filter] = {
+    performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = 
true), expression)
+  }
+
+  /**
+   * Builds a SearchArgument for the given Filter. This method should only be 
called on Filters
+   * that have previously been trimmed to remove unsupported sub-Filters!
+   */
+  private def updateBuilder(expression: Filter, builder: Builder): Unit =
+    performAction(BuildSearchArgument(builder), expression)
+
+  sealed trait ActionType[ReturnType]
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+      extends ActionType[Option[Filter]]
+  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
+
+  // The performAction method can run both the filtering and building 
operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same 
place. You don't
+  //   have to scroll across the whole file to see what the filter action for 
an And is while
+  //   you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations 
up-to-date with
+  //   each other. If the `filter` and `build` operations are implemented as 
separate case-matches
+  //   in different methods, it's very easy to change one without 
appropriately updating the
+  //   other. For example, if we add a new supported node type to `filter`, it 
would be very
+  //   easy to forget to update `build` to support it too, thus leading to 
conversion errors.
+  private def performAction[ReturnType](
+      actionType: ActionType[ReturnType],
+      expression: Filter): ReturnType = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
-    import org.apache.spark.sql.sources._
-
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the 
other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or 
in the child
-        // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
-        // can be safely removed.
-        val leftBuilderOption =
-          createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-        val rightBuilderOption =
-          createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-        (leftBuilderOption, rightBuilderOption) match {
-          case (Some(_), Some(_)) =>
-            for {
-              lhs <- createBuilder(dataTypeMap, left,
-                builder.startAnd(), canPartialPushDownConjuncts)
-              rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-            } yield rhs.end()
-
-          case (Some(_), None) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, left, builder, 
canPartialPushDownConjuncts)
-
-          case (None, Some(_)) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, right, builder, 
canPartialPushDownConjuncts)
-
-          case _ => None
+        actionType match {
+          case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
+            // At here, it is not safe to just keep one side and remove the 
other side
+            // if we do not understand what the parent filters are.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
+            // convert b in ('1'). If we only convert a = 2, we will end up 
with a filter
+            // NOT(a = 2), which will generate wrong results.
+            //
+            // Pushing one side of AND down is only safe to do at the top 
level or in the child
+            // AND before hitting NOT or OR conditions, and in this case, the 
unsupported
+            // predicate can be safely removed.
+            val lhs = performAction(t, left)
+            val rhs = performAction(t, right)
+            (lhs, rhs) match {
+              case (Some(l), Some(r)) => Some(And(l, r))
+              case (Some(_), None) if canPartialPushDownConjuncts => lhs
+              case (None, Some(_)) if canPartialPushDownConjuncts => rhs
+              case _ => None
+            }
+          case b @ BuildSearchArgument(builder) =>
+            builder.startAnd()
+            performAction(b, left)
+            performAction(b, right)
+            builder.end()
+            ()
         }
 
       case Or(left, right) =>
-        // The Or predicate is convertible when both of its children can be 
pushed down.
-        // That is to say, if one/both of the children can be partially pushed 
down, the Or
-        // predicate can be partially pushed down as well.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have
-        // (a1 AND a2) OR (b1 AND b2),
-        // a1 and b1 is convertible, while a2 and b2 is not.
-        // The predicate can be converted as
-        // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
-        // As per the logical in And predicate, we can push down (a1 OR b1).
-        for {
-          _ <- createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-          _ <- createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-          lhs <- createBuilder(dataTypeMap, left, builder.startOr(), 
canPartialPushDownConjuncts)
-          rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-        } yield rhs.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            // The Or predicate is convertible when both of its children can 
be pushed down.
+            // That is to say, if one/both of the children can be partially 
pushed down, the Or
+            // predicate can be partially pushed down as well.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have
+            // (a1 AND a2) OR (b1 AND b2),
+            // a1 and b1 is convertible, while a2 and b2 is not.
+            // The predicate can be converted as
+            // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+            // As per the logical in And predicate, we can push down (a1 OR 
b1).
+            for {
+              lhs: Filter <- performAction(t, left)
+              rhs: Filter <- performAction(t, right)
+            } yield Or(lhs, rhs)
+          case b @ BuildSearchArgument(builder) =>
+            builder.startOr()
+            performAction(b, left)
+            performAction(b, right)
+            builder.end()
+            ()
+        }
 
       case Not(child) =>
-        for {
-          _ <- createBuilder(dataTypeMap, child, newBuilder, 
canPartialPushDownConjuncts = false)
-          negate <- createBuilder(dataTypeMap,
-            child, builder.startNot(), canPartialPushDownConjuncts = false)
-        } yield negate.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            performAction(t.copy(canPartialPushDownConjuncts = false), 
child).map(Not)
+          case b @ BuildSearchArgument(builder) =>
+            builder.startNot()
+            performAction(b, child)
+            builder.end()
+            ()
+        }
 
-      // NOTE: For all case branches dealing with leaf predicates below, the 
additional `startAnd()`
-      // call is mandatory.  ORC `SearchArgument` builder requires that all 
leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      // NOTE: For all case branches dealing with leaf predicates below, the 
additional
+      // `startAnd()` call is mandatory.  ORC `SearchArgument` builder 
requires that all leaf
+      // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or 
`Not`).
 
       case EqualTo(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().equals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case EqualNullSafe(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().nullSafeEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case LessThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().lessThan(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case LessThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().lessThanEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case GreaterThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startNot().lessThanEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case GreaterThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThan(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startNot().lessThan(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            builder.startAnd().isNull(quotedName, getType(attribute)).end()
+            ()
+        }
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            builder.startNot().isNull(quotedName, getType(attribute)).end()
+            ()
+        }
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, 
dataTypeMap(attribute)))
-        Some(builder.startAnd().in(quotedName, getType(attribute),
-          castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValues = values.map(v => castLiteralValue(v, 
dataTypeMap(attribute)))
+            builder.startAnd().in(quotedName, getType(attribute),
+              castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
+            ()
+        }
 
-      case _ => None
+      case _ =>
+        actionType match {
+          case _: TrimUnconvertibleFilters => None
+          case BuildSearchArgument(builder) =>
+            throw new IllegalArgumentException(s"Can't build unsupported 
filter ${expression}")
+        }
     }
   }
 }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 3bfe157..cee8fdb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -62,176 +62,260 @@ import org.apache.spark.sql.types._
  */
 private[orc] object OrcFilters extends Logging {
 
-  private def findMethod(klass: Class[_], name: String, args: Class[_]*): 
Method = {
-    val method = klass.getMethod(name, args: _*)
-    method.setAccessible(true)
-    method
-  }
-
   def createFilter(schema: StructType, filters: Array[Filter]): 
Option[SearchArgument] = {
     if (HiveUtils.isHive23) {
       DatasourceOrcFilters.createFilter(schema, 
filters).asInstanceOf[Option[SearchArgument]]
     } else {
       val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-
-      // First, tries to convert each filter individually to see whether it's 
convertible, and then
-      // collect all convertible ones to build the final `SearchArgument`.
-      val convertibleFilters = for {
-        filter <- filters
-        _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
-      } yield filter
-
+      val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
       for {
-        // Combines all convertible filters using `And` to produce a single 
conjunction
-        conjunction <- buildTree(convertibleFilters)
+        // Combines all filters using `And` to produce a single conjunction
+        conjunction <- buildTree(filters)
         // Then tries to build a single ORC `SearchArgument` for the 
conjunction predicate
-        builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+        builder <- orcFilterConverter.buildSearchArgument(conjunction, 
newBuilder)
       } yield builder.build()
     }
   }
+}
 
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, 
canPartialPushDownConjuncts = true)
+private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
+
+  def isSearchableType(dataType: DataType): Boolean = dataType match {
+    // Only the values in the Spark types below can be recognized by
+    // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
+    case ByteType | ShortType | FloatType | DoubleType => true
+    case IntegerType | LongType | StringType | BooleanType => true
+    case TimestampType | _: DecimalType => true
+    case _ => false
   }
 
+  private def findMethod(klass: Class[_], name: String, args: Class[_]*): 
Method = {
+    val method = klass.getMethod(name, args: _*)
+    method.setAccessible(true)
+    method
+  }
+
+  import org.apache.spark.sql.sources._
+
   /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
-   *                                    do at the top level or none of its 
ancestors is NOT and OR.
-   * @return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the 
non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity 
problems introduced by
+   * checking whether a node is convertible while building it. The approach 
implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of 
Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the 
trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf 
problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
-    def isSearchableType(dataType: DataType): Boolean = dataType match {
-      // Only the values in the Spark types below can be recognized by
-      // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
-      case ByteType | ShortType | FloatType | DoubleType => true
-      case IntegerType | LongType | StringType | BooleanType => true
-      case TimestampType | _: DecimalType => true
-      case _ => false
+      builder: Builder): Option[Builder] = {
+    trimUnconvertibleFilters(expression).map { filter =>
+      updateBuilder(filter, builder)
+      builder
     }
+  }
 
-    expression match {
-      case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the 
other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or 
in the child
-        // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
-        // can be safely removed.
-        val leftBuilderOption =
-          createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-        val rightBuilderOption =
-          createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-        (leftBuilderOption, rightBuilderOption) match {
-          case (Some(_), Some(_)) =>
-            for {
-              lhs <- createBuilder(dataTypeMap, left,
-                builder.startAnd(), canPartialPushDownConjuncts)
-              rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-            } yield rhs.end()
+  /**
+   * Removes all sub-Filters from a given Filter that are not convertible to 
an ORC SearchArgument.
+   */
+  private[sql] def trimUnconvertibleFilters(expression: Filter): 
Option[Filter] = {
+    performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = 
true), expression)
+  }
 
-          case (Some(_), None) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, left, builder, 
canPartialPushDownConjuncts)
+  /**
+   * Builds a SearchArgument for the given Filter. This method should only be 
called on Filters
+   * that have previously been trimmed to remove unsupported sub-Filters!
+   */
+  private def updateBuilder(expression: Filter, builder: Builder): Unit =
+    performAction(BuildSearchArgument(builder), expression)
 
-          case (None, Some(_)) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, right, builder, 
canPartialPushDownConjuncts)
+  sealed trait ActionType[ReturnType]
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+      extends ActionType[Option[Filter]]
+  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
 
-          case _ => None
+  // The performAction method can run both the filtering and building 
operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same 
place. You don't
+  //   have to scroll across the whole file to see what the filter action for 
an And is while
+  //   you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations 
up-to-date with
+  //   each other. If the `filter` and `build` operations are implemented as 
separate case-matches
+  //   in different methods, it's very easy to change one without 
appropriately updating the
+  //   other. For example, if we add a new supported node type to `filter`, it 
would be very
+  //   easy to forget to update `build` to support it too, thus leading to 
conversion errors.
+  private def performAction[ReturnType](
+      actionType: ActionType[ReturnType],
+      expression: Filter): ReturnType = {
+
+    expression match {
+      case And(left, right) =>
+        actionType match {
+          case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
+            // At here, it is not safe to just keep one side and remove the 
other side
+            // if we do not understand what the parent filters are.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
+            // convert b in ('1'). If we only convert a = 2, we will end up 
with a filter
+            // NOT(a = 2), which will generate wrong results.
+            //
+            // Pushing one side of AND down is only safe to do at the top 
level or in the child
+            // AND before hitting NOT or OR conditions, and in this case, the 
unsupported
+            // predicate can be safely removed.
+            val lhs = performAction(t, left)
+            val rhs = performAction(t, right)
+            (lhs, rhs) match {
+              case (Some(l), Some(r)) => Some(And(l, r))
+              case (Some(_), None) if canPartialPushDownConjuncts => lhs
+              case (None, Some(_)) if canPartialPushDownConjuncts => rhs
+              case _ => None
+            }
+          case b @ BuildSearchArgument(builder) =>
+            builder.startAnd()
+            performAction(b, left)
+            performAction(b, right)
+            builder.end()
+            ()
         }
 
       case Or(left, right) =>
-        // The Or predicate is convertible when both of its children can be 
pushed down.
-        // That is to say, if one/both of the children can be partially pushed 
down, the Or
-        // predicate can be partially pushed down as well.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have
-        // (a1 AND a2) OR (b1 AND b2),
-        // a1 and b1 is convertible, while a2 and b2 is not.
-        // The predicate can be converted as
-        // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
-        // As per the logical in And predicate, we can push down (a1 OR b1).
-        for {
-          _ <- createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-          _ <- createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-          lhs <- createBuilder(dataTypeMap, left,
-            builder.startOr(), canPartialPushDownConjuncts)
-          rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-        } yield rhs.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            // The Or predicate is convertible when both of its children can 
be pushed down.
+            // That is to say, if one/both of the children can be partially 
pushed down, the Or
+            // predicate can be partially pushed down as well.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have
+            // (a1 AND a2) OR (b1 AND b2),
+            // a1 and b1 is convertible, while a2 and b2 is not.
+            // The predicate can be converted as
+            // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+            // As per the logical in And predicate, we can push down (a1 OR 
b1).
+            for {
+              lhs: Filter <- performAction(t, left)
+              rhs: Filter <- performAction(t, right)
+            } yield Or(lhs, rhs)
+          case b @ BuildSearchArgument(builder) =>
+            builder.startOr()
+            performAction(b, left)
+            performAction(b, right)
+            builder.end()
+            ()
+        }
 
       case Not(child) =>
-        for {
-          _ <- createBuilder(dataTypeMap, child, newBuilder, 
canPartialPushDownConjuncts = false)
-          negate <- createBuilder(dataTypeMap,
-            child, builder.startNot(), canPartialPushDownConjuncts = false)
-        } yield negate.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            performAction(t.copy(canPartialPushDownConjuncts = false), 
child).map(Not)
+          case b @ BuildSearchArgument(builder) =>
+            builder.startNot()
+            performAction(b, child)
+            builder.end()
+            ()
+        }
 
-      // NOTE: For all case branches dealing with leaf predicates below, the 
additional `startAnd()`
-      // call is mandatory.  ORC `SearchArgument` builder requires that all 
leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      // NOTE: For all case branches dealing with leaf predicates below, the 
additional
+      // `startAnd()` call is mandatory.  ORC `SearchArgument` builder 
requires that all leaf
+      // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or 
`Not`).
 
       case EqualTo(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "equals", classOf[String], 
classOf[Object])
-        Some(method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startAnd()
+            val method = findMethod(bd.getClass, "equals", classOf[String], 
classOf[Object])
+            method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()
+            ()
+        }
       case EqualNullSafe(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "nullSafeEquals", 
classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startAnd()
+            val method = findMethod(bd.getClass, "nullSafeEquals", 
classOf[String], classOf[Object])
+            method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()
+            ()
+        }
       case LessThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "lessThan", classOf[String], 
classOf[Object])
-        Some(method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startAnd()
+            val method = findMethod(bd.getClass, "lessThan", classOf[String], 
classOf[Object])
+            method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()
+            ()
+        }
       case LessThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "lessThanEquals", 
classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startAnd()
+            val method = findMethod(bd.getClass, "lessThanEquals", 
classOf[String], classOf[Object])
+            method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()
+            ()
+        }
       case GreaterThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startNot()
-        val method = findMethod(bd.getClass, "lessThanEquals", 
classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startNot()
+            val method = findMethod(bd.getClass, "lessThanEquals", 
classOf[String], classOf[Object])
+            method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()
+            ()
+        }
       case GreaterThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startNot()
-        val method = findMethod(bd.getClass, "lessThan", classOf[String], 
classOf[Object])
-        Some(method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startNot()
+            val method = findMethod(bd.getClass, "lessThan", classOf[String], 
classOf[Object])
+            method.invoke(bd, attribute, 
value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()
+            ()
+        }
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "isNull", classOf[String])
-        Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startAnd()
+            val method = findMethod(bd.getClass, "isNull", classOf[String])
+            method.invoke(bd, attribute).asInstanceOf[Builder].end()
+            ()
+        }
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startNot()
-        val method = findMethod(bd.getClass, "isNull", classOf[String])
-        Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startNot()
+            val method = findMethod(bd.getClass, "isNull", classOf[String])
+            method.invoke(bd, attribute).asInstanceOf[Builder].end()
+            ()
+        }
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "in", classOf[String], 
classOf[Array[Object]])
-        Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
-          .asInstanceOf[Builder].end())
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val bd = builder.startAnd()
+            val method = findMethod(bd.getClass, "in", classOf[String], 
classOf[Array[Object]])
+            method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
+                .asInstanceOf[Builder].end()
+            ()
+        }
 
-      case _ => None
+      case _ =>
+        actionType match {
+          case _: TrimUnconvertibleFilters => None
+          case BuildSearchArgument(builder) =>
+            throw new IllegalArgumentException(s"Can't build unsupported 
filter ${expression}")
+        }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to