The following worked for me as a workaround for distinct:

val pf = sqlContext.parquetFile("hdfs://file")
val distinctValuesOfColumn4 =
  pf.rdd.aggregate[scala.collection.mutable.HashSet[String]](
      new scala.collection.mutable.HashSet[String]())(
    (s, v) => s += v.getString(4),
    (s1, s2) => s1 ++= s2)
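
In case it is useful, the same idea (build one small set of values per
partition, then merge the per-partition sets) can also be written with
mapPartitions; this is only a sketch under the same assumptions (column 4 is
a low-cardinality String column), not something I benchmarked:

// Sketch: one HashSet per partition, merged with reduce. Assumes column 4
// is a String with few distinct values; untested.
val distinctValuesOfColumn4Alt = pf.rdd
  .mapPartitions { rows =>
    val seen = new scala.collection.mutable.HashSet[String]()
    rows.foreach(row => seen += row.getString(4))
    Iterator.single(seen)
  }
  .reduce(_ ++= _)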

Best regards, Alexander

-----Original Message-----
From: Ulanov, Alexander 
Sent: Monday, May 11, 2015 11:59 AM
To: Olivier Girardot; Michael Armbrust
Cc: Reynold Xin; dev@spark.apache.org
Subject: RE: DataFrame distinct vs RDD distinct

Hi,

Could you suggest an alternative way of implementing distinct, e.g. via fold
or aggregate? Both the SQL distinct and the RDD distinct fail on my dataset
because they overflow the Spark shuffle disk. I have 7 nodes with 300GB
dedicated to Spark shuffle on each. My dataset has 2B rows, and the field on
which I'm performing distinct has only 23 distinct values.
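
For reference, the two variants that overflow look roughly like this (the
DataFrame is loaded from Parquet as elsewhere in this thread; "C4" and the
column index 4 are illustrative placeholders, not the real column name):

// Both of these spill more shuffle data than the nodes can hold.
// "C4" / index 4 are placeholders for the actual column.
val df = sqlContext.parquetFile("hdfs://file")
val viaDataFrame = df.select("C4").distinct
val viaRdd = df.rdd.map(_.getString(4)).distinct()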

Best regards, Alexander 

-----Original Message-----
From: Olivier Girardot [mailto:o.girar...@lateral-thoughts.com]
Sent: Friday, May 08, 2015 12:50 PM
To: Michael Armbrust; Olivier Girardot
Cc: Reynold Xin; dev@spark.apache.org
Subject: Re: DataFrame distinct vs RDD distinct

I'll try to reproduce what has been reported to me first :) and I'll let you
know. Thanks!

On Thu, May 7, 2015 at 9:16 PM, Michael Armbrust <mich...@databricks.com> wrote:

> I'd happily merge a PR that changes the distinct implementation to be 
> more like Spark core, assuming it includes benchmarks that show better 
> performance for both the "fits in memory case" and the "too big for 
> memory case".
>
> On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot < 
> o.girar...@lateral-thoughts.com> wrote:
>
>> Ok, but for the moment, this seems to be killing performance on some
>> computations...
>> I'll try to give you precise figures on this, comparing RDD and DataFrame.
>>
>> Olivier.
>>
>> On Thu, May 7, 2015 at 10:08 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> > In 1.5, we will most likely just rewrite distinct in SQL to either use the
>> > Aggregate operator, which will benefit from all the Tungsten optimizations,
>> > or have a Tungsten version of distinct for SQL/DataFrame.
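
Conceptually, the rewrite described above amounts to expressing distinct as a
group-by over all of a DataFrame's columns. A rough sketch of that equivalence
(not the actual Tungsten-based implementation, just the idea) could look like:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Distinct expressed through the aggregation path: group by every column and
// keep only the grouping columns. Conceptual sketch only.
def distinctViaGroupBy(df: DataFrame): DataFrame =
  df.groupBy(df.columns.map(col): _*)
    .count()
    .select(df.columns.map(col): _*)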
>> >
>> > On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot < 
>> > o.girar...@lateral-thoughts.com> wrote:
>> >
>> >> Hi everyone,
>> >> there seem to be different implementations of the "distinct" feature in
>> >> DataFrames and RDDs, and some performance issues with the DataFrame
>> >> distinct API.
>> >>
>> >> In RDD.scala:
>> >>
>> >> def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
>> >>   withScope {
>> >>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>> >>   }
>> >>
>> >> And in DataFrame:
>> >>
>> >> case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>> >>   override def output: Seq[Attribute] = child.output
>> >>
>> >>   override def requiredChildDistribution: Seq[Distribution] =
>> >>     if (partial) UnspecifiedDistribution :: Nil
>> >>     else ClusteredDistribution(child.output) :: Nil
>> >>
>> >>   override def execute(): RDD[Row] = {
>> >>     child.execute().mapPartitions { iter =>
>> >>       val hashSet = new scala.collection.mutable.HashSet[Row]()
>> >>       var currentRow: Row = null
>> >>       while (iter.hasNext) {
>> >>         currentRow = iter.next()
>> >>         if (!hashSet.contains(currentRow)) {
>> >>           hashSet.add(currentRow.copy())
>> >>         }
>> >>       }
>> >>       hashSet.iterator
>> >>     }
>> >>   }
>> >> }
>> >>
>> >> I can try to reproduce the performance issue more clearly, but do you
>> >> have any insights into why we can't have the same distinct strategy
>> >> between DataFrame and RDD?
>> >>
>> >> Regards,
>> >>
>> >> Olivier.
>> >>
>> >
>>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
