Cool! You should also consider contributing it back to Spark if you are
doing quantile calculations, for example. There is also a topByKey API,
added in master by @coderxiang; see if you can use that API to make the
code cleaner.
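As a rough sketch of how topByKey could slot into Aung's approach below
(this assumes a build of master with SPARK-5954's topByKey from
org.apache.spark.mllib.rdd.MLPairRDDFunctions; the 10% bound and all
names here are illustrative, not from the thread):

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

// Per-segment top 10% via topByKey: count each segment to get its bound
// N, then trim the global top-maxN arrays back down per segment. The
// bounds map is small (one entry per segment) and lives on the driver.
def topTenPercentPerSegment(
    scores: RDD[(String, Double)]): RDD[(String, Array[Double])] = {
  val bounds = scores.countByKey().map { case (seg, c) =>
    (seg, math.max(1L, c / 10).toInt)       // N = 10% of segment, at least 1
  }
  val maxN = bounds.values.max              // topByKey takes one global num
  scores.topByKey(maxN)                     // values come back descending
        .map { case (seg, top) => (seg, top.take(bounds(seg))) }
}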
On Apr 3, 2015 5:20 AM, "Aung Htet" <aung....@gmail.com> wrote:

> Hi Debasish, Charles,
>
> I solved the problem by using a BPQ-like method, based on your
> suggestions, so thanks very much for that!
>
> My approach was:
> 1) Count the population of each segment in the RDD by map/reduce, so
> that I get the bound number N equivalent to 10% of each segment. This
> becomes the size of the BPQ.
> 2) Associate the bound N with the corresponding records in the first RDD.
> 3) Reduce the RDD from step 2 by merging the values in every two rows,
> basically creating a sorted list (IndexedSeq).
> 4) If the size of the sorted list is greater than N (the bound), then
> create a new sorted list by using a priority queue and dequeuing the
> top N values.
>
> In the end, I get a record for each segment with the N max values for
> that segment.
>
> Regards,
> Aung
>
> On Fri, Mar 27, 2015 at 4:27 PM, Debasish Das <debasish.da...@gmail.com>
> wrote:
>
>> In that case you can directly use the count-min sketch from Algebird;
>> it works fine with Spark aggregateBy, but I have found the Java BPQ
>> from Spark much faster than, say, the Algebird Heap data structure.
>>
>> On Thu, Mar 26, 2015 at 10:04 PM, Charles Hayden <
>> charles.hay...@atigeo.com> wrote:
>>
>>> You could also consider using a count-min data structure such as in
>>> https://github.com/laserson/dsq
>>> to get approximate quantiles, then use whatever values you want to
>>> filter the original sequence.
>>> ------------------------------
>>> From: Debasish Das <debasish.da...@gmail.com>
>>> Sent: Thursday, March 26, 2015 9:45 PM
>>> To: Aung Htet
>>> Cc: user
>>> Subject: Re: How to get a top X percent of a distribution represented
>>> as RDD
>>>
>>> The idea is to use a heap and get the topK elements from every
>>> partition, then use aggregateBy, and for the combOp do a merge
>>> routine from merge sort: get 100 items from partition 1 and 100 items
>>> from partition 2, merge them so that you get 200 sorted items, and
>>> take 100. For the merge you can use a heap as well. Matei had a BPQ
>>> inside Spark which we use all the time. Passing arrays over the wire
>>> is better than passing full heap objects, and the merge routine on
>>> arrays should run faster, but that needs experiment.
>>>
>>> On Thu, Mar 26, 2015 at 9:26 PM, Aung Htet <aung....@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> Thanks for your suggestions. The in-memory version is quite useful.
>>>> I do not quite understand how you can use aggregateBy to get the top
>>>> 10% (top K) elements. Can you please give an example?
>>>>
>>>> Thanks,
>>>> Aung
>>>>
>>>> On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das
>>>> <debasish.da...@gmail.com> wrote:
>>>>
>>>>> You can do it in-memory as well: get the 10% topK elements from
>>>>> each partition and use the merge from any sort algorithm like
>>>>> Timsort; basically aggregateBy.
>>>>>
>>>>> Your version uses a shuffle, but this version is zero-shuffle.
>>>>> Assuming your data set is cached, you will be using an in-memory
>>>>> allReduce through treeAggregate.
>>>>>
>>>>> But this is only good for the top 10% or bottom 10%. If you need to
>>>>> do it for the top 30%, then maybe the shuffle version will work
>>>>> better.
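To make the aggregateBy idea above concrete, here is a rough sketch for
a fixed bound n, using sorted arrays in place of a heap (in line with
the point that passing arrays over the wire beats full heap objects).
The function and variable names are illustrative, not from Spark:

import org.apache.spark.rdd.RDD

// Keep a descending-sorted array of at most n scores per key; the
// combOp is the merge-and-truncate step from merge sort described above.
def topNPerSegment(scores: RDD[(String, Double)],
                   n: Int): RDD[(String, Array[Double])] = {
  require(n >= 1)
  val desc = Ordering[Double].reverse

  // seqOp: fold one score into a partition-local top-n array
  def insert(acc: Array[Double], v: Double): Array[Double] =
    if (acc.length >= n && v <= acc.last) acc   // cannot enter the top n
    else (acc :+ v).sorted(desc).take(n)

  // combOp: merge two partitions' top-n arrays and keep the n largest
  def merge(a: Array[Double], b: Array[Double]): Array[Double] =
    (a ++ b).sorted(desc).take(n)

  scores.aggregateByKey(Array.empty[Double])(insert, merge)
}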
>>>>> On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet <aung....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a distribution represented as an RDD of tuples, in rows of
>>>>>> (segment, score). For each segment, I want to discard the tuples
>>>>>> with the top X percent of scores. This seems hard to do in Spark
>>>>>> RDD.
>>>>>>
>>>>>> A naive algorithm would be:
>>>>>>
>>>>>> 1) Sort the RDD by segment & score (ascending).
>>>>>> 2) Within each segment, number the rows from top to bottom.
>>>>>> 3) For each segment, calculate the cut-off index, e.g. 90 for a
>>>>>> 10% cut-off out of a segment with 100 rows.
>>>>>> 4) For the entire RDD, keep the rows with row num <= cut-off index.
>>>>>>
>>>>>> This does not look like a good algorithm. I would really
>>>>>> appreciate it if someone could suggest a better way to implement
>>>>>> this in Spark.
>>>>>>
>>>>>> Regards,
>>>>>> Aung
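For reference, a direct rendering of this naive approach might look like
the sketch below (it assumes each segment fits in one executor's memory,
since groupByKey pulls a whole segment to one place; pct and the names
are illustrative):

import org.apache.spark.rdd.RDD

// Shuffle every row, sort within each segment ascending, and keep the
// bottom (1 - pct) fraction, i.e. discard the top pct of scores.
def dropTopPercent(scores: RDD[(String, Double)],
                   pct: Double): RDD[(String, Double)] =
  scores.groupByKey().flatMap { case (seg, vs) =>
    val ascending = vs.toArray.sorted       // steps 1-2: sort and number
    val cutoff =
      (ascending.length * (1 - pct)).toInt  // step 3: 90 of 100 for pct = 0.1
    ascending.take(cutoff).map(v => (seg, v)) // step 4: row num <= cut-off
  }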