I don't know anything about windowing or about avoiding the developer APIs...

But a trivial implementation of top-k requires a total sort per group. This
can be done with Datasets. We do it using spark-sorted (
https://github.com/tresata/spark-sorted), but it's not hard to do it yourself
for Datasets either. For RDDs it's actually a little harder, I think (if you
want to avoid the in-memory assumption, which I assume you do).
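
For example, here is a minimal sketch of that trivial version in the plain
Dataset API. To be clear, this is just the idea behind spark-sorted, not its
actual API, and the record type and column names are made up:

import org.apache.spark.sql.Dataset

case class Rec(key: String, value: String, count: Long)

// Secondary-sort style top-k: repartition by the grouping key, sort within
// each partition by (key, count desc), then keep the first k rows per key
// in a single pass, without collecting a whole group in memory (assumes
// non-null keys).
def topKPerGroup(ds: Dataset[Rec], k: Int): Dataset[Rec] = {
  import ds.sparkSession.implicits._
  ds.repartition($"key")
    .sortWithinPartitions($"key", $"count".desc)
    .mapPartitions { rows =>
      var current: String = null
      var taken = 0
      rows.filter { r =>
        if (r.key != current) { current = r.key; taken = 0 }
        taken += 1
        taken <= k
      }
    }
}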

A perhaps more efficient implementation uses an aggregator. It is not hard
to adapt Algebird's top-k aggregator (SpaceSaver) for use as a Spark
Aggregator; this requires a simple adapter class. We do this in-house as
well, although I have to say I would recommend Spark 2.1.0 for this: Spark
2.0.x Aggregator codegen is too buggy in my experience.
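
A rough sketch of such an adapter, for String items (the class and parameter
names here are made up; SpaceSaver and its semigroup ship with Algebird):

import com.twitter.algebird.{Semigroup, SpaceSaver}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Adapts Algebird's SpaceSaver (approximate top-k / heavy hitters) to a
// Spark Aggregator. The buffer is an Option because SpaceSaver has a
// Semigroup but no zero element; capacity bounds the sketch size and k is
// how many items to return.
class SpaceSaverTopK(capacity: Int, k: Int)
    extends Aggregator[String, Option[SpaceSaver[String]], Seq[String]] {

  def zero: Option[SpaceSaver[String]] = None

  def reduce(b: Option[SpaceSaver[String]], x: String): Option[SpaceSaver[String]] = {
    val one = SpaceSaver(capacity, x)
    Some(b.fold(one)(Semigroup.plus(_, one)))
  }

  def merge(b1: Option[SpaceSaver[String]], b2: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
    (b1, b2) match {
      case (Some(x), Some(y)) => Some(Semigroup.plus(x, y))
      case _                  => b1.orElse(b2)
    }

  def finish(b: Option[SpaceSaver[String]]): Seq[String] =
    b.toSeq.flatMap(_.topK(k).map(_._1))

  // SpaceSaver is not a case class, so fall back to kryo for the buffer
  def bufferEncoder: Encoder[Option[SpaceSaver[String]]] = Encoders.kryo
  def outputEncoder: Encoder[Seq[String]] = Encoders.kryo
}

You would then use it per group as a typed column, something like
ds.groupByKey(_.key).mapValues(_.value).agg(new SpaceSaverTopK(1000, 10).toColumn).
Since the sketch is bounded, heavily skewed keys don't blow up the shuffle.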

On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang <nam...@gmail.com> wrote:

> Hi Austin,
>
> It's trivial to implement top-k in the RDD world; however, I would like to
> stay in the Dataset API world instead of flip-flopping between the two APIs
> (consistency, whole-stage codegen, etc.).
>
> The Twitter library appears to support only RDDs, and the solution you gave
> me is very similar to what I did; it doesn't work very well with a skewed
> dataset :) (it has to perform the sort to work out the row number).
>
> I've been toying with the UDAF idea, but the more code I write, the deeper
> I find myself digging into developer API land, which is not very ideal, to
> be honest. Also, a UDAF doesn't have any concept of sorting, so it gets
> messy really fast.
>
> -------
> Regards,
> Andy
>
> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:
>
>> Andy,
>>
>>
>>
>> You might also want to check out the Algebird libraries from Twitter. They
>> have topK and a lot of other helpful functions. I’ve used the Algebird topK
>> successfully on very large data sets.
>>
>>
>>
>> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
>> scrupulous you are about your top-k results (I can expound on this, if needed).
>>
>>
>>
>> I obfuscated the field names before pasting this into email – I think I
>> got them all consistently.
>>
>>
>>
>> Here’s the meat of the TopK part (found on SO, but I don’t have a
>> reference) – this one takes the top 4, hence “rowNum <= 4”:
>>
>>
>>
>> SELECT time_bucket,
>>        identifier1,
>>        identifier2,
>>        incomingCount
>>   FROM (SELECT time_bucket,
>>                identifier1,
>>                identifier2,
>>                incomingCount,
>>                ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
>>                                   ORDER BY incomingCount DESC) AS rowNum
>>           FROM tablename) tmp
>>  WHERE rowNum <= 4
>>  ORDER BY time_bucket, identifier1, rowNum
>>
>>
>>
>> The count and order by (this is what produces incomingCount):
>>
>>
>>
>> SELECT time_bucket,
>>        identifier1,
>>        identifier2,
>>        count(identifier2) AS incomingCount
>>   FROM table
>>  GROUP BY time_bucket,
>>           identifier1,
>>           identifier2
>>  ORDER BY time_bucket,
>>           identifier1,
>>           count(identifier2) DESC
>>
>>
>>
>> *From: *Andy Dang <nam...@gmail.com>
>> *Date: *Tuesday, January 3, 2017 at 7:06 AM
>> *To: *user <user@spark.apache.org>
>> *Subject: *top-k function for Window
>>
>>
>>
>> Hi all,
>>
>>
>>
>> What's the best way to do top-k with Windowing in the Dataset world?
>>
>>
>>
>> I have a snippet of code that filters the data to the top-k, but with
>> skewed keys:
>>
>>
>>
>> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>>
>> val rank = row_number().over(windowSpec)
>>
>>
>>
>> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>>
>>
>>
>> The problem with this code is that Spark doesn't know that it can sort the
>> data locally and get the local rank first. What it ends up doing is
>> performing a sort by key using the skewed keys, and this blew up the
>> cluster since the keys are heavily skewed.
>>
>>
>>
>> In the RDD world we can do something like:
>>
>> rdd.mapPartitions(iterator -> topK(iterator))
>>
>> but I can't really think of an obvious way to do this in the Dataset API,
>> especially with Window functions. I guess some UserDefinedAggregateFunction
>> would do, but I wonder if there's an obvious way that I missed.
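>>
>> For concreteness, the RDD version I have in mind is roughly the following,
>> using aggregateByKey with a bounded heap per key rather than a raw
>> mapPartitions (types and names are made up):
>>
>> import org.apache.spark.rdd.RDD
>> import scala.collection.mutable.PriorityQueue
>>
>> // Per-key top-k via aggregateByKey: each partition keeps at most k values
>> // per key on the map side, so no global sort over the skewed keys happens.
>> def topKByKey(rdd: RDD[(String, Long)], k: Int): RDD[(String, List[Long])] = {
>>   // the reversed ordering makes this a min-heap, so the smallest of the
>>   // current top-k is evicted first
>>   def add(q: PriorityQueue[Long], v: Long): PriorityQueue[Long] = {
>>     q.enqueue(v)
>>     if (q.size > k) q.dequeue()
>>     q
>>   }
>>   rdd.aggregateByKey(PriorityQueue.empty[Long](Ordering[Long].reverse))(
>>     add, (q1, q2) => { q2.foreach(v => add(q1, v)); q1 }
>>   ).mapValues(_.dequeueAll.reverse.toList)
>> }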
>>
>>
>>
>> -------
>> Regards,
>> Andy
>>
>
>
