Repository: spark Updated Branches: refs/heads/master 42f3abd52 -> 3da8df939
[SPARK-2366] [SQL] Add column pruning for the right side of LeftSemi join. The right side of a `LeftSemi` join needs only the columns used in the join condition. Author: Takuya UESHIN <ues...@happy-camper.st> Closes #1301 from ueshin/issues/SPARK-2366 and squashes the following commits: 7677a39 [Takuya UESHIN] Update comments. 786d3a0 [Takuya UESHIN] Rename method name. e0957b1 [Takuya UESHIN] Add column pruning for the right side of LeftSemi join. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3da8df93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3da8df93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3da8df93 Branch: refs/heads/master Commit: 3da8df939ec63064692ba64d9188aeea908b305c Parents: 42f3abd Author: Takuya UESHIN <ues...@happy-camper.st> Authored: Sat Jul 5 11:48:08 2014 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Sat Jul 5 11:48:08 2014 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 28 ++++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3da8df93/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index fb517e4..48ca31e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -52,6 +52,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] { * - Inserting Projections beneath the following operators: * - Aggregate * - Project <- 
Join + * - LeftSemiJoin * - Collapse adjacent projections, performing alias substitution. */ object ColumnPruning extends Rule[LogicalPlan] { @@ -62,19 +63,22 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => - // Collect the list of off references required either above or to evaluate the condition. + // Collect the list of all references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) /** Applies a projection only when the child is producing unnecessary attributes */ - def prunedChild(c: LogicalPlan) = - if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { - Project(allReferences.filter(c.outputSet.contains).toSeq, c) - } else { - c - } + def pruneJoinChild(c: LogicalPlan) = prunedChild(c, allReferences) - Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition)) + + // Eliminate unneeded attributes from right side of a LeftSemiJoin. + case Join(left, right, LeftSemi, condition) => + // Collect the list of all references required to evaluate the condition. + val allReferences: Set[Attribute] = + condition.map(_.references).getOrElse(Set.empty) + + Join(left, prunedChild(right, allReferences), LeftSemi, condition) // Combine adjacent Projects. 
case Project(projectList1, Project(projectList2, child)) => @@ -97,6 +101,14 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate no-op Projects case Project(projectList, child) if child.output == projectList => child } + + /** Applies a projection only when the child is producing unnecessary attributes */ + private def prunedChild(c: LogicalPlan, allReferences: Set[Attribute]) = + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { + Project(allReferences.filter(c.outputSet.contains).toSeq, c) + } else { + c + } } /**