Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f7f2ac69d -> c73498773


[SPARK-7289] [SPARK-9949] [SQL] Backport SPARK-7289 and SPARK-9949 to branch 1.4

The bug fixed by SPARK-7289 is a serious one: Spark SQL generates wrong
results. We should backport the fix to branch 1.4
(https://github.com/apache/spark/pull/6780), and we also need to backport the
fix for `TakeOrderedAndProject` (https://github.com/apache/spark/pull/8179).
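
For context, a minimal sketch of the affected query shape (the table and
column names are placeholders, mirroring the new PlannerSuite test below):

    import sqlContext.implicits._  // assuming a SQLContext named sqlContext
    val top2 = testData            // placeholder DataFrame
      .select('key, 'value)
      .sort('key)
      .select('value, 'key)        // project between the sort and the limit
      .limit(2)                    // now planned as one TakeOrderedAndProject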

Author: Wenchen Fan <cloud0...@outlook.com>
Author: Yin Huai <yh...@databricks.com>

Closes #8252 from yhuai/backport7289And9949.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7349877
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7349877
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7349877

Branch: refs/heads/branch-1.4
Commit: c73498773952e675c9ec9c7b5acfc8c293ed6b51
Parents: f7f2ac6
Author: Wenchen Fan <cloud0...@outlook.com>
Authored: Mon Aug 17 21:22:01 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Aug 17 21:22:01 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 +-
 .../spark/sql/execution/SparkStrategies.scala   |  8 +++--
 .../spark/sql/execution/basicOperators.scala    | 34 ++++++++++++++++----
 .../spark/sql/execution/PlannerSuite.scala      | 20 ++++++++++++
 .../org/apache/spark/sql/hive/HiveContext.scala |  2 +-
 5 files changed, 56 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c7349877/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bd3f690..6a7569d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -827,7 +827,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       experimental.extraStrategies ++ (
       DataSourceStrategy ::
       DDLStrategy ::
-      TakeOrdered ::
+      TakeOrderedAndProject ::
       HashAggregation ::
       LeftSemiJoin ::
       HashJoin ::

http://git-wip-us.apache.org/repos/asf/spark/blob/c7349877/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d0a1ad0..bf75cc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -205,10 +205,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   protected lazy val singleRowRdd =
     sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
 
-  object TakeOrdered extends Strategy {
+  object TakeOrderedAndProject extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
-        execution.TakeOrdered(limit, order, planLater(child)) :: Nil
+        execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil
+      case logical.Limit(
+             IntegerLiteral(limit),
+             logical.Project(projectList, logical.Sort(order, true, child))) =>
+        execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil
       case _ => Nil
     }
   }
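
For reference, a self-contained sketch (hypothetical stand-ins, not Spark's
actual classes) of the two logical plan shapes the extended strategy matches:

    sealed trait Plan
    case object Leaf extends Plan
    case class Sort(child: Plan) extends Plan
    case class Project(cols: Seq[String], child: Plan) extends Plan
    case class Limit(n: Int, child: Plan) extends Plan

    // The strategy fires on Limit(Sort(_)) and Limit(Project(_, Sort(_))),
    // so a projection between the limit and the sort no longer blocks it.
    def takeOrderedAndProjectApplies(plan: Plan): Boolean = plan match {
      case Limit(_, Sort(_))             => true   // no projection
      case Limit(_, Project(_, Sort(_))) => true   // projection pushed in
      case _                             => false
    }

    // e.g. takeOrderedAndProjectApplies(Limit(2, Project(Seq("value"), Sort(Leaf)))) == true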

http://git-wip-us.apache.org/repos/asf/spark/blob/c7349877/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a30ade8..72b8e48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -144,20 +144,35 @@ case class Limit(limit: Int, child: SparkPlan)
 
 /**
  * :: DeveloperApi ::
- * Take the first limit elements as defined by the sortOrder. This is logically equivalent to
- * having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
- * Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
+ * Take the first limit elements as defined by the sortOrder, and do projection if needed.
+ * This is logically equivalent to having a [[Limit]] operator after a [[Sort]] operator,
+ * or having a [[Project]] operator between them.
+ * This could have been named TopK, but Spark's top operator does the opposite in ordering
+ * so we name it TakeOrdered to avoid confusion.
  */
 @DeveloperApi
-case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
+case class TakeOrderedAndProject(
+    limit: Int,
+    sortOrder: Seq[SortOrder],
+    projectList: Option[Seq[NamedExpression]],
+    child: SparkPlan) extends UnaryNode {
 
-  override def output: Seq[Attribute] = child.output
+  override def output: Seq[Attribute] = {
+    val projectOutput = projectList.map(_.map(_.toAttribute))
+    projectOutput.getOrElse(child.output)
+  }
 
   override def outputPartitioning: Partitioning = SinglePartition
 
   private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
 
-  private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord)
+  // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
+  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
+
+  private def collectData(): Array[Row] = {
+    val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
+    projection.map(data.map(_)).getOrElse(data)
+  }
 
   override def executeCollect(): Array[Row] = {
     val converter = CatalystTypeConverters.createToScalaConverter(schema)
@@ -169,6 +184,13 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
   protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def simpleString: String = {
+    val orderByString = sortOrder.mkString("[", ",", "]")
+    val outputString = output.mkString("[", ",", "]")
+
+    s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, 
output=$outputString)"
+  }
 }
 
 /**
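
The behavioral point in collectData above is that the optional projection is
applied only after takeOrdered has limited the rows. A minimal sketch with
plain Scala collections (hypothetical stand-ins for Row, RowOrdering, and
InterpretedProjection):

    // Sort-then-take happens first; the projection (here, reversing each
    // row) touches only the `limit` surviving rows.
    val rows = Array(Seq(3, 30), Seq(1, 10), Seq(2, 20))
    val limit = 2
    val projection: Option[Seq[Int] => Seq[Int]] = Some(_.reverse)

    val data = rows.sortBy(_.head).take(limit)      // takeOrdered(limit)(ord)
    val result = projection.map(data.map(_)).getOrElse(data)
    // result: Array(Seq(10, 1), Seq(20, 2))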

http://git-wip-us.apache.org/repos/asf/spark/blob/c7349877/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 45a7e8f..0b0f433 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -142,4 +142,24 @@ class PlannerSuite extends SparkFunSuite {
 
     setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
   }
+
+  test("efficient limit -> project -> sort") {
+    {
+      val query =
+        testData.select('key, 'value).sort('key).limit(2).logicalPlan
+      val planned = planner.TakeOrderedAndProject(query)
+      assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+      assert(planned.head.output === testData.select('key, 'value).logicalPlan.output)
+    }
+
+    {
+      // We need to make sure TakeOrderedAndProject's output is correct when we push a project
+      // into it.
+      val query =
+        testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan
+      val planned = planner.TakeOrderedAndProject(query)
+      assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+      assert(planned.head.output === testData.select('value, 'key).logicalPlan.output)
+    }
+  }
 }
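
The second block in this test pins down the regression risk: when a project
is pushed into the operator, the reported output attributes must follow the
project list, not the child. A sketch of that rule (Attr is a hypothetical
stand-in for Catalyst's Attribute):

    case class Attr(name: String)
    val childOutput = Seq(Attr("key"), Attr("value"))
    val projected: Option[Seq[Attr]] = Some(Seq(Attr("value"), Attr("key")))
    // Mirrors output = projectList.map(_.map(_.toAttribute)).getOrElse(child.output)
    val output = projected.getOrElse(childOutput)
    assert(output == Seq(Attr("value"), Attr("key")))  // project order wins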

http://git-wip-us.apache.org/repos/asf/spark/blob/c7349877/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d43afb7..43c08eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -447,7 +447,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       HiveCommandStrategy(self),
       HiveDDLStrategy,
       DDLStrategy,
-      TakeOrdered,
+      TakeOrderedAndProject,
       ParquetOperations,
       InMemoryScans,
       ParquetConversion, // Must be before HiveTableScans

