Hi Austin,

It's trivial to implement top-k in the RDD world, but I would like to stay in
the Dataset API rather than flip-flopping between the two APIs (consistency,
whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave
me is very similar to what I did. It doesn't work very well with skewed
datasets :) (it has to perform the full sort to work out the row numbers).

I've been toying with the UDAF idea, but the more code I write the deeper I
find myself digging into developer-API land, which is not ideal to be honest.
Also, UDAFs don't have any concept of sorting, so it gets messy really fast.
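
For what it's worth, the shape I'm after is a per-partition pre-filter in the
Dataset API, before the window does the final ranking. A rough, untested
sketch (the Event schema and k are made up):

import org.apache.spark.sql.Dataset

// Made-up schema, purely for illustration.
case class Event(key: String, dateTime: Long, payload: String)

// Keep at most k rows per key within each partition, so the window's sort
// sees at most k rows per key per partition instead of all of them.
def localTopK(ds: Dataset[Event], k: Int): Dataset[Event] = {
  import ds.sparkSession.implicits._
  ds.mapPartitions { rows =>
    rows.toSeq
      .groupBy(_.key)                        // materialises the partition
      .valuesIterator
      .flatMap(_.sortBy(_.dateTime).take(k)) // earliest k per key, matching the window's order
  }
}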

-------
Regards,
Andy

On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:

> Andy,
>
>
>
> You might also want to check out the Algebird library from Twitter. It has
> topK and a lot of other helpful functions. I've used the Algebird topK
> successfully on very large data sets.
>
>
>
> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
> scrupulous you are about your TopKs (I can expound on this, if needed).
>
>
>
> I obfuscated the field names before pasting this into the email – I think I
> got them all consistently.
>
>
>
> Here’s the meat of the TopK part (found on SO, but I don’t have a
> reference) – this one takes the top 4, hence “rowNum <= 4”:
>
>
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        incomingCount
>   FROM (SELECT time_bucket,
>                identifier1,
>                identifier2,
>                incomingCount,
>                ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
>                                   ORDER BY incomingCount DESC) AS rowNum
>           FROM tablename) tmp
>  WHERE rowNum <= 4
>  ORDER BY time_bucket, identifier1, rowNum
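>
> Roughly the same thing in the DataFrame API, if you'd rather avoid the SQL
> string, would look something like this (untested sketch, same obfuscated
> names):
>
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.{col, row_number}
>
> // Rank rows within each (time_bucket, identifier1) group by descending
> // incomingCount, then keep the top 4.
> val w = Window.partitionBy("time_bucket", "identifier1")
>               .orderBy(col("incomingCount").desc)
>
> val top4 = spark.table("tablename")
>   .withColumn("rowNum", row_number().over(w))
>   .where(col("rowNum") <= 4)
>   .orderBy("time_bucket", "identifier1", "rowNum")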
>
>
>
> The count and order by that produces the incomingCount above:
>
>
>
>
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        count(identifier2) AS incomingCount
>   FROM tablename
>  GROUP BY time_bucket,
>           identifier1,
>           identifier2
>  ORDER BY time_bucket,
>           identifier1,
>           count(identifier2) DESC
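>
> And the same aggregation in the DataFrame API (again just a sketch with the
> obfuscated names):
>
> import org.apache.spark.sql.functions.{col, count}
>
> // Count rows per (time_bucket, identifier1, identifier2); this produces
> // the incomingCount that the ranking query consumes.
> val counts = spark.table("tablename")
>   .groupBy("time_bucket", "identifier1", "identifier2")
>   .agg(count("identifier2").as("incomingCount"))
>   .orderBy(col("time_bucket"), col("identifier1"), col("incomingCount").desc)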
>
>
>
>
>
> From: Andy Dang <nam...@gmail.com>
> Date: Tuesday, January 3, 2017 at 7:06 AM
> To: user <user@spark.apache.org>
> Subject: top-k function for Window
>
>
>
> Hi all,
>
>
>
> What's the best way to do top-k with windowing in the Dataset world?
>
>
>
> I have a snippet of code that filters the data down to the top-k, but the
> partition keys are skewed:
>
>
>
> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>
> val rank = row_number().over(windowSpec)
>
>
>
> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>
>
>
> The problem with this code is that Spark doesn't know it can sort the data
> locally and compute a local rank first. What it ends up doing is a full sort
> by the partition keys, which blows up the cluster because the keys are
> heavily skewed.
>
>
>
> In the RDD world we can do something like:
>
> rdd.mapPartitions(iterator -> topK(iterator))
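>
> where topK keeps a bounded heap per partition, e.g. (sketch only; the
> ordering and k are placeholders):
>
> import scala.collection.mutable
>
> // Keep the k smallest elements seen so far (matching the ascending
> // dateTime order above); the heap never holds more than k items.
> def topK[T](iter: Iterator[T], k: Int)(implicit ord: Ordering[T]): Iterator[T] = {
>   val heap = mutable.PriorityQueue.empty[T](ord) // max-heap under ord
>   iter.foreach { x =>
>     heap.enqueue(x)
>     if (heap.size > k) heap.dequeue()            // evict the current largest
>   }
>   heap.iterator
> }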
>
> But I can't really think of an obvious way to do this in the Dataset API,
> especially with a Window function. I guess some UserDefinedAggregateFunction
> would do, but I wonder if there's an obvious way that I missed.
>
>
>
> -------
> Regards,
> Andy
>
