Repository: spark Updated Branches: refs/heads/branch-1.5 8d2624790 -> 6be945cef
[SPARK-9949] [SQL] Fix TakeOrderedAndProject's output. https://issues.apache.org/jira/browse/SPARK-9949 Author: Yin Huai <[email protected]> Closes #8179 from yhuai/SPARK-9949. (cherry picked from commit 932b24fd144232fb08184f0bd0a46369ecba164e) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be945ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be945ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be945ce Branch: refs/heads/branch-1.5 Commit: 6be945cef041c36aeda20c72b25b5659adea9c5c Parents: 8d26247 Author: Yin Huai <[email protected]> Authored: Fri Aug 14 17:35:17 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Aug 14 17:35:26 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/basicOperators.scala | 12 +++++++++++- .../spark/sql/execution/PlannerSuite.scala | 20 +++++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6be945ce/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 247c900..77b9806 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -237,7 +237,10 @@ case class TakeOrderedAndProject( projectList: Option[Seq[NamedExpression]], child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output + override def output: Seq[Attribute] = { + val projectOutput = projectList.map(_.map(_.toAttribute)) + 
projectOutput.getOrElse(child.output) + } override def outputPartitioning: Partitioning = SinglePartition @@ -263,6 +266,13 @@ case class TakeOrderedAndProject( protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1) override def outputOrdering: Seq[SortOrder] = sortOrder + + override def simpleString: String = { + val orderByString = sortOrder.mkString("[", ",", "]") + val outputString = output.mkString("[", ",", "]") + + s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/6be945ce/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 937a108..fad93b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -162,9 +162,23 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext { } test("efficient limit -> project -> sort") { - val query = testData.sort('key).select('value).limit(2).logicalPlan - val planned = ctx.planner.TakeOrderedAndProject(query) - assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) + { + val query = + testData.select('key, 'value).sort('key).limit(2).logicalPlan + val planned = ctx.planner.TakeOrderedAndProject(query) + assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) + assert(planned.head.output === testData.select('key, 'value).logicalPlan.output) + } + + { + // We need to make sure TakeOrderedAndProject's output is correct when we push a project + // into it. 
+ val query = + testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan + val planned = ctx.planner.TakeOrderedAndProject(query) + assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) + assert(planned.head.output === testData.select('value, 'key).logicalPlan.output) + } } test("PartitioningCollection") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
