Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4a11e2df7 -> 6f98d47f8


[SPARK-11734][SQL] Rename TungstenProject -> Project, TungstenSort -> Sort

I didn't remove the old Sort operator, since we still use it in randomized 
tests. I moved it into the test module and renamed it ReferenceSort.

Author: Reynold Xin <r...@databricks.com>

Closes #9700 from rxin/SPARK-11734.

(cherry picked from commit d22fc10887fdc6a86f6122648a823d0d37d4d795)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f98d47f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f98d47f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f98d47f

Branch: refs/heads/branch-1.6
Commit: 6f98d47f8d87b19b3cc5915d96efe6766997da45
Parents: 4a11e2d
Author: Reynold Xin <r...@databricks.com>
Authored: Sun Nov 15 10:33:53 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun Nov 15 10:34:01 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   |   7 +-
 .../org/apache/spark/sql/execution/Sort.scala   | 100 +++++++++++++
 .../spark/sql/execution/SparkPlanner.scala      |   2 +-
 .../spark/sql/execution/SparkStrategies.scala   |  16 +-
 .../spark/sql/execution/basicOperators.scala    |   2 +-
 .../datasources/DataSourceStrategy.scala        |   2 +-
 .../org/apache/spark/sql/execution/sort.scala   | 147 -------------------
 .../spark/sql/ColumnExpressionSuite.scala       |   4 +-
 .../spark/sql/execution/PlannerSuite.scala      |   6 +-
 .../spark/sql/execution/ReferenceSort.scala     |  61 ++++++++
 .../execution/RowFormatConvertersSuite.scala    |   4 +-
 .../apache/spark/sql/execution/SortSuite.scala  |  69 ++++++++-
 .../spark/sql/execution/TungstenSortSuite.scala |  86 -----------
 .../sql/execution/metric/SQLMetricsSuite.scala  |  12 +-
 .../hive/execution/HiveTypeCoercionSuite.scala  |   4 +-
 .../sources/ParquetHadoopFsRelationSuite.scala  |   2 +-
 16 files changed, 244 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index a161cf0..62cbc51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -51,7 +51,7 @@ case class Exchange(
     }
 
     val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
-    s"${simpleNodeName}${extraInfo}"
+    s"$simpleNodeName$extraInfo"
   }
 
   /**
@@ -475,10 +475,7 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we 
do not need to sort.
         if (requiredOrdering != 
child.outputOrdering.take(requiredOrdering.length)) {
-          sqlContext.planner.BasicOperators.getSortOperator(
-            requiredOrdering,
-            global = false,
-            child)
+          Sort(requiredOrdering, global = false, child = child)
         } else {
           child
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
new file mode 100644
index 0000000..24207cb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+/**
+ * Performs (external) sorting.
+ *
+ * @param global when true performs a global sort of all partitions by 
shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
+ *                           spill every `testSpillFrequency` records.
+ */
+case class Sort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends UnaryNode {
+
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
+
+  override private[sql] lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val schema = child.schema
+    val childOutput = child.output
+
+    val dataSize = longMetric("dataSize")
+    val spillSize = longMetric("spillSize")
+
+    child.execute().mapPartitionsInternal { iter =>
+      val ordering = newOrdering(sortOrder, childOutput)
+
+      // The comparator for comparing prefix
+      val boundSortExpression = BindReferences.bindReference(sortOrder.head, 
childOutput)
+      val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+      // The generator for prefix
+      val prefixProjection = 
UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
+      val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
+        override def computePrefix(row: InternalRow): Long = {
+          prefixProjection.apply(row).getLong(0)
+        }
+      }
+
+      val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
+      val sorter = new UnsafeExternalRowSorter(
+        schema, ordering, prefixComparator, prefixComputer, pageSize)
+      if (testSpillFrequency > 0) {
+        sorter.setTestSpillFrequency(testSpillFrequency)
+      }
+
+      // Remember spill data size of this task before executing this operator so 
that we can
+      // figure out how many bytes we spilled for this operator.
+      val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
+
+      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
+
+      dataSize += sorter.getPeakMemoryUsage
+      spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - 
spillSizeBefore
+
+      TaskContext.get().internalMetricsToAccumulators(
+        
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
+      sortedIterator
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index b7c5476..6e9a4df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -80,7 +80,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends 
SparkStrategies {
       filterCondition.map(Filter(_, scan)).getOrElse(scan)
     } else {
       val scan = scanBuilder((projectSet ++ filterSet).toSeq)
-      TungstenProject(projectList, filterCondition.map(Filter(_, 
scan)).getOrElse(scan))
+      Project(projectList, filterCondition.map(Filter(_, 
scan)).getOrElse(scan))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 67201a2..3d4ce63 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -302,16 +302,6 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object BasicOperators extends Strategy {
     def numPartitions: Int = self.numPartitions
 
-    /**
-     * Picks an appropriate sort operator.
-     *
-     * @param global when true performs a global sort of all partitions by 
shuffling the data first
-     *               if necessary.
-     */
-    def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: 
SparkPlan): SparkPlan = {
-      execution.TungstenSort(sortExprs, global, child)
-    }
-
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
 
@@ -339,11 +329,11 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its 
requiredDistribution will be
         // an UnspecifiedDistribution.
-        getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
+        execution.Sort(sortExprs, global = false, child = planLater(child)) :: 
Nil
       case logical.Sort(sortExprs, global, child) =>
-        getSortOperator(sortExprs, global, planLater(child)):: Nil
+        execution.Sort(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
-        execution.TungstenProject(projectList, planLater(child)) :: Nil
+        execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
         execution.Filter(condition, planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 07925c6..e79092e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.random.PoissonSampler
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 
-case class TungstenProject(projectList: Seq[NamedExpression], child: 
SparkPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: SparkPlan) 
extends UnaryNode {
 
   override private[sql] lazy val metrics = Map(
     "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 824c89a..9bbbfa7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -343,7 +343,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation)
-      execution.TungstenProject(
+      execution.Project(
         projects, filterCondition.map(execution.Filter(_, 
scan)).getOrElse(scan))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
deleted file mode 100644
index 52ef00e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
OrderedDistribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.CompletionIterator
-import org.apache.spark.util.collection.ExternalSorter
-import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////
-// This file defines various sort operators.
-////////////////////////////////////////////////////////////////////////////////////////////////////
-
-/**
- * Performs a sort, spilling to disk as needed.
- * @param global when true performs a global sort of all partitions by 
shuffling the data first
- *               if necessary.
- */
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan)
-  extends UnaryNode {
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"sort") {
-    child.execute().mapPartitionsInternal( { iterator =>
-      val ordering = newOrdering(sortOrder, child.output)
-      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
-        TaskContext.get(), ordering = Some(ordering))
-      sorter.insertAll(iterator.map(r => (r.copy(), null)))
-      val baseIterator = sorter.iterator.map(_._1)
-      val context = TaskContext.get()
-      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
-      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
-      context.internalMetricsToAccumulators(
-        
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
-      // TODO(marmbrus): The complex type signature below thwarts inference 
for no reason.
-      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, 
sorter.stop())
-    }, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
-/**
- * Optimized version of [[Sort]] that operates on binary data (implemented as 
part of
- * Project Tungsten).
- *
- * @param global when true performs a global sort of all partitions by 
shuffling the data first
- *               if necessary.
- * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
- *                           spill every `frequency` records.
- */
-
-case class TungstenSort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan,
-    testSpillFrequency: Int = 0)
-  extends UnaryNode {
-
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
-
-  override private[sql] lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val schema = child.schema
-    val childOutput = child.output
-
-    val dataSize = longMetric("dataSize")
-    val spillSize = longMetric("spillSize")
-
-    child.execute().mapPartitions { iter =>
-      val ordering = newOrdering(sortOrder, childOutput)
-
-      // The comparator for comparing prefix
-      val boundSortExpression = BindReferences.bindReference(sortOrder.head, 
childOutput)
-      val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression)
-
-      // The generator for prefix
-      val prefixProjection = 
UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
-      val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
-        override def computePrefix(row: InternalRow): Long = {
-          prefixProjection.apply(row).getLong(0)
-        }
-      }
-
-      val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
-      val sorter = new UnsafeExternalRowSorter(
-        schema, ordering, prefixComparator, prefixComputer, pageSize)
-      if (testSpillFrequency > 0) {
-        sorter.setTestSpillFrequency(testSpillFrequency)
-      }
-
-      // Remember spill data size of this task before execute this operator so 
that we can
-      // figure out how many bytes we spilled for this operator.
-      val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
-
-      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-
-      dataSize += sorter.getPeakMemoryUsage
-      spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - 
spillSizeBefore
-
-      TaskContext.get().internalMetricsToAccumulators(
-        
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
-      sortedIterator
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 8674da7..3eae3f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.scalatest.Matchers._
 
-import org.apache.spark.sql.execution.TungstenProject
+import org.apache.spark.sql.execution.Project
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -619,7 +619,7 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
 
     def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
       val projects = df.queryExecution.executedPlan.collect {
-        case tungstenProject: TungstenProject => tungstenProject
+        case tungstenProject: Project => tungstenProject
       }
       assert(projects.size === expectedNumProjects)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 8c41d79..be53ec3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -365,7 +365,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => 
true }.isEmpty) {
+    if (outputPlan.collect { case s: Sort => true }.isEmpty) {
       fail(s"Sort should have been added:\n$outputPlan")
     }
   }
@@ -381,7 +381,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => 
true }.nonEmpty) {
+    if (outputPlan.collect { case s: Sort => true }.nonEmpty) {
       fail(s"No sorts should have been added:\n$outputPlan")
     }
   }
@@ -398,7 +398,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => 
true }.isEmpty) {
+    if (outputPlan.collect { case s: Sort => true }.isEmpty) {
       fail(s"Sort should have been added:\n$outputPlan")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
new file mode 100644
index 0000000..9575d26
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.ExternalSorter
+
+
+/**
+ * A reference sort implementation used to compare against our normal sort.
+ */
+case class ReferenceSort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"sort") {
+    child.execute().mapPartitions( { iterator =>
+      val ordering = newOrdering(sortOrder, child.output)
+      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
+        TaskContext.get(), ordering = Some(ordering))
+      sorter.insertAll(iterator.map(r => (r.copy(), null)))
+      val baseIterator = sorter.iterator.map(_._1)
+      val context = TaskContext.get()
+      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
+      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+      context.internalMetricsToAccumulators(
+        
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, 
sorter.stop())
+    }, preservesPartitioning = true)
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index b3fceea..6876ab0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -33,9 +33,9 @@ class RowFormatConvertersSuite extends SparkPlanTest with 
SharedSQLContext {
     case c: ConvertToSafe => c
   }
 
-  private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, 
"name"))
+  private val outputsSafe = ReferenceSort(Nil, false, PhysicalRDD(Seq.empty, 
null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, 
null, "name"))
+  private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, 
"name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
   test("planner should insert unsafe->safe conversions when required") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 847c188..e5d34be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -17,15 +17,22 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.Row
+import scala.util.Random
+
+import org.apache.spark.AccumulatorSuite
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{RandomDataGenerator, Row}
+
 
+/**
+ * Test sorting. Many of the test cases generate random data and compare the 
sorted result with one
+ * sorted by a reference implementation ([[ReferenceSort]]).
+ */
 class SortSuite extends SparkPlanTest with SharedSQLContext {
   import testImplicits.localSeqToDataFrameHolder
 
-  // This test was originally added as an example of how to use 
[[SparkPlanTest]];
-  // it's not designed to be a comprehensive test of ExternalSort.
   test("basic sorting using ExternalSort") {
 
     val input = Seq(
@@ -36,14 +43,66 @@ class SortSuite extends SparkPlanTest with SharedSQLContext 
{
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+      (child: SparkPlan) => Sort('a.asc :: 'b.asc :: Nil, global = true, child 
= child),
       input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
       sortAnswers = false)
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+      (child: SparkPlan) => Sort('b.asc :: 'a.asc :: Nil, global = true, child 
= child),
       input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
       sortAnswers = false)
   }
+
+  test("sort followed by limit") {
+    checkThatPlansAgree(
+      (1 to 100).map(v => Tuple1(v)).toDF("a"),
+      (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child 
= child)),
+      (child: SparkPlan) => Limit(10, ReferenceSort('a.asc :: Nil, global = 
true, child)),
+      sortAnswers = false
+    )
+  }
+
+  test("sorting does not crash for large inputs") {
+    val sortOrder = 'a.asc :: Nil
+    val stringLength = 1024 * 1024 * 2
+    checkThatPlansAgree(
+      Seq(Tuple1("a" * stringLength), Tuple1("b" * 
stringLength)).toDF("a").repartition(1),
+      Sort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+      ReferenceSort(sortOrder, global = true, _: SparkPlan),
+      sortAnswers = false
+    )
+  }
+
+  test("sorting updates peak execution memory") {
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe 
external sort") {
+      checkThatPlansAgree(
+        (1 to 100).map(v => Tuple1(v)).toDF("a"),
+        (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child = 
child),
+        (child: SparkPlan) => ReferenceSort('a.asc :: Nil, global = true, 
child),
+        sortAnswers = false)
+    }
+  }
+
+  // Test sorting on different data types
+  for (
+    dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
+    nullable <- Seq(true, false);
+    sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
+    randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
+  ) {
+    test(s"sorting on $dataType with nullable=$nullable, 
sortOrder=$sortOrder") {
+      val inputData = Seq.fill(1000)(randomDataGenerator())
+      val inputDf = sqlContext.createDataFrame(
+        sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
+        StructType(StructField("a", dataType, nullable = true) :: Nil)
+      )
+      checkThatPlansAgree(
+        inputDf,
+        p => ConvertToSafe(Sort(sortOrder, global = true, p: SparkPlan, 
testSpillFrequency = 23)),
+        ReferenceSort(sortOrder, global = true, _: SparkPlan),
+        sortAnswers = false
+      )
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
deleted file mode 100644
index 7c860d1..0000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.util.Random
-
-import org.apache.spark.AccumulatorSuite
-import org.apache.spark.sql.{RandomDataGenerator, Row, SQLConf}
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
-
-/**
- * A test suite that generates randomized data to test the [[TungstenSort]] operator.
- */
-class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
-
-  test("sort followed by limit") {
-    checkThatPlansAgree(
-      (1 to 100).map(v => Tuple1(v)).toDF("a"),
-      (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
-      (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
-      sortAnswers = false
-    )
-  }
-
-  test("sorting does not crash for large inputs") {
-    val sortOrder = 'a.asc :: Nil
-    val stringLength = 1024 * 1024 * 2
-    checkThatPlansAgree(
-      Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
-      TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
-      Sort(sortOrder, global = true, _: SparkPlan),
-      sortAnswers = false
-    )
-  }
-
-  test("sorting updates peak execution memory") {
-    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
-      checkThatPlansAgree(
-        (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => TungstenSort('a.asc :: Nil, true, child),
-        (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child),
-        sortAnswers = false)
-    }
-  }
-
-  // Test sorting on different data types
-  for (
-    dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
-    nullable <- Seq(true, false);
-    sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
-    randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
-  ) {
-    test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
-      val inputData = Seq.fill(1000)(randomDataGenerator())
-      val inputDf = sqlContext.createDataFrame(
-        sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
-        StructType(StructField("a", dataType, nullable = true) :: Nil)
-      )
-      checkThatPlansAgree(
-        inputDf,
-        plan => ConvertToSafe(
-          TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
-        Sort(sortOrder, global = true, _: SparkPlan),
-        sortAnswers = false
-      )
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 486bfbb..5e2b415 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -114,17 +114,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
     val df = person.select('name)
     testSparkPlanMetrics(df, 1, Map(
-      0L ->("TungstenProject", Map(
-        "number of rows" -> 2L)))
-    )
-  }
-
-  test("TungstenProject metrics") {
-    // Assume the execution plan is
-    // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
-    val df = person.select('name)
-    testSparkPlanMetrics(df, 1, Map(
-      0L ->("TungstenProject", Map(
+      0L ->("Project", Map(
         "number of rows" -> 2L)))
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 4cf4e13..5bd323e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo}
-import org.apache.spark.sql.execution.TungstenProject
+import org.apache.spark.sql.execution.Project
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
@@ -44,7 +44,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
   test("[SPARK-2210] boolean cast on boolean value should be removed") {
     val q = "select cast(cast(key=0 as boolean) as boolean) from src"
     val project = TestHive.sql(q).queryExecution.executedPlan.collect {
-      case e: TungstenProject => e
+      case e: Project => e
     }.head
 
     // No cast expression introduced

http://git-wip-us.apache.org/repos/asf/spark/blob/6f98d47f/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index b6db622..e866493 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -151,7 +151,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
       val physicalPlan = df.queryExecution.executedPlan
 
-      assert(physicalPlan.collect { case p: execution.TungstenProject => p }.length === 1)
+      assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to