Repository: spark
Updated Branches:
  refs/heads/master 7a0135293 -> 7e26b5761
[SPARK-2441][SQL] Add more efficient distinct operator.

Author: Michael Armbrust <mich...@databricks.com>

Closes #1366 from marmbrus/partialDistinct and squashes the following commits:

12a31ab [Michael Armbrust] Add more efficient distinct operator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e26b576
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e26b576
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e26b576

Branch: refs/heads/master
Commit: 7e26b57615f6c1d3f9058f9c19c05ec91f017f4c
Parents: 7a01352
Author: Michael Armbrust <mich...@databricks.com>
Authored: Sat Jul 12 12:07:27 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Sat Jul 12 12:07:27 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +--
 .../spark/sql/execution/basicOperators.scala    | 33 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e26b576/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7080074..c078e71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -247,8 +247,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
-        execution.Aggregate(
-          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
+        execution.Distinct(partial = false,
+          execution.Distinct(partial = true, planLater(child))) :: Nil
       case logical.Sort(sortExprs, child) =>
         // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil
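The planner change above is the heart of the optimization: a logical Distinct is now planned as a partial Distinct, which deduplicates within each partition before any shuffle, feeding a final Distinct, which deduplicates again after equal rows have been clustered together. For intuition, here is a minimal RDD-level sketch of the same two-phase idea; twoPhaseDistinct and numPartitions are illustrative names only and are not part of this patch.

import scala.reflect.ClassTag

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical helper mirroring the partial + final Distinct pair.
def twoPhaseDistinct[T: ClassTag](rdd: RDD[T], numPartitions: Int): RDD[T] = {
  // Partial phase: drop duplicates inside each partition first, so a
  // duplicate-heavy input ships far fewer rows across the network.
  val partial = rdd.mapPartitions(_.toSet.iterator)
  // Shuffle so equal values meet in one partition (the clustered
  // distribution the final phase needs), then drop the duplicates
  // that survived across partitions.
  partial
    .map(v => (v, null))
    .partitionBy(new HashPartitioner(numPartitions))
    .map(_._1)
    .mapPartitions(_.toSet.iterator)
}

The win over the old single Aggregate plan is that heavily duplicated data is shrunk before the shuffle rather than after it; like the operator below, each phase buffers one partition's distinct rows in memory.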
http://git-wip-us.apache.org/repos/asf/spark/blob/7e26b576/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 97abd63..966d8f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -248,6 +248,37 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
 
+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows using a HashSet.
+ * @param partial when true the distinct operation is performed partially, per partition, without
+ *                shuffling the data.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output = child.output
+
+  override def requiredChildDistribution =
+    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+
+  override def execute() = {
+    child.execute().mapPartitions { iter =>
+      val hashSet = new scala.collection.mutable.HashSet[Row]()
+
+      var currentRow: Row = null
+      while (iter.hasNext) {
+        currentRow = iter.next()
+        if (!hashSet.contains(currentRow)) {
+          hashSet.add(currentRow.copy())
+        }
+      }
+
+      hashSet.iterator
+    }
+  }
+}
+
 /**
  * :: DeveloperApi ::
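As a rough sanity check, the nested pair of physical Distinct operators can be seen in an executed plan, with an Exchange inserted between them because the final Distinct requires a ClusteredDistribution. The session below is an assumed 1.0-era spark-shell sketch; the Person class and table name are made up for illustration.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)
val people = sc.parallelize(Seq(Person("a", 1), Person("a", 1), Person("b", 2)))
people.registerAsTable("people")  // 1.0-era API; later renamed registerTempTable

val q = sql("SELECT DISTINCT name, age FROM people")
// q.queryExecution.executedPlan should look roughly like:
//   Distinct false
//    Exchange (HashPartitioning [name, age], ...)
//     Distinct true
//      ExistingRdd [name, age], ...
q.collect()  // Array([a,1], [b,2]), in some order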