cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] 
Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r289027257
 
 

 ##########
 File path: 
sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
 ##########
 @@ -143,145 +115,231 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }
 
+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
+  private[sql] def trimUnconvertibleFilters(expression: Filter): 
Option[Filter] = {
+    performFilter(expression, canPartialPushDownConjuncts = true)
+  }
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the 
non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity 
problems introduced by
+   * checking whether a node is convertible while building it. The approach 
implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of 
Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the 
trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf 
problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, 
canPartialPushDownConjuncts = true)
+    trimUnconvertibleFilters(expression).map { filter =>
+      updateBuilder(filter, builder)
+      builder
+    }
   }
 
-  /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
-   *                                    do at the top level or none of its 
ancestors is NOT and OR.
-   * @return the builder so far.
-   */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+  sealed trait ActionType[ReturnType]
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+      extends ActionType[Option[Filter]]
+  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
+
+  // The performAction method can run both the filtering and building 
operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same 
place. You don't
+  //   have to scroll across the whole file to see what the filter action for 
an And is while
+  //   you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations 
up-to-date with
+  //   each other. If the `filter` and `build` operations are implemented as 
separate case-matches
+  //   in different methods, it's very easy to change one without 
appropriately updating the
+  //   other. For example, if we add a new supported node type to `filter`, it 
would be very
+  //   easy to forget to update `build` to support it too, thus leading to 
conversion errors.
+  private def performAction[ReturnType](
+      actionType: ActionType[ReturnType],
+      expression: Filter): ReturnType = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
-    import org.apache.spark.sql.sources._
-
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the 
other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a 
filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or 
in the child
-        // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
-        // can be safely removed.
-        val leftBuilderOption =
-          createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-        val rightBuilderOption =
-          createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-        (leftBuilderOption, rightBuilderOption) match {
-          case (Some(_), Some(_)) =>
-            for {
-              lhs <- createBuilder(dataTypeMap, left,
-                builder.startAnd(), canPartialPushDownConjuncts)
-              rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-            } yield rhs.end()
-
-          case (Some(_), None) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, left, builder, 
canPartialPushDownConjuncts)
-
-          case (None, Some(_)) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, right, builder, 
canPartialPushDownConjuncts)
-
-          case _ => None
+        actionType match {
+          case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
+            // At here, it is not safe to just keep one side and remove the 
other side
+            // if we do not understand what the parent filters are.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have NOT(a = 2 AND b in ('1')) and we do not 
understand how to
+            // convert b in ('1'). If we only convert a = 2, we will end up 
with a filter
+            // NOT(a = 2), which will generate wrong results.
+            //
+            // Pushing one side of AND down is only safe to do at the top 
level or in the child
+            // AND before hitting NOT or OR conditions, and in this case, the 
unsupported
+            // predicate can be safely removed.
+            val lhs = performAction(t, left)
+            val rhs = performAction(t, right)
+            (lhs, rhs) match {
+              case (Some(l), Some(r)) => Some(And(l, r))
+              case (Some(_), None) if canPartialPushDownConjuncts => lhs
+              case (None, Some(_)) if canPartialPushDownConjuncts => rhs
+              case _ => None
+            }
+          case BuildSearchArgument(builder) =>
+            builder.startAnd()
+            updateBuilder(left, builder)
+            updateBuilder(right, builder)
+            builder.end()
+            ()
         }
 
       case Or(left, right) =>
-        // The Or predicate is convertible when both of its children can be 
pushed down.
-        // That is to say, if one/both of the children can be partially pushed 
down, the Or
-        // predicate can be partially pushed down as well.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have
-        // (a1 AND a2) OR (b1 AND b2),
-        // a1 and b1 is convertible, while a2 and b2 is not.
-        // The predicate can be converted as
-        // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
-        // As per the logical in And predicate, we can push down (a1 OR b1).
-        for {
-          _ <- createBuilder(dataTypeMap, left, newBuilder, 
canPartialPushDownConjuncts)
-          _ <- createBuilder(dataTypeMap, right, newBuilder, 
canPartialPushDownConjuncts)
-          lhs <- createBuilder(dataTypeMap, left,
-            builder.startOr(), canPartialPushDownConjuncts)
-          rhs <- createBuilder(dataTypeMap, right, lhs, 
canPartialPushDownConjuncts)
-        } yield rhs.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            // The Or predicate is convertible when both of its children can 
be pushed down.
+            // That is to say, if one/both of the children can be partially 
pushed down, the Or
+            // predicate can be partially pushed down as well.
+            //
+            // Here is an example used to explain the reason.
+            // Let's say we have
+            // (a1 AND a2) OR (b1 AND b2),
+            // a1 and b1 is convertible, while a2 and b2 is not.
+            // The predicate can be converted as
+            // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+            // As per the logical in And predicate, we can push down (a1 OR 
b1).
+            for {
+              lhs: Filter <- performAction(t, left)
+              rhs: Filter <- performAction(t, right)
+            } yield Or(lhs, rhs)
+          case BuildSearchArgument(builder) =>
+            builder.startOr()
+            updateBuilder(left, builder)
+            updateBuilder(right, builder)
+            builder.end()
+            ()
+        }
 
       case Not(child) =>
-        for {
-          _ <- createBuilder(dataTypeMap, child, newBuilder, 
canPartialPushDownConjuncts = false)
-          negate <- createBuilder(dataTypeMap,
-            child, builder.startNot(), canPartialPushDownConjuncts = false)
-        } yield negate.end()
+        actionType match {
+          case t: TrimUnconvertibleFilters =>
+            performAction(t.copy(canPartialPushDownConjuncts = false), 
child).map(Not)
+          case BuildSearchArgument(builder) =>
+            builder.startNot()
+            updateBuilder(child, builder)
+            builder.end()
+            ()
+        }
 
-      // NOTE: For all case branches dealing with leaf predicates below, the 
additional `startAnd()`
-      // call is mandatory.  ORC `SearchArgument` builder requires that all 
leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      // NOTE: For all case branches dealing with leaf predicates below, the 
additional
+      // `startAnd()` call is mandatory.  ORC `SearchArgument` builder 
requires that all leaf
+      // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or 
`Not`).
 
       case EqualTo(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().equals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case EqualNullSafe(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().nullSafeEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case LessThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().lessThan(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case LessThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startAnd().lessThanEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case GreaterThan(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startNot().lessThanEquals(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case GreaterThanOrEqual(attribute, value) if 
isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThan(quotedName, getType(attribute), 
castedValue).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+            builder.startNot().lessThan(quotedName, getType(attribute), 
castedValue).end()
+            ()
+        }
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            builder.startAnd().isNull(quotedName, getType(attribute)).end()
+            ()
+        }
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
-
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            builder.startNot().isNull(quotedName, getType(attribute)).end()
+            ()
+        }
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, 
dataTypeMap(attribute)))
-        Some(builder.startAnd().in(quotedName, getType(attribute),
-          castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
+        actionType match {
+          case _: TrimUnconvertibleFilters => Some(expression)
+          case BuildSearchArgument(builder) =>
+            val quotedName = quoteAttributeNameIfNeeded(attribute)
+            val castedValues = values.map(v => castLiteralValue(v, 
dataTypeMap(attribute)))
+            builder.startAnd().in(quotedName, getType(attribute),
+              castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
+            ()
+        }
 
-      case _ => None
+      case _ =>
+        actionType match {
+          case _: TrimUnconvertibleFilters => None
+          case BuildSearchArgument(builder) =>
+            throw new IllegalArgumentException(s"Can't build unsupported 
filter ${expression}")
+        }
     }
   }
+
+  private def performFilter(
 
 Review comment:
   or, we can remove both `performFilter` and `updateBuilder`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to