OK, but for the moment this seems to be killing performance on some
computations...
I'll try to get you precise figures comparing the RDD and DataFrame versions.
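
Roughly, the kind of comparison I have in mind would look like the sketch
below (assuming an already cached DataFrame df; the timing helper is mine and
the numbers would only be indicative):

    import org.apache.spark.sql.DataFrame

    // Rough micro-benchmark: time DataFrame distinct against RDD distinct on the same data.
    // df is assumed to be an already materialized/cached DataFrame.
    def time[T](label: String)(block: => T): T = {
      val start = System.nanoTime()
      val result = block
      println(s"$label: ${(System.nanoTime() - start) / 1e9} s")
      result
    }

    def compareDistinct(df: DataFrame): Unit = {
      time("DataFrame distinct") { df.distinct.count() }
      time("RDD distinct")       { df.rdd.distinct().count() }
    }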

Olivier.

On Thu, May 7, 2015 at 10:08 AM, Reynold Xin <r...@databricks.com> wrote:

> In 1.5, we will most likely just rewrite distinct in SQL to either use the
> Aggregate operator which will benefit from all the Tungsten optimizations,
> or have a Tungsten version of distinct for SQL/DataFrame.
>
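For reference, I imagine the Aggregate-based rewrite would be roughly
equivalent to a group-by over all columns, something like this sketch against
the current DataFrame API (the helper name is mine, and this is not the actual
planned implementation):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Sketch: distinct expressed as an aggregation over all columns, so it goes through
    // the Aggregate operator rather than the HashSet-based Distinct node.
    def distinctViaGroupBy(df: DataFrame): DataFrame = {
      val allCols = df.columns.map(col)
      df.groupBy(allCols: _*)
        .count()               // any aggregate works; only the grouping keys are needed
        .select(allCols: _*)   // project the helper "count" column away again
    }
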
> On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi everyone,
>> there seem to be different implementations of the "distinct" feature in
>> DataFrames and RDDs, and some performance issues with the DataFrame distinct
>> API.
>>
>> In RDD.scala:
>>
>>   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
>>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>>   }
>> And in DataFrame:
>>
>>   case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>>     override def output: Seq[Attribute] = child.output
>>
>>     override def requiredChildDistribution: Seq[Distribution] =
>>       if (partial) UnspecifiedDistribution :: Nil
>>       else ClusteredDistribution(child.output) :: Nil
>>
>>     override def execute(): RDD[Row] = {
>>       child.execute().mapPartitions { iter =>
>>         val hashSet = new scala.collection.mutable.HashSet[Row]()
>>
>>         var currentRow: Row = null
>>         while (iter.hasNext) {
>>           currentRow = iter.next()
>>           if (!hashSet.contains(currentRow)) {
>>             hashSet.add(currentRow.copy())
>>           }
>>         }
>>
>>         hashSet.iterator
>>       }
>>     }
>>   }
>>
>> I can try to reproduce the performance issue more clearly, but do you have
>> any insights into why we can't have the same distinct strategy between
>> DataFrame and RDD?
>>
>> Regards,
>>
>> Olivier.
>>
>
