Repository: spark
Updated Branches:
  refs/heads/master 3dc9ae2e1 -> c6aa356cd

[SPARK-13527][SQL] Prune Filters based on Constraints

#### What changes were proposed in this pull request?

Remove all the deterministic conditions in a [[Filter]] that are contained in the Child's Constraints.

For example, the first query can be simplified to the second one.

```scala
val queryWithUselessFilter = tr1
  .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
  .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
  .where(
    ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
    'd.attr < 100 &&
    "tr2.a".attr === "tr1.a".attr)
```

```scala
val query = tr1
  .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
  .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
```

#### How was this patch tested?

Six test cases are added.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #11406 from gatorsmile/FilterRemoval.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6aa356c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6aa356c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6aa356c

Branch: refs/heads/master
Commit: c6aa356cd831ea2d159568b699bd5b791f3d8f25
Parents: 3dc9ae2
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Wed Mar 9 12:50:55 2016 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Mar 9 12:50:55 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala | 26 +++-
 .../optimizer/BooleanSimplificationSuite.scala | 2 +-
 .../catalyst/optimizer/PruneFiltersSuite.scala | 136 +++++++++++++++++++
 .../catalyst/optimizer/SetOperationSuite.scala | 2 +-
 .../datasources/parquet/ParquetFilters.scala | 2 +-
 5 files changed, 160 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 586bf3d..650b4ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -86,7 +86,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, - SimplifyFilters, + PruneFilters, SimplifyCasts, SimplifyCaseConversionExpressions, EliminateSerialization) :: @@ -827,11 +827,12 @@ object CombineFilters extends Rule[LogicalPlan] { } /** - * Removes filters that can be evaluated trivially. This is done either by eliding the filter for - * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the - * filter will always evaluate to `false`. + * Removes filters that can be evaluated trivially. This can be done through the following ways: + * 1) by eliding the filter for cases where it will always evaluate to `true`. + * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`. + * 3) by eliminating the always-true conditions given the constraints on the child's output. */ -object SimplifyFilters extends Rule[LogicalPlan] { +object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // If the filter condition always evaluate to true, remove the filter. case Filter(Literal(true, BooleanType), child) => child @@ -839,6 +840,21 @@ object SimplifyFilters extends Rule[LogicalPlan] { // replace the input with an empty relation. 
case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty) case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) + // If any deterministic condition is guaranteed to be true given the constraints on the child's + // output, remove the condition + case f @ Filter(fc, p: LogicalPlan) => + val (prunedPredicates, remainingPredicates) = + splitConjunctivePredicates(fc).partition { cond => + cond.deterministic && p.constraints.contains(cond) + } + if (prunedPredicates.isEmpty) { + f + } else if (remainingPredicates.isEmpty) { + p + } else { + val newCond = remainingPredicates.reduce(And) + Filter(newCond, p) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 3e52441..da43751 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -36,7 +36,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { NullPropagation, ConstantFolding, BooleanSimplification, - SimplifyFilters) :: Nil + PruneFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string) http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala new file mode 100644 index 0000000..0ee7cf9 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class PruneFiltersSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateSubqueryAliases) :: + Batch("Filter Pushdown and Pruning", Once, + CombineFilters, + PruneFilters, + PushPredicateThroughProject, + PushPredicateThroughJoin) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("Constraints of isNull + LeftOuter") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithUselessFilter = query.where("x.b".attr.isNull) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Constraints of unionall") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int) + val tr2 = LocalRelation('d.int, 'e.int, 'f.int) + val tr3 = LocalRelation('g.int, 'h.int, 'i.int) + + val query = + tr1.where('a.attr > 10) + .unionAll(tr2.where('d.attr > 10) + .unionAll(tr3.where('g.attr > 10))) + val queryWithUselessFilter = query.where('a.attr > 10) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Pruning multiple constraints in the same run") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + val query = tr1 + .where("tr1.a".attr > 10 || "tr1.c".attr < 10) + 
.join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) + // different order of "tr2.a" and "tr1.a" + val queryWithUselessFilter = + query.where( + ("tr1.a".attr > 10 || "tr1.c".attr < 10) && + 'd.attr < 100 && + "tr2.a".attr === "tr1.a".attr) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Partial pruning") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + // One of the filter condition does not exist in the constraints of its child + // Thus, the filter is not removed + val query = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr)) + val queryWithExtraFilters = + query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), + Inner, + Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("No predicate is pruned") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithExtraFilters = query.where("x.b".attr.isNotNull) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = + testRelation.where("b".attr.isNull).where("b".attr.isNotNull) + .join(testRelation, LeftOuter).analyze + + comparePlans(optimized, correctAnswer) + } + + test("Nondeterministic predicate is not pruned") { + val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze + val optimized = Optimize.execute(originalQuery) + val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze + 
comparePlans(optimized, correctAnswer) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala index 50f3b51..b08cdc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala @@ -32,7 +32,7 @@ class SetOperationSuite extends PlanTest { Batch("Union Pushdown", Once, CombineUnions, SetOperationPushDown, - SimplifyFilters) :: Nil + PruneFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) http://git-wip-us.apache.org/repos/asf/spark/blob/c6aa356c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 5a5cb5c..95afdc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -234,7 +234,7 @@ private[sql] object ParquetFilters { // // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`, // which can be casted to `false` implicitly. Please refer to the `eval` method of these - // operators and the `SimplifyFilters` rule for details. + // operators and the `PruneFilters` rule for details. 
// Hyukjin:
// I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org