Ok, but for the moment, this seems to be killing performance on some computations... I'll try to give you precise figures on this between RDD and DataFrame.
Olivier.

On Thu, May 7, 2015 at 10:08, Reynold Xin <r...@databricks.com> wrote:

> In 1.5, we will most likely just rewrite distinct in SQL to either use the
> Aggregate operator, which will benefit from all the Tungsten optimizations,
> or have a Tungsten version of distinct for SQL/DataFrame.
>
> On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi everyone,
>> there seem to be different implementations of the "distinct" feature in
>> DataFrames and RDDs, and some performance issues with the DataFrame
>> distinct API.
>>
>> In RDD.scala:
>>
>> def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
>>   withScope {
>>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>>   }
>>
>> And in DataFrame:
>>
>> case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>>   override def output: Seq[Attribute] = child.output
>>
>>   override def requiredChildDistribution: Seq[Distribution] =
>>     if (partial) UnspecifiedDistribution :: Nil
>>     else ClusteredDistribution(child.output) :: Nil
>>
>>   override def execute(): RDD[Row] = {
>>     child.execute().mapPartitions { iter =>
>>       val hashSet = new scala.collection.mutable.HashSet[Row]()
>>       var currentRow: Row = null
>>       while (iter.hasNext) {
>>         currentRow = iter.next()
>>         if (!hashSet.contains(currentRow)) {
>>           hashSet.add(currentRow.copy())
>>         }
>>       }
>>       hashSet.iterator
>>     }
>>   }
>> }
>>
>> I can try to reproduce the performance issue more clearly, but do you have
>> any insights into why we can't have the same distinct strategy between
>> DataFrame and RDD?
>>
>> Regards,
>>
>> Olivier.
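For anyone following along, the structural difference between the two strategies quoted above can be sketched without a Spark cluster, using plain Scala collections to stand in for partition iterators. This is only an illustration (the object and method names below are made up for the sketch, not Spark APIs): the RDD path expresses distinct as a key-based reduce, which Spark can partially apply on the map side before shuffling, while the pre-1.5 DataFrame operator materializes every unseen row into a per-partition mutable HashSet.

```scala
object DistinctStrategies {
  // RDD-style: map each element to a (x, null) pair and keep one pair per
  // key, then drop the null. groupBy stands in for reduceByKey here; in
  // Spark the reduce runs partially before the shuffle.
  def rddStyleDistinct[T](xs: Seq[T]): Seq[T] =
    xs.map(x => (x, null))
      .groupBy(_._1)
      .keys
      .toSeq

  // DataFrame-style (pre-1.5): walk the iterator, adding each unseen
  // element to a mutable HashSet, then return the set's iterator. (Spark
  // additionally calls row.copy() because SQL iterators reuse Row objects.)
  def dataFrameStyleDistinct[T](iter: Iterator[T]): Iterator[T] = {
    val hashSet = new scala.collection.mutable.HashSet[T]()
    while (iter.hasNext) {
      val current = iter.next()
      if (!hashSet.contains(current)) {
        hashSet.add(current)
      }
    }
    hashSet.iterator
  }
}
```

Both return the same distinct elements on a single partition; the performance gap in a cluster comes from what happens before the shuffle, not from the per-partition logic itself.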