Hi everyone, there seem to be different implementations of the "distinct" feature for DataFrames and RDDs, and a performance issue with the DataFrame distinct API.
In RDD.scala:

```scala
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  // Pair each element with a dummy value, keep one value per key, then drop the value.
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
```

And for DataFrames:

```scala
case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  // The partial pass accepts any input layout; the final pass needs rows
  // clustered on all output columns (i.e. a shuffle).
  override def requiredChildDistribution: Seq[Distribution] =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

  override def execute(): RDD[Row] = {
    child.execute().mapPartitions { iter =>
      // Collect the distinct rows of this partition in an in-memory hash set.
      val hashSet = new scala.collection.mutable.HashSet[Row]()

      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          // Copy before storing, since the incoming row object may be reused.
          hashSet.add(currentRow.copy())
        }
      }

      hashSet.iterator
    }
  }
}
```

I can try to reproduce the performance issue more clearly, but do you have any insight into why DataFrames and RDDs can't share the same distinct strategy?

Regards,
Olivier.
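To make the comparison concrete, here is a minimal local sketch in plain Scala (no Spark) of the two data flows. Partitions are modelled as a `Seq[Seq[T]]` and the shuffle as simple hash-bucketing. `DistinctSketch`, `shuffleBy`, `rddStyle` and `dataFrameStyle` are hypothetical names, and stacking a partial `Distinct` under a final one is an assumption about how the planner uses the operator, not something shown in the snippets.

```scala
import scala.collection.mutable

// Hypothetical local model: a dataset is a Seq of partitions, each a Seq[T].
// No Spark APIs are used; this only mimics the data flow of the two strategies.
object DistinctSketch {
  type Partitions[T] = Seq[Seq[T]]

  // Stand-in for a shuffle: route each record to bucket hash(key) mod n.
  private def shuffleBy[A, K](parts: Partitions[A], n: Int)(key: A => K): Partitions[A] = {
    val buckets = Vector.fill(n)(mutable.ArrayBuffer.empty[A])
    for (p <- parts; a <- p) buckets(Math.floorMod(key(a).hashCode, n)) += a
    buckets.map(_.toSeq)
  }

  // Mimics RDD.distinct: map(x => (x, null)).reduceByKey((x, y) => x, n).map(_._1)
  def rddStyle[T](parts: Partitions[T], n: Int): Partitions[T] = {
    // reduceByKey with (x, y) => x inside one partition: keep the first value per key.
    def reduceLocal(p: Seq[(T, Null)]): Seq[(T, Null)] =
      p.foldLeft(mutable.LinkedHashMap.empty[T, Null]) { (m, kv) =>
        if (!m.contains(kv._1)) m(kv._1) = kv._2
        m
      }.toSeq

    val pairs    = parts.map(_.map(x => (x, null)))
    val combined = pairs.map(reduceLocal)            // map-side combine
    val shuffled = shuffleBy(combined, n)(_._1)      // shuffle by key
    shuffled.map(reduceLocal).map(_.map(_._1))       // final reduce, drop the dummy value
  }

  // Mimics the DataFrame Distinct operator (assumed usage): a partial pass with a
  // HashSet per partition, a shuffle clustered on the whole row, then a final pass.
  def dataFrameStyle[T](parts: Partitions[T], n: Int): Partitions[T] = {
    // Values here are immutable, so no copy is needed before storing them.
    def dedup(p: Seq[T]): Seq[T] = {
      val hashSet = mutable.HashSet.empty[T]
      p.foreach(hashSet.add)                         // add ignores duplicates
      hashSet.toSeq
    }

    val partial   = parts.map(dedup)                 // Distinct(partial = true)
    val clustered = shuffleBy(partial, n)(x => x)    // ClusteredDistribution(child.output)
    clustered.map(dedup)                             // Distinct(partial = false)
  }
}
```

Both functions return the same elements; in this sketch the difference is that the DataFrame path materializes a full `HashSet` per partition in each pass, while the RDD path goes through the key/value combine of `reduceByKey`.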