[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r293209902

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala

@@ -362,6 +394,13 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
   }
   runBenchmark(s"Pushdown benchmark with many filters") {
+    // This benchmark and the next one are similar in that they both test predicate pushdown
+    // where the filter itself is very large. There have been cases where the filter conversion
+    // would take minutes to hours for large filters due to it being implemented with exponential
+    // complexity in the height of the filter tree.
+    // The difference between these two benchmarks is that this one benchmarks pushdown with a
+    // large string filter (`a AND b AND c ...`), whereas the next one benchmarks pushdown with
+    // a large Column-based filter (`col(a) || (col(b) || (col(c)...))`).

Review comment: I still don't get it. Both the string filter and the Column-based filter become an `Expression` in the `Filter` operator. The only differences I see are:
1. the new benchmark builds a larger filter;
2. the new benchmark uses `Or` instead of `And`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
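For reference, the two predicate shapes the benchmarks discuss can be sketched without Spark. This is illustrative only: the column names and helper functions below are invented, and in the real benchmark the string would go to `DataFrame.filter(...)` while the Column expression would be built with `col(...) || ...`.

```scala
// Sketch of the two filter shapes: a flat SQL string with AND, and a
// right-skewed nested tree mimicking col(a) || (col(b) || (col(c)...)).
object FilterShapes {
  def stringFilter(n: Int): String =
    (0 until n).map(i => s"c$i = 0").mkString(" AND ")

  sealed trait Pred
  case class Leaf(name: String) extends Pred
  case class Or(left: Pred, right: Pred) extends Pred

  // foldRight produces a right-skewed tree: Or(c0, Or(c1, Or(c2, ...))).
  def columnFilter(n: Int): Pred =
    (0 until n - 1).foldRight(Leaf(s"c${n - 1}"): Pred) { (i, acc) =>
      Or(Leaf(s"c$i"), acc)
    }

  def depth(p: Pred): Int = p match {
    case Leaf(_)  => 1
    case Or(l, r) => 1 + math.max(depth(l), depth(r))
  }
}
```

As the reviewer notes, a parsed SQL string and a Column expression both end up as a Catalyst `Expression`, so the visible differences are only the filter's size and the `And` vs `Or` shape; what matters for the skew problem is that the nested form's depth grows linearly with the number of predicates.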
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r292759584

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala

@@ -362,6 +394,13 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
   }
   runBenchmark(s"Pushdown benchmark with many filters") {
+    // This benchmark and the next one are similar in that they both test predicate pushdown
+    // where the filter itself is very large. There have been cases where the filter conversion
+    // would take minutes to hours for large filters due to it being implemented with exponential
+    // complexity in the height of the filter tree.
+    // The difference between these two benchmarks is that this one benchmarks pushdown with a
+    // large string filter (`a AND b AND c ...`), whereas the next one benchmarks pushdown with
+    // a large Column-based filter (`col(a) || (col(b) || (col(c)...))`).

Review comment: If I read this comment correctly, it seems that we should just remove the next benchmark, since the string filter and the Column-based filter have no performance difference. Is there any other critical difference that I missed?
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r289372532

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -143,145 +115,226 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }

+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    trimUnconvertibleFilters(expression).map { filter =>
+      updateBuilder(filter, builder)
+      builder
+    }
+  }
+
+  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {

Review comment: nit: since `updateBuilder` has a doc comment, it's better to add one for this method as well.
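The trim-then-build strategy described in the doc comment above can be sketched on a toy filter algebra. This is an illustrative reimplementation, not Spark's actual OrcFilters code: the node types and the `convertible` flag on leaves are invented for the example.

```scala
object TrimThenBuild {
  sealed trait Filter
  case class Leaf(name: String, convertible: Boolean) extends Filter
  case class And(left: Filter, right: Filter) extends Filter
  case class Or(left: Filter, right: Filter) extends Filter
  case class Not(child: Filter) extends Filter

  // Pass 1: drop nodes we cannot convert. Keeping only one side of an AND
  // is safe only while no enclosing NOT or OR has been seen.
  def trim(f: Filter, canPartialPushDown: Boolean = true): Option[Filter] = f match {
    case l @ Leaf(_, conv) => if (conv) Some(l) else None
    case And(l, r) =>
      (trim(l, canPartialPushDown), trim(r, canPartialPushDown)) match {
        case (Some(tl), Some(tr))                         => Some(And(tl, tr))
        case (some @ Some(_), None) if canPartialPushDown => some
        case (None, some @ Some(_)) if canPartialPushDown => some
        case _                                            => None
      }
    case Or(l, r) =>
      // OR needs both sides; below an OR, partial AND pushdown is unsafe.
      for (tl <- trim(l, canPartialPushDown = false);
           tr <- trim(r, canPartialPushDown = false)) yield Or(tl, tr)
    case Not(c) =>
      trim(c, canPartialPushDown = false).map(Not)
  }

  // Pass 2: build, assuming the tree has already been trimmed.
  def build(f: Filter): String = f match {
    case Leaf(name, _) => name
    case And(l, r)     => s"(${build(l)} AND ${build(r)})"
    case Or(l, r)      => s"(${build(l)} OR ${build(r)})"
    case Not(c)        => s"(NOT ${build(c)})"
  }
}
```

The point of the two passes is that each one visits every node exactly once, so the whole conversion is linear in the number of nodes, instead of re-checking subtree convertibility on the way down as the old checking-while-building approach did.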
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r289027257

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -143,145 +115,231 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }

+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
+  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
+    performFilter(expression, canPartialPushDownConjuncts = true)
+  }
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    trimUnconvertibleFilters(expression).map { filter =>
+      updateBuilder(filter, builder)
+      builder
+    }
   }

-  /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of AND down is safe to
-   *                                    do at the top level or none of its ancestors is NOT and OR.
-   * @return the builder so far.
-   */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+  sealed trait ActionType[ReturnType]
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+    extends ActionType[Option[Filter]]
+  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
+
+  // The performAction method can run both the filtering and building operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same place. You don't
+  //    have to scroll across the whole file to see what the filter action for an And is while
+  //    you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations up-to-date with
+  //    each other. If the `filter` and `build` operations are implemented as separate
+  //    case-matches in different methods, it's very easy to change one without appropriately
+  //    updating the other. For example, if we add a new supported node type to `filter`, it
+  //    would be very easy to forget to update `build` to support it too, thus leading to
+  //    conversion errors.
+  private def performAction[ReturnType](
+      actionType: ActionType[ReturnType],
+      expression: Filter): ReturnType = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))

-    import org.apache.spark.sql.sources._
-
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or in the child
-        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
-        // can be safely removed.
-        val leftBuilderOption =

(The quoted diff is truncated here in the archived message, and no review comment text survives.)
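The `NOT(a = 2 AND b in ('1'))` example in the quoted comment can be checked concretely with a toy row evaluation. This is plain Scala, not Spark code; the column names and values come from the comment itself.

```scala
object PartialPushdownUnsafe {
  case class Row(a: Int, b: String)

  // Original predicate: NOT(a = 2 AND b IN ('1'))
  def original(r: Row): Boolean = !(r.a == 2 && r.b == "1")

  // Wrongly trimmed predicate: suppose b IN ('1') is unconvertible and we
  // keep only the other conjunct under the NOT, yielding NOT(a = 2).
  def wronglyTrimmed(r: Row): Boolean = !(r.a == 2)
}
```

For a row with a = 2 and b = '2', the original predicate keeps the row but the wrongly trimmed one drops it, which is exactly the wrong-results scenario the comment warns about: pushing only one side of an AND is safe solely at the top level, before any NOT or OR is encountered.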
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r289026905

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

(The quoted diff context duplicates the context shown in the earlier comments on this file, and the review comment text is missing from the archived message.)
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r289025895

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

(The quoted diff context duplicates the context shown in the earlier comments on this file, and the review comment text is missing from the archived message.)
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r288938467

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala

@@ -135,6 +139,34 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
     benchmark.run()
   }
+  def filterPushDownBenchmarkWithColumn(

Review comment: a general note: I think `FilterPushdownBenchmark` is meant to test how well these file formats skip disk IO given pushed filters. Checking the performance of building the filters may go beyond its scope. Maybe we can just do a manual microbenchmark, and post the benchmark and its result in the PR description. cc @dongjoon-hyun
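A manual microbenchmark along the lines suggested here could look like the sketch below. The filter tree and the leaf-counting "conversion" are toy stand-ins, not Spark code; the depth of 35 echoes the "~35 nodes if they were skewed" remark in the PR's doc comment.

```scala
object ConversionMicrobenchmark {
  sealed trait Filter
  case class Leaf(n: Int) extends Filter
  case class Or(left: Filter, right: Filter) extends Filter

  // Build a maximally skewed OR chain of the given depth,
  // mimicking the pathological filters from the bug report.
  def skewed(depth: Int): Filter =
    (1 until depth).foldLeft(Leaf(0): Filter)((acc, i) => Or(Leaf(i), acc))

  // Stand-in for the filter conversion being timed.
  def countLeaves(f: Filter): Int = f match {
    case Leaf(_)  => 1
    case Or(l, r) => countLeaves(l) + countLeaves(r)
  }

  // Crude single-shot timer; a real microbenchmark would add warm-up runs.
  def time[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, System.nanoTime() - start)
  }
}
```

Calling `time(countLeaves(skewed(35)))` and swapping `countLeaves` for the old and new conversion code would give exactly the kind of number that can be posted in the PR description, without extending `FilterPushdownBenchmark` beyond its IO-skipping scope.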
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r288936575

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

(The quoted diff context duplicates, up to minor revisions, the context shown in the earlier comments on this file, and the review comment text is missing from the archived message.)
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r288936270

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

(The quoted diff context duplicates, up to minor revisions, the context shown in the earlier comments on this file, and the review comment text is missing from the archived message.)
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288935742

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

```diff
@@ -143,145 +115,231 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }

+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
+  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
+    performFilter(expression, canPartialPushDownConjuncts = true)
+  }
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    performFilter(expression, canPartialPushDownConjuncts = true).map { filter =>
```

Review comment: super nit: it seems clearer to call `trimUnconvertibleFilters` instead of `performFilter`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
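The trim-then-build approach described in the quoted doc comment can be illustrated with a small self-contained sketch. The types below (`Pred`, `Leaf`) are invented stand-ins, not Spark's real `Filter`/`Builder` API; the point is only that each pass visits every node exactly once, so the whole conversion is linear in the number of nodes instead of exponential in tree height.

```scala
// Hypothetical, simplified predicate tree (not org.apache.spark.sql.sources.Filter).
sealed trait Pred
case class Leaf(name: String, convertible: Boolean) extends Pred
case class And(l: Pred, r: Pred) extends Pred
case class Or(l: Pred, r: Pred) extends Pred

// Pass 1: trim unconvertible nodes. Each node is visited once: O(number of nodes).
def trim(p: Pred, canPartialPushDown: Boolean): Option[Pred] = p match {
  case l: Leaf => if (l.convertible) Some(l) else None
  case And(l, r) =>
    (trim(l, canPartialPushDown), trim(r, canPartialPushDown)) match {
      case (Some(lt), Some(rt)) => Some(And(lt, rt))
      // Keeping only one side of an AND is safe only when partial pushdown is allowed.
      case (one, other) if canPartialPushDown => one.orElse(other)
      case _ => None
    }
  case Or(l, r) =>
    // An OR survives only if both sides do; below an OR, partial pushdown is unsafe.
    for {
      lt <- trim(l, canPartialPushDown = false)
      rt <- trim(r, canPartialPushDown = false)
    } yield Or(lt, rt)
}

// Pass 2: build from the trimmed tree. It now contains only convertible nodes,
// so no convertibility checks (and no re-walks of subtrees) are needed.
def build(p: Pred): String = p match {
  case Leaf(n, _) => n
  case And(l, r)  => s"(${build(l)} and ${build(r)})"
  case Or(l, r)   => s"(${build(l)} or ${build(r)})"
}
```

For example, `trim(And(Leaf("a", true), Leaf("b", false)), canPartialPushDown = true).map(build)` yields `Some("a")`: the unconvertible side is dropped before any building happens.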
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288936689

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

```diff
@@ -143,145 +115,231 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }

+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
+  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
+    performFilter(expression, canPartialPushDownConjuncts = true)
+  }
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    performFilter(expression, canPartialPushDownConjuncts = true).map { filter =>
+      updateBuilder(filter, builder)
+      builder
+    }
   }

-  /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of AND down is safe to
-   *                                    do at the top level or none of its ancestors is NOT and OR.
-   * @return the builder so far.
-   */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+  sealed trait ActionType[ReturnType]
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
+    extends ActionType[Option[Filter]]
+  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
+
+  // The performAction method can run both the filtering and building operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same place. You don't
+  //    have to scroll across the whole file to see what the filter action for an And is while
+  //    you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations up-to-date with
+  //    each other. If the `filter` and `build` operations are implemented as separate case-matches
+  //    in different methods, it's very easy to change one without appropriately updating the
+  //    other. For example, if we add a new supported node type to `filter`, it would be very
+  //    easy to forget to update `build` to support it too, thus leading to conversion errors.
+  private def performAction[ReturnType](
+      actionType: ActionType[ReturnType],
+      expression: Filter): ReturnType = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))

-    import org.apache.spark.sql.sources._
-
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or in the child
-        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
-        // can be safely removed.
-        val
```
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288823022

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

```diff
@@ -63,55 +64,26 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
+      conjunction <- buildTree(filters.flatMap(orcFilterConverter.trimUnconvertibleFilters))
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
```

Review comment: The code here:
```
for {
  conjunction <- buildTree(filters.flatMap(orcFilterConverter.trimUnconvertibleFilters))
  builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
}
```
However, `buildSearchArgument` calls `performFilter`, which is the same as calling `trimUnconvertibleFilters`. That said, we call `trimUnconvertibleFilters` twice.
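A minimal sketch of the refactoring the comment implies, using invented, simplified types (the real `Filter`, `Builder`, and `SearchArgument` are Spark internals): trim exactly once in `createFilter`, then build directly on the trimmed tree instead of calling a build entry point that trims again internally.

```scala
// Hypothetical stand-ins for the real Spark types and helpers.
sealed trait Filter
case class Leaf(name: String) extends Filter
case class And(l: Filter, r: Filter) extends Filter

// Stand-in for trimUnconvertibleFilters: here, everything is convertible.
def trimUnconvertible(f: Filter): Option[Filter] = Some(f)
// Stand-in for updateBuilder: assumes its input is already trimmed.
def updateBuilder(f: Filter, sb: StringBuilder): Unit = { sb.append(f); () }

def createFilter(filters: Seq[Filter]): Option[String] =
  filters.flatMap(trimUnconvertible)      // the single trimming pass
    .reduceOption(And)                    // combine into one conjunction
    .map { conjunction =>
      val sb = new StringBuilder
      updateBuilder(conjunction, sb)      // build only: no second trimming pass
      sb.toString
    }
```

The key design point is that the build step's precondition ("input is already trimmed") is made explicit, so the trim cost is paid once per tree rather than once per entry point.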
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288548031

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288546979

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288545737

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288544694

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

```diff
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    val filteredExpression: Option[Filter] = performFilter(
+      expression,
+      canPartialPushDownConjuncts = true)
+    filteredExpression.foreach(updateBuilder(_, builder))
```

Review comment: nit:
```
performFilter(...).map { filter =>
  updateBuilder(filter, builder)
  builder.build()
}
```
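The nit contrasts two `Option` idioms: a `foreach` for the side effect followed by a separate `map` for the result, versus a single `map` that does both. On a plain `Option` (with hypothetical values), the two shapes look like this:

```scala
val opt: Option[Int] = Some(1)
var log = List.empty[Int]

// Shape 1: foreach for the side effect, then a second traversal to produce the result.
opt.foreach(v => log ::= v)
val a: Option[String] = opt.map(_ => "built")

// Shape 2 (the suggested nit): one map that runs the side effect and yields the result.
val b: Option[String] = opt.map { v => log ::= v; "built" }
```

Both shapes behave the same for `Some` and `None`; the single `map` just keeps the effect and the produced value in one place.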
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288544046

## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

```diff
@@ -63,55 +64,26 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
+      conjunction <- buildTree(filters.flatMap(orcFilterConverter.trimUnconvertibleFilters))
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
```

Review comment: `buildSearchArgument` already trimmed filters, so we don't need the for loop here?
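For context on the "for loop" wording: a `for { ... }` over `Option`s is not a loop at all. It desugars to `flatMap`/`map`, so each `<-` is a may-fail step that short-circuits on `None`. A small standalone illustration:

```scala
// Each step may fail; `for` chains the successes and stops at the first None.
def half(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None

val viaFor: Option[Int] = for {
  a <- half(8)   // Some(4)
  b <- half(a)   // Some(2)
} yield b

// What the compiler actually generates:
val desugared: Option[Int] = half(8).flatMap(a => half(a).map(b => b))
// viaFor == desugared == Some(2)
```

So the reviewer's question is whether the second generator (the `buildSearchArgument` step) is a redundant step in that chain, given that trimming has already happened.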
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288218663

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -143,145 +117,244 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }

+  import org.apache.spark.sql.sources._
+  import OrcFilters._
+
+  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
+    performFilter(expression, canPartialPushDownConjuncts = true)
+  }
+
   /**
-   * Build a SearchArgument and return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def buildSearchArgument(
-      dataTypeMap: Map[String, DataType],
+  private[sql] def buildSearchArgument(
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    val filteredExpression: Option[Filter] = performFilter(
+      expression,
+      canPartialPushDownConjuncts = true)
+    filteredExpression.foreach(updateBuilder(_, builder))
+    filteredExpression.map(_ => builder)
   }

-  /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of AND down is safe to
-   *                                    do at the top level or none of its ancestors is NOT and OR.
-   * @return the builder so far.
-   */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+  sealed trait ActionType
+  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) extends ActionType
+  case class BuildSearchArgument(builder: Builder) extends ActionType
+
+  // The performAction method can run both the filtering and building operations for a given
+  // node - we signify which one we want with the `actionType` parameter.
+  //
+  // There are a couple of benefits to coupling the two operations like this:
+  // 1. All the logic for a given predicate is grouped logically in the same place. You don't
+  //    have to scroll across the whole file to see what the filter action for an And is while
+  //    you're looking at the build action.
+  // 2. It's much easier to keep the implementations of the two operations up-to-date with
+  //    each other. If the `filter` and `build` operations are implemented as separate
+  //    case-matches in different methods, it's very easy to change one without appropriately
+  //    updating the other. For example, if we add a new supported node type to `filter`, it
+  //    would be very easy to forget to update `build` to support it too, thus leading to
+  //    conversion errors.
+  //
+  // Doing things this way does have some annoying side effects:
+  // - We need to return an `Either`, with one action type always returning a Left and the other
+  //   always returning a Right.
+  // - We always need to pass the canPartialPushDownConjuncts parameter even though the build
+  //   action doesn't need it (because by the time we run the `build` operation, we know all
+  //   remaining nodes are convertible).
+  def performAction(
+      actionType: ActionType,
+      expression: Filter): Either[Option[Filter], Unit] = {

Review comment:
Another idea: add a type parameter to `ActionType`:

  sealed trait ActionType[T]
  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) extends ActionType[Option[Filter]]
  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]

  def performAction[T](actionType: ActionType[T], ...): T
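The reviewer's type-parameter suggestion can be sketched end-to-end as follows. Everything here uses simplified stand-in types (`Pred`, `And`, a toy `Builder`), not Spark's real `Filter` classes or ORC's `SearchArgument.Builder`; the point is only that a GADT-style `ActionType[T]` lets `performAction` return `Option[Filter]` for trimming and `Unit` for building, without the `Either[Option[Filter], Unit]` wrapper:

```scala
// Toy stand-ins for Spark's Filter hierarchy and ORC's builder (illustrative only).
sealed trait Filter
case class Pred(name: String, convertible: Boolean = true) extends Filter
case class And(left: Filter, right: Filter) extends Filter

final class Builder { // stand-in for SearchArgument.Builder
  val leaves = scala.collection.mutable.Buffer.empty[String]
}

// The reviewer's suggestion: the type parameter records each action's result type.
sealed trait ActionType[T]
case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
  extends ActionType[Option[Filter]]
case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]

def performAction[T](actionType: ActionType[T], expression: Filter): T =
  actionType match {
    case t @ TrimUnconvertibleFilters(canPartial) =>
      expression match {
        case And(l, r) =>
          (performAction(t, l), performAction(t, r)) match {
            case (Some(lt), Some(rt))                 => Some(And(lt, rt))
            case (some @ Some(_), None) if canPartial => some
            case (None, some @ Some(_)) if canPartial => some
            case _                                    => None
          }
        case p @ Pred(_, convertible) => if (convertible) Some(p) else None
      }
    case b @ BuildSearchArgument(builder) =>
      expression match {
        // Assumes trimming already removed every unconvertible node.
        case And(l, r) => performAction(b, l); performAction(b, r)
        case Pred(n, _) => builder.leaves += n; ()
      }
  }
```

Each branch returns exactly the type its `ActionType[T]` declares, so the compiler enforces the trim/build contract instead of a runtime `Left`/`Right` convention.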
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288218053

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -143,145 +117,244 @@ private[sql] object OrcFilters extends OrcFiltersBase {
[...]
+  // Doing things this way does have some annoying side effects:
+  // - We need to return an `Either`, with one action type always returning a Left and the other
+  //   always returning a Right.
+  // - We always need to pass the canPartialPushDownConjuncts parameter even though the build

Review comment:
this one is gone.
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r287717387

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -63,55 +64,28 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
       conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
     } yield builder.build()
   }

   def convertibleFilters(
       schema: StructType,
       dataTypeMap: Map[String, DataType],
       filters: Seq[Filter]): Seq[Filter] = {
-    import org.apache.spark.sql.sources._
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
+    filters.flatMap(orcFilterConverter.trimUnconvertibleFilters)
+  }

-    def convertibleFiltersHelper(
-        filter: Filter,
-        canPartialPushDown: Boolean): Option[Filter] = filter match {
-      case And(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
-        (leftResultOptional, rightResultOptional) match {
-          case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
-          case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
-          case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
-          case _ => None
-        }
-      case Or(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
-        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
-          None
-        } else {
-          Some(Or(leftResultOptional.get, rightResultOptional.get))
-        }
-      case Not(pred) =>
-        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
-        resultOptional.map(Not)
-      case other =>
-        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
-          Some(other)
-        } else {
-          None
-        }
-    }
-    filters.flatMap { filter =>
-      convertibleFiltersHelper(filter, true)
-    }
-  }

+private class OrcFilterConverter(
+    val dataTypeMap: Map[String, DataType]

Review comment:
nit: keep the class definition on one line if possible.
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r288217090

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -63,55 +64,28 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
     for {
       // Combines all convertible filters using `And` to produce a single conjunction
       conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))

Review comment:
why not call `buildTree(filters.flatMap(orcFilterConverter.trimUnconvertibleFilters))` here?
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r287705558

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -150,138 +118,232 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+    filterAndBuild(dataTypeMap, expression, builder)
   }

+  sealed trait ActionType
+  case object FilterAction extends ActionType
+  case object BuildAction extends ActionType
+
   /**
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param builder the input SearchArgument.Builder.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of AND down is safe to
-   *                                    do at the top level or none of its ancestors is NOT and OR.
-   * @return the builder so far.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def createBuilder(
+  private def filterAndBuild(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+      builder: Builder
+  ): Option[Builder] = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))

     import org.apache.spark.sql.sources._

-    expression match {
-      case And(left, right) =>
-        // At here, it is not safe to just convert one side and remove the other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or in the child
-        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
-        // can be safely removed.
-        val leftBuilderOption =
-          createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts)
-        val rightBuilderOption =
-          createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts)
-        (leftBuilderOption, rightBuilderOption) match {
-          case (Some(_), Some(_)) =>
-            for {
-              lhs <- createBuilder(dataTypeMap, left,
-                builder.startAnd(), canPartialPushDownConjuncts)
-              rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts)
-            } yield rhs.end()
-
-          case (Some(_), None) if canPartialPushDownConjuncts =>
-            createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts)
+    // The performAction method can run both the filtering and building operations for a given
+    // node - we signify which one we want with the `actionType` parameter.
+    //
+    // There are a couple of benefits to coupling the two operations like this:
+    // 1. All the logic for a given predicate is grouped logically in the same place. You don't
+    //    have to scroll across the whole file to see what the filter action for an And is while
+    //    you're looking at the build action.
+    // 2. It's much easier to keep the implementations of the two operations up-to-date with
+    //    each other. If the `filter` and `build` operations are implemented as separate
+    //    case-matches in different methods, it's very easy to change one without appropriately
+    //    updating the other. For example, if we add a new supported node type to `filter`, it
+    //    would be very easy to forget to update `build` to support it too, thus leading to
+    //    conversion errors.
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r287693171

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -150,138 +118,232 @@ private[sql] object OrcFilters extends OrcFiltersBase {
-  private def createBuilder(
+  private def filterAndBuild(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+      builder: Builder
+  ): Option[Builder] = {
[...]
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r287692725

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -150,138 +118,232 @@ private[sql] object OrcFilters extends OrcFiltersBase {
+  sealed trait ActionType
+  case object FilterAction extends ActionType
+  case object BuildAction extends ActionType

Review comment:
nit: I think `TrimUnconvertibleFilters` and `BuildSearchArgument` are better names.
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r287691857

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -150,138 +118,232 @@ private[sql] object OrcFilters extends OrcFiltersBase {
-  private def createBuilder(
+  private def filterAndBuild(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder,
-      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
+      builder: Builder
+  ): Option[Builder] = {

Review comment:
nit: let's keep the old code style:

  def f(
      para1: T,
      para2: U): R
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r287690701

File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -29,8 +29,8 @@ import org.apache.spark.sql.types._
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
  *
- * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double-
- * checking pattern when converting `And`/`Or`/`Not` filters.
+ * Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and

Review comment:
we need to update the comment, it's not separate code paths now.
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r284113800 ## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -153,92 +189,143 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. -val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) -val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) -(leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => -for { - lhs <- createBuilder(dataTypeMap, left, -builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) -} yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - +val lhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true) +val rhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true) +(lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs case _ => None } case Or(left, right) => for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) - lhs <- createBuilder(dataTypeMap, left, 
-builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) -} yield rhs.end() + lhs: Filter <- +trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false) + rhs: Filter <- +trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false) +} yield Or(lhs, rhs) case Not(child) => -for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, -child, builder.startNot(), canPartialPushDownConjuncts = false) -} yield negate.end() +val filteredSubtree = + trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false) +filteredSubtree.map(Not(_)) + + case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + + case _ => None +} + } + + /** + * Build a SearchArgument for a Filter that has already been trimmed so as to only contain + * expressions that are convertible to a `SearchArgument`. This allows for a more efficient and + * more readable implementation since there's no need to check every node before converting it. 
+ * + * NOTE: If you change the set of supported `Filter` types here, you need to modify + * `trimNonConvertibleSubtreesImpl` accordingly! + * + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the trimmed input filter predicates. + * @param builder the builder so far. + * @return + */ + private def
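The hunk above replaces the old builder-threading `createBuilder` with a pre-pass, `trimNonConvertibleSubtreesImpl`, that prunes inconvertible subtrees before any `SearchArgument` is built. A minimal, self-contained sketch of that trimming pass (the `Filter` ADT and the `convertible` flag are stand-ins for Spark's `sources.Filter` and the real `isSearchableType` check; this is the shape of the approach, not the PR's exact code):

```scala
sealed trait Filter
final case class And(left: Filter, right: Filter) extends Filter
final case class Or(left: Filter, right: Filter) extends Filter
final case class Not(child: Filter) extends Filter
final case class Leaf(name: String, convertible: Boolean) extends Filter

// One bottom-up pass over the tree: every node is visited exactly once, so the
// cost is linear in the tree size rather than exponential in its height.
def trim(f: Filter, canPartialPushDownConjuncts: Boolean): Option[Filter] = f match {
  case And(l, r) =>
    (trim(l, canPartialPushDownConjuncts = true),
     trim(r, canPartialPushDownConjuncts = true)) match {
      case (Some(a), Some(b)) => Some(And(a, b))
      case (some @ Some(_), None) if canPartialPushDownConjuncts => some
      case (None, some @ Some(_)) if canPartialPushDownConjuncts => some
      case _ => None
    }
  case Or(l, r) =>
    // Under Or, dropping one side changes the predicate's meaning,
    // so both sides must survive trimming.
    for {
      a <- trim(l, canPartialPushDownConjuncts = false)
      b <- trim(r, canPartialPushDownConjuncts = false)
    } yield Or(a, b)
  case Not(c) =>
    trim(c, canPartialPushDownConjuncts = false).map(Not(_))
  case leaf @ Leaf(_, convertible) =>
    if (convertible) Some(leaf) else None
}
```

For example, `trim(And(Leaf("a", convertible = true), Leaf("b", convertible = false)), canPartialPushDownConjuncts = true)` keeps just the convertible conjunct, while the same children under `Or` trim to `None`.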
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r282406621 ## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -153,92 +189,143 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. -val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) -val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) -(leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => -for { - lhs <- createBuilder(dataTypeMap, left, -builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) -} yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - +val lhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true) +val rhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true) +(lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs case _ => None } case Or(left, right) => for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) - lhs <- createBuilder(dataTypeMap, left, 
-builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) -} yield rhs.end() + lhs: Filter <- +trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false) + rhs: Filter <- +trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false) +} yield Or(lhs, rhs) case Not(child) => -for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, -child, builder.startNot(), canPartialPushDownConjuncts = false) -} yield negate.end() +val filteredSubtree = + trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false) +filteredSubtree.map(Not(_)) + + case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + + case _ => None +} + } + + /** + * Build a SearchArgument for a Filter that has already been trimmed so as to only contain + * expressions that are convertible to a `SearchArgument`. This allows for a more efficient and + * more readable implementation since there's no need to check every node before converting it. 
+ * + * NOTE: If you change the set of supported `Filter` types here, you need to modify + * `trimNonConvertibleSubtreesImpl` accordingly! + * + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the trimmed input filter predicates. + * @param builder the builder so far. + * @return + */ + private def
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r282407053 ## File path: sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -153,92 +189,143 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. -val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) -val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) -(leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => -for { - lhs <- createBuilder(dataTypeMap, left, -builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) -} yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - +val lhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true) +val rhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true) +(lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs case _ => None } case Or(left, right) => for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) - lhs <- createBuilder(dataTypeMap, left, 
-builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) -} yield rhs.end() + lhs: Filter <- +trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false) + rhs: Filter <- +trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false) +} yield Or(lhs, rhs) case Not(child) => -for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, -child, builder.startNot(), canPartialPushDownConjuncts = false) -} yield negate.end() +val filteredSubtree = + trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false) +filteredSubtree.map(Not(_)) + + case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + + case _ => None +} + } + + /** + * Build a SearchArgument for a Filter that has already been trimmed so as to only contain + * expressions that are convertible to a `SearchArgument`. This allows for a more efficient and + * more readable implementation since there's no need to check every node before converting it. 
+ * + * NOTE: If you change the set of supported `Filter` types here, you need to modify + * `trimNonConvertibleSubtreesImpl` accordingly! + * + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the trimmed input filter predicates. + * @param builder the builder so far. + * @return + */ + private def
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r282403426 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ## @@ -371,12 +403,63 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper { withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) Seq(1, 250, 500).foreach { numFilter => -val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ") +val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" or ") Review comment: why change the existing benchmark? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
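For context on what the questioned one-word change does: the benchmark joins N equality predicates into a single WHERE string, and the edit only swaps the connective. A sketch, with the expression from the diff factored into a hypothetical helper:

```scala
// Builds "c1 = 0 <op> c2 = 0 <op> ..." over numFilter columns,
// mirroring the mkString call in the benchmark.
def whereExpr(numFilter: Int, op: String): String =
  (1 to numFilter).map(i => s"c$i = 0").mkString(s" $op ")
```

The connective matters for what the conversion has to do: per the `And`/`Or` cases in the OrcFilters diff, a conjunction allows partial pushdown (an inconvertible conjunct can be dropped), whereas a disjunction requires every disjunct to be convertible, so the two operators stress different code paths.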
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r282043784 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ## @@ -371,12 +403,64 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper { withTempTable("orcTable", "parquetTable") { saveAsTable(df, dir) Seq(1, 250, 500).foreach { numFilter => -val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ") +val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" or ") // Note: InferFiltersFromConstraints will add more filters to this given filters filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr) } } } } + +runBenchmark(s"Predicate conversion benchmark with unbalanced Expression") { + val numRows = 1 + val width = 2000 + + val columns = (1 to width).map(i => s"id c$i") + val df = spark.range(1).selectExpr(columns: _*) + Seq(25, 5000, 15000).foreach { numFilter => +val whereExpression = (1 to numFilter) + .map { +i => + EqualTo( +Literal(0), +AttributeReference( + s"c1", + IntegerType, + nullable = true)() + ).asInstanceOf[Expression] + } + .foldLeft[Expression](Literal.FalseLiteral)((x, y) => Or(x, y)) +val benchmark = new Benchmark(s"Select 1 row with $numFilter filters", + numRows, minNumIters = 5, output = output) +val name = s"Native ORC Vectorized (Pushdown)" +benchmark.addCase(name) { _ => + OrcFilters.createFilter(df.schema, +DataSourceStrategy.translateFilter(whereExpression).toSeq) +} +benchmark.run() + } +} + +runBenchmark(s"Pushdown benchmark with unbalanced Column") { Review comment: what's the difference between `Column` and `Expression` in this benchmark? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
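The `foldLeft` in the benchmark above is what makes its filter "unbalanced". A reduced sketch (the `Expr`/`OrE` names are local stand-ins for Catalyst's `Expression`/`Or`) showing that the fold yields a left-deep chain whose height grows linearly with the number of predicates:

```scala
sealed trait Expr
case object FalseLit extends Expr
final case class Eq(col: String) extends Expr
final case class OrE(left: Expr, right: Expr) extends Expr

// foldLeft nests each new predicate around the accumulator, producing
// OrE(OrE(OrE(FalseLit, c1), c2), c3): a maximally unbalanced tree.
def unbalanced(numFilter: Int): Expr =
  (1 to numFilter).foldLeft(FalseLit: Expr)((acc, i) => OrE(acc, Eq(s"c$i")))

def height(e: Expr): Int = e match {
  case OrE(l, r) => 1 + math.max(height(l), height(r))
  case _         => 0
}
```

With height proportional to `numFilter`, a conversion that is exponential in tree height is exactly what turned the 15000-filter case into a multi-hour run.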
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r282046361 ## File path: sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -153,92 +188,140 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. -val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) -val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) -(leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => -for { - lhs <- createBuilder(dataTypeMap, left, -builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) -} yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - +val lhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true) +val rhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true) +(lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs case _ => None } case Or(left, right) => for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) - lhs <- createBuilder(dataTypeMap, left, 
-builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) -} yield rhs.end() + lhs <- +trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false) + rhs <- +trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false) +} yield Or(lhs, rhs) case Not(child) => -for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, -child, builder.startNot(), canPartialPushDownConjuncts = false) -} yield negate.end() +trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false) +.map(Not) // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => +Some(expression) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => Some(expression) + + case _ => None +} + } + + /** + * Build a SearchArgument for a Filter that has already been trimmed so as to only contain + * 
expressions that are convertible to a `SearchArgument`. This allows for a more efficient and + * more readable implementation since there's no need to check every node before converting it. + * + * NOTE: If you change the set of supported `Filter` types here, you need to modify + * `trimNonConvertibleSubtreesImpl` accordingly! + * + * @param
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r282045908 ## File path: sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -153,92 +188,140 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. -val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) -val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) -(leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => -for { - lhs <- createBuilder(dataTypeMap, left, -builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) -} yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => -createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - +val lhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true) +val rhs = + trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true) +(lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs case _ => None } case Or(left, right) => for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) - lhs <- createBuilder(dataTypeMap, left, 
-builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) -} yield rhs.end() + lhs <- +trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false) + rhs <- +trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false) +} yield Or(lhs, rhs) case Not(child) => -for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, -child, builder.startNot(), canPartialPushDownConjuncts = false) -} yield negate.end() +trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false) +.map(Not) // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` Review comment: need to update comment?
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r282043084

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala

@@ -288,7 +320,7 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
     val filter = Range(0, count).map(r => scala.util.Random.nextInt(numRows * distribution / 100))
     val whereExpr = s"value in(${filter.mkString(",")})"
-    val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
+    val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"

Review comment: unnecessary change?
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r282045720

## File path: sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -111,38 +111,73 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }

+  /**
+   * A TrimmedFilter is a Filter that has been trimmed such that all the remaining nodes
+   * are convertible to ORC predicates.
+   *
+   * Since nothing in the underlying representation of the Filter is actually different from a
+   * regular Filter (the only difference is that we might remove some subtrees), this class is just
+   * a wrapper around a `Filter` value. The main benefits of using this class are readability
+   * and type safety (to signal that the respective functions only work with already trimmed
+   * filters).
+   *
+   * @param filter The underlying filter representation.
+   */
+  private case class TrimmedFilter(filter: Filter) extends AnyVal

Review comment: Hmm, I feel this one is not very useful. We just wrap the trimmed filter with it and then throw it away when building the ORC filter. I think we can instead add an assert when building the ORC filter, to make sure the passed filter is already trimmed.
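Both options in this exchange can be made concrete. A sketch contrasting the value-class tag from the diff with the assert-based alternative suggested in the comment (`Filter` and `isConvertible` are placeholders for the real Spark types and checks):

```scala
trait Filter // stands in for org.apache.spark.sql.sources.Filter

// Option 1 (the diff): a value class marks "already trimmed" in the type
// system; as an AnyVal wrapper it typically erases to the bare Filter at
// runtime, so the tag is compile-time only.
final case class TrimmedFilter(filter: Filter) extends AnyVal

// Option 2 (the review suggestion): no wrapper type; assert at the entry
// point of the SearchArgument builder that the input was already trimmed.
def buildSearchArgument(f: Filter)(isConvertible: Filter => Boolean): Unit = {
  assert(isConvertible(f), "filter must be trimmed before ORC conversion")
  // ... actual conversion would follow here
}
```

The trade-off: the wrapper gives static safety but, as the reviewer notes, is unwrapped and discarded at the conversion boundary; the assert keeps the API flat at the cost of a runtime-only check.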
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion URL: https://github.com/apache/spark/pull/24068#discussion_r267227486 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ## @@ -274,3 +298,137 @@ private[sql] object OrcFilters { } } } + +/** + * This case class represents a position in a `Filter` tree, paired with information about Review comment: Seems like using reflection to instantiate `ExpressionTree` is easier than this approach. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
cloud-fan commented on a change in pull request #24068: [SPARK-27105][SQL] Optimize away exponential complexity in ORC predicate conversion
URL: https://github.com/apache/spark/pull/24068#discussion_r264999576

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

@@ -38,18 +41,18 @@ import org.apache.spark.sql.types._
  *
  * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and
  * `startNot()` mutate internal state of the builder instance. This forces us to translate all
- * convertible filters with a single builder instance. However, before actually converting a filter,
- * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is
- * found, we may already end up with a builder whose internal state is inconsistent.
+ * convertible filters with a single builder instance. However, if we try to translate a filter
+ * before checking whether it can be converted or not, we may end up with a builder whose internal
+ * state is inconsistent in the case of an inconvertible filter.
  *
  * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then
  * try to convert its children. Say we convert `left` child successfully, but find that `right`
  * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent

Review comment: Can the builder be copied? Then we could simulate the rollback.
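The rollback question gets at the root cause. Because the builder could not be rolled back, the pre-refactor code did a throwaway dry run on each child before the real conversion, and that check-then-build scheme is what made the complexity exponential: every `And`/`Or`/`Not` node recurses into its subtree twice. A toy count of node visits for a chain of depth n (pure arithmetic, no Spark types):

```scala
// Each level performs one visit of itself plus two full recursions into the
// subtree below it: the dry run and the real conversion.
// Closed form: visits(n) = 2^(n+1) - 1.
def visits(depth: Int): BigInt =
  if (depth == 0) BigInt(1) else BigInt(1) + 2 * visits(depth - 1)
```

A filter chain only a few dozen levels deep already needs on the order of 2^60 visits under this scheme, consistent with the "minutes to hours" conversions the PR describes; the single trimming pass reduces this to one visit per node.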