git commit: [SPARK-2042] Prevent unnecessary shuffle triggered by take()

2014-06-11 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 684a93a72 -> cc004488d


[SPARK-2042] Prevent unnecessary shuffle triggered by take()

This PR implements `take()` on a `SchemaRDD` as a logical limit followed by a
`collect()`, and adds a catalyst optimizer rule that collapses adjacent limits.
Together these prevent the unnecessary shuffle that `take()` sometimes triggers.
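A minimal sketch of the intent (the `SchemaRDD.scala` hunk does not survive in
this archived message; the query and names below assume the Spark 1.0 SQL API
and are illustrative, not quoted from the patch):

    import org.apache.spark.sql.{Row, SQLContext}

    // Sketch: after this change, take(n) plans a logical Limit and collects
    // it, rather than calling RDD.take() on the executed plan, which could
    // trigger an extra shuffle (e.g. when the plan ends in a sort).
    def firstFive(sqlContext: SQLContext): Array[Row] = {
      val query = sqlContext.sql("SELECT key FROM src ORDER BY key")
      query.take(5)  // now equivalent to query.limit(5).collect()
    }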

Author: Sameer Agarwal 

Closes #1048 from sameeragarwal/master and squashes the following commits:

3eeb848 [Sameer Agarwal] Fixing Tests
1b76ff1 [Sameer Agarwal] Deprecating limit(limitExpr: Expression) in v1.1.0
b723ac4 [Sameer Agarwal] Added limit folding tests
a0ff7c4 [Sameer Agarwal] Adding catalyst rule to fold two consecutive limits
8d42d03 [Sameer Agarwal] Implement take() as limit() followed by collect()

(cherry picked from commit 4107cce58c41160a0dc20339621eacdf8a8b1191)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc004488
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc004488
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc004488

Branch: refs/heads/branch-1.0
Commit: cc004488d49e5fc431cb7bd3907faacca43d4a9e
Parents: 684a93a
Author: Sameer Agarwal 
Authored: Wed Jun 11 12:01:04 2014 -0700
Committer: Michael Armbrust 
Committed: Wed Jun 11 12:01:53 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/dsl/package.scala    |  2 +
 .../sql/catalyst/optimizer/Optimizer.scala         | 13 +++++++++++++
 .../catalyst/plans/logical/basicOperators.scala    |  4 +-
 .../optimizer/CombiningLimitsSuite.scala           | 71 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/SchemaRDD.scala     | 12 +++-
 5 files changed, 97 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc004488/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 3cf163f..d177339 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -175,6 +175,8 @@ package object dsl {
 
     def where(condition: Expression) = Filter(condition, logicalPlan)
 
+    def limit(limitExpr: Expression) = Limit(limitExpr, logicalPlan)
+
     def join(
         otherPlan: LogicalPlan,
         joinType: JoinType = Inner,
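For context, a hedged sketch of how the new DSL method reads in a catalyst
test. It assumes the standard optimizer-suite conventions (a `LocalRelation`
test relation plus the DSL's Symbol and Int implicits), which are not part of
this hunk:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    // A one-column relation to build plans against.
    val testRelation = LocalRelation('a.int)

    // Stacked limits built through the new DSL method; the Int arguments are
    // lifted to Literal expressions by the DSL's implicit conversions.
    val query = testRelation.select('a).limit(10).limit(5)

The CombineLimits rule introduced below folds exactly this shape into a single
limit.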

http://git-wip-us.apache.org/repos/asf/spark/blob/cc004488/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e41fd2d..28d1aa2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
+    Batch("Combine Limits", FixedPoint(100),
+      CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
       ConstantFolding,
@@ -362,3 +364,14 @@ object SimplifyCasts extends Rule[LogicalPlan] {
     case Cast(e, dataType) if e.dataType == dataType => e
   }
 }
+
+/**
+ * Combines two adjacent [[catalyst.plans.logical.Limit Limit]] operators into one, merging the
+ * expressions into one single expression.
+ */
+object CombineLimits extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
+      Limit(If(LessThan(ne, le), ne, le), grandChild)
+  }
+}
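A hedged, self-contained illustration of what the rule produces; the literal
bounds are arbitrary stand-ins:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.{If, LessThan, Literal}
    import org.apache.spark.sql.catalyst.optimizer.CombineLimits
    import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalRelation}

    val child = LocalRelation('a.int)

    // Two adjacent limits, e.g. what take(5) over an existing limit(10) plans.
    val stacked = Limit(Literal(5), Limit(Literal(10), child))

    // CombineLimits keeps the smaller bound symbolically:
    //   Limit(If(LessThan(10, 5), 10, 5), child)
    // The ConstantFolding batch that runs next reduces the conditional to
    // Literal(5), leaving a single Limit over the grandchild plan.
    val folded = CombineLimits(stacked)

Running the "Combine Limits" batch to a fixed point lets chains of more than
two limits collapse pairwise before constant folding sees them.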

http://git-wip-us.apache.org/repos/asf/spark/blob/cc004488/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index d3347b6..b777cf4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -135,9 +135,9 @@ case class Aggregate(
   def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
 }
 
-case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode
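The archived message is truncated here. Judging from the removed line above
and the `limitExpr` naming used elsewhere in this patch, the replacement
plausibly renames the constructor parameter; the following is a hedged
reconstruction of the operator after the change, not a verbatim quote of the
missing hunk:

    // In org.apache.spark.sql.catalyst.plans.logical. The limit -> limitExpr
    // rename is an assumption, consistent with the DSL method and the
    // deprecation note in the commit list above.
    case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
      def output = child.output
      def references = limitExpr.references
    }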
