Repository: spark Updated Branches: refs/heads/master 079733817 -> a0d8a61ab
[SPARK-7109] [SQL] Push down left side filter for left semi join Currently the Spark SQL optimizer only pushes down the right-side filter for a left semi join. We can also push down the left-side filter, because a left semi join is essentially a filter on the left table. Author: wangfei <wangf...@huawei.com> Author: scwf <wangf...@huawei.com> Closes #5677 from scwf/leftsemi and squashes the following commits: 483d205 [wangfei] update with master to fix compile issue 82df0e1 [wangfei] Merge branch 'master' of https://github.com/apache/spark into leftsemi d68a053 [wangfei] added apply 8f48a3d [scwf] added test ebadaa9 [wangfei] left filter push down for left semi join Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0d8a61a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0d8a61a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0d8a61a Branch: refs/heads/master Commit: a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45 Parents: 0797338 Author: wangfei <wangf...@huawei.com> Authored: Thu Apr 30 18:18:54 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Thu Apr 30 18:18:54 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 8 ++++---- .../optimizer/FilterPushdownSuite.scala | 21 +++++++++++++++++++- 2 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a0d8a61a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 63669e9..709f7d6 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -571,7 +571,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { - case Inner => + case _ @ (Inner | LeftSemi) => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -579,7 +579,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = commonJoinCondition.reduceLeftOption(And) - Join(newLeft, newRight, Inner, newJoinCond) + Join(newLeft, newRight, joinType, newJoinCond) case RightOuter => // push down the left side only join filter for left side sub query val newLeft = leftJoinConditions. @@ -588,14 +588,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, RightOuter, newJoinCond) - case _ @ (LeftOuter | LeftSemi) => + case LeftOuter => // push down the right side only join filter for right sub query val newLeft = left val newRight = rightJoinConditions. 
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) - Join(newLeft, newRight, joinType, newJoinCond) + Join(newLeft, newRight, LeftOuter, newJoinCond) case FullOuter => f } } http://git-wip-us.apache.org/repos/asf/spark/blob/a0d8a61a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 2ad7394..58d415d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions.{Count, Explode} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -43,6 +43,8 @@ class FilterPushdownSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + val testRelation1 = LocalRelation('d.int) + // This test already passes. 
test("eliminate subqueries") { val originalQuery = @@ -213,6 +215,23 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("joins: push down left semi join") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = { + x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b >= 1) + val right = testRelation1.where('d >= 2) + val correctAnswer = + left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + test("joins: push down left outer join #1") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org