git commit: [SPARK-2042] Prevent unnecessary shuffle triggered by take()
Repository: spark Updated Branches: refs/heads/branch-1.0 684a93a72 -> cc004488d [SPARK-2042] Prevent unnecessary shuffle triggered by take() This PR implements `take()` on a `SchemaRDD` by inserting a logical limit that is followed by a `collect()`. This is also accompanied by adding a catalyst optimizer rule for collapsing adjacent limits. Doing so prevents an unnecessary shuffle that is sometimes triggered by `take()`. Author: Sameer Agarwal Closes #1048 from sameeragarwal/master and squashes the following commits: 3eeb848 [Sameer Agarwal] Fixing Tests 1b76ff1 [Sameer Agarwal] Deprecating limit(limitExpr: Expression) in v1.1.0 b723ac4 [Sameer Agarwal] Added limit folding tests a0ff7c4 [Sameer Agarwal] Adding catalyst rule to fold two consecutive limits 8d42d03 [Sameer Agarwal] Implement trigger() as limit() followed by collect() (cherry picked from commit 4107cce58c41160a0dc20339621eacdf8a8b1191) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc004488 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc004488 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc004488 Branch: refs/heads/branch-1.0 Commit: cc004488d49e5fc431cb7bd3907faacca43d4a9e Parents: 684a93a Author: Sameer Agarwal Authored: Wed Jun 11 12:01:04 2014 -0700 Committer: Michael Armbrust Committed: Wed Jun 11 12:01:53 2014 -0700 -- .../apache/spark/sql/catalyst/dsl/package.scala | 2 + .../sql/catalyst/optimizer/Optimizer.scala | 13 .../catalyst/plans/logical/basicOperators.scala | 4 +- .../optimizer/CombiningLimitsSuite.scala| 71 .../scala/org/apache/spark/sql/SchemaRDD.scala | 12 +++- 5 files changed, 97 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc004488/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 3cf163f..d177339 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -175,6 +175,8 @@ package object dsl { def where(condition: Expression) = Filter(condition, logicalPlan) +def limit(limitExpr: Expression) = Limit(limitExpr, logicalPlan) + def join( otherPlan: LogicalPlan, joinType: JoinType = Inner, http://git-wip-us.apache.org/repos/asf/spark/blob/cc004488/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e41fd2d..28d1aa2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.types._ object Optimizer extends RuleExecutor[LogicalPlan] { val batches = +Batch("Combine Limits", FixedPoint(100), + CombineLimits) :: Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, @@ -362,3 +364,14 @@ object SimplifyCasts extends Rule[LogicalPlan] { case Cast(e, dataType) if e.dataType == dataType => e } } + +/** + * Combines two adjacent [[catalyst.plans.logical.Limit Limit]] operators into one, merging the + * expressions into one single expression. + */ +object CombineLimits extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case ll @ Limit(le, nl @ Limit(ne, grandChild)) => + Limit(If(LessThan(ne, le), ne, le), grandChild) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cc004488/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d3347b6..b777cf4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -135,9 +135,9 @@ case class Aggregate( def references = (groupingExpressions ++ aggregateExpressi
git commit: [SPARK-2042] Prevent unnecessary shuffle triggered by take()
Repository: spark Updated Branches: refs/heads/master 4d5c12aa1 -> 4107cce58 [SPARK-2042] Prevent unnecessary shuffle triggered by take() This PR implements `take()` on a `SchemaRDD` by inserting a logical limit that is followed by a `collect()`. This is also accompanied by adding a catalyst optimizer rule for collapsing adjacent limits. Doing so prevents an unnecessary shuffle that is sometimes triggered by `take()`. Author: Sameer Agarwal Closes #1048 from sameeragarwal/master and squashes the following commits: 3eeb848 [Sameer Agarwal] Fixing Tests 1b76ff1 [Sameer Agarwal] Deprecating limit(limitExpr: Expression) in v1.1.0 b723ac4 [Sameer Agarwal] Added limit folding tests a0ff7c4 [Sameer Agarwal] Adding catalyst rule to fold two consecutive limits 8d42d03 [Sameer Agarwal] Implement trigger() as limit() followed by collect() Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4107cce5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4107cce5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4107cce5 Branch: refs/heads/master Commit: 4107cce58c41160a0dc20339621eacdf8a8b1191 Parents: 4d5c12a Author: Sameer Agarwal Authored: Wed Jun 11 12:01:04 2014 -0700 Committer: Michael Armbrust Committed: Wed Jun 11 12:01:04 2014 -0700 -- .../apache/spark/sql/catalyst/dsl/package.scala | 2 + .../sql/catalyst/optimizer/Optimizer.scala | 13 .../catalyst/plans/logical/basicOperators.scala | 4 +- .../optimizer/CombiningLimitsSuite.scala| 71 .../scala/org/apache/spark/sql/SchemaRDD.scala | 12 +++- 5 files changed, 97 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4107cce5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 3cf163f..d177339 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -175,6 +175,8 @@ package object dsl { def where(condition: Expression) = Filter(condition, logicalPlan) +def limit(limitExpr: Expression) = Limit(limitExpr, logicalPlan) + def join( otherPlan: LogicalPlan, joinType: JoinType = Inner, http://git-wip-us.apache.org/repos/asf/spark/blob/4107cce5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e41fd2d..28d1aa2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.types._ object Optimizer extends RuleExecutor[LogicalPlan] { val batches = +Batch("Combine Limits", FixedPoint(100), + CombineLimits) :: Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, @@ -362,3 +364,14 @@ object SimplifyCasts extends Rule[LogicalPlan] { case Cast(e, dataType) if e.dataType == dataType => e } } + +/** + * Combines two adjacent [[catalyst.plans.logical.Limit Limit]] operators into one, merging the + * expressions into one single expression. + */ +object CombineLimits extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case ll @ Limit(le, nl @ Limit(ne, grandChild)) => + Limit(If(LessThan(ne, le), ne, le), grandChild) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4107cce5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d3347b6..b777cf4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -135,9 +135,9 @@ case class Aggregate( def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet } -case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNod