Hi everyone,
there seem to be different implementations of the "distinct" feature in
DataFrame and RDD, and the DataFrame distinct API appears to have a
performance issue.

In RDD.scala:

  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }

And in DataFrame:

  case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
    override def output: Seq[Attribute] = child.output

    override def requiredChildDistribution: Seq[Distribution] =
      if (partial) UnspecifiedDistribution :: Nil
      else ClusteredDistribution(child.output) :: Nil

    override def execute(): RDD[Row] = {
      child.execute().mapPartitions { iter =>
        val hashSet = new scala.collection.mutable.HashSet[Row]()

        var currentRow: Row = null
        while (iter.hasNext) {
          currentRow = iter.next()
          if (!hashSet.contains(currentRow)) {
            hashSet.add(currentRow.copy())
          }
        }

        hashSet.iterator
      }
    }
  }
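
To make the comparison concrete, here is a rough sketch of the two
strategies on plain Scala collections, with no Spark involved. Everything in
it is made up for illustration: `DistinctSketch`, `Partitions`,
`rddStyleDistinct`, `hashSetDistinct` and `clusterByHash` are hypothetical
names, each inner Seq stands in for one partition, and `groupBy` stands in
for the shuffle. It also assumes the hash-set operator runs once per
partition and then once more after equal rows have been clustered together,
which is only my reading of the `partial` flag, not something I have
verified in the planner.

```scala
import scala.collection.mutable

object DistinctSketch {
  // Hypothetical stand-in: each inner Seq plays the role of one partition.
  type Partitions[T] = Seq[Seq[T]]

  // Mimics the RDD approach: pair every element with null, group by the
  // element (standing in for reduceByKey's shuffle), keep one key per group.
  def rddStyleDistinct[T](parts: Partitions[T]): Set[T] =
    parts.flatten
      .map(x => (x, null))
      .groupBy(_._1)
      .map { case (key, _) => key }
      .toSet

  // Mimics the loop in Distinct.execute(): one in-memory hash set per
  // partition, holding every distinct element of that partition.
  def hashSetDistinct[T](parts: Partitions[T]): Partitions[T] =
    parts.map { part =>
      val hashSet = new mutable.HashSet[T]()
      val iter = part.iterator
      while (iter.hasNext) {
        val current = iter.next()
        if (!hashSet.contains(current)) hashSet.add(current)
      }
      hashSet.toVector
    }

  // Stand-in for a clustered distribution: equal elements always land in
  // the same output partition.
  def clusterByHash[T](parts: Partitions[T], n: Int): Partitions[T] =
    parts.flatten.groupBy(x => math.floorMod(x.hashCode, n)).values.toVector

  // Assumed two-phase use: partial pass, cluster, final pass.
  def twoPhaseHashSetDistinct[T](parts: Partitions[T], n: Int): Set[T] =
    hashSetDistinct(clusterByHash(hashSetDistinct(parts), n)).flatten.toSet

  def main(args: Array[String]): Unit = {
    val parts: Partitions[Int] = Seq(Seq(1, 2, 2, 3), Seq(3, 4, 4, 1))
    // Both strategies should agree on the set of distinct elements.
    assert(rddStyleDistinct(parts) == twoPhaseHashSetDistinct(parts, 2))
  }
}
```

Both paths return the same elements in this toy version. The visible
difference is that the hash-set version materializes every distinct element
of a partition in memory before emitting anything.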




I can try to put together a clearer reproduction of the performance issue,
but do you have any insight into why DataFrame and RDD can't share the same
distinct strategy?

Regards,

Olivier.
