Repository: spark
Updated Branches:
  refs/heads/master 42f3abd52 -> 3da8df939


[SPARK-2366] [SQL] Add column pruning for the right side of LeftSemi join.

The right side of `LeftSemi` join needs columns only used in join condition.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #1301 from ueshin/issues/SPARK-2366 and squashes the following commits:

7677a39 [Takuya UESHIN] Update comments.
786d3a0 [Takuya UESHIN] Rename method name.
e0957b1 [Takuya UESHIN] Add column pruning for the right side of LeftSemi join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3da8df93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3da8df93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3da8df93

Branch: refs/heads/master
Commit: 3da8df939ec63064692ba64d9188aeea908b305c
Parents: 42f3abd
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Sat Jul 5 11:48:08 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Sat Jul 5 11:48:08 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 28 ++++++++++++++------
 1 file changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3da8df93/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index fb517e4..48ca31e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -52,6 +52,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
  *  - Inserting Projections beneath the following operators:
  *   - Aggregate
  *   - Project <- Join
+ *   - LeftSemiJoin
  *  - Collapse adjacent projections, performing alias substitution.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
@@ -62,19 +63,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
 
     // Eliminate unneeded attributes from either side of a Join.
     case Project(projectList, Join(left, right, joinType, condition)) =>
-      // Collect the list of off references required either above or to 
evaluate the condition.
+      // Collect the list of all references required either above or to 
evaluate the condition.
       val allReferences: Set[Attribute] =
         projectList.flatMap(_.references).toSet ++ 
condition.map(_.references).getOrElse(Set.empty)
 
       /** Applies a projection only when the child is producing unnecessary 
attributes */
-      def prunedChild(c: LogicalPlan) =
-        if ((c.outputSet -- 
allReferences.filter(c.outputSet.contains)).nonEmpty) {
-          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
-        } else {
-          c
-        }
+      def pruneJoinChild(c: LogicalPlan) = prunedChild(c, allReferences)
 
-      Project(projectList, Join(prunedChild(left), prunedChild(right), 
joinType, condition))
+      Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), 
joinType, condition))
+
+    // Eliminate unneeded attributes from right side of a LeftSemiJoin.
+    case Join(left, right, LeftSemi, condition) =>
+      // Collect the list of all references required to evaluate the condition.
+      val allReferences: Set[Attribute] =
+        condition.map(_.references).getOrElse(Set.empty)
+
+      Join(left, prunedChild(right, allReferences), LeftSemi, condition)
 
     // Combine adjacent Projects.
     case Project(projectList1, Project(projectList2, child)) =>
@@ -97,6 +101,14 @@ object ColumnPruning extends Rule[LogicalPlan] {
     // Eliminate no-op Projects
     case Project(projectList, child) if child.output == projectList => child
   }
+
+  /** Applies a projection only when the child is producing unnecessary 
attributes */
+  private def prunedChild(c: LogicalPlan, allReferences: Set[Attribute]) =
+    if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
+      Project(allReferences.filter(c.outputSet.contains).toSeq, c)
+    } else {
+      c
+    }
 }
 
 /**

Reply via email to