Assume you have a UDAF which looks like this:

- Input: the value

- Buffer: K elements

- Output: an array (containing the K elements)

- Init: initialize every element to a sentinel value (e.g. Int.MinValue)

- Update: scan the buffer for the first element smaller than the current value, shift it and everything after it forward, and insert the value in that spot (i.e. a sorted insert)

- Merge: “merge sort” the two buffers together

- Evaluate: turn the buffer into an array

Then run the UDAF as the aggregation in a groupBy.
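A minimal sketch in Scala of what this could look like, using Spark’s UserDefinedAggregateFunction API and assuming Long values (the class name TopK, the Long type and the Long.MinValue sentinel are illustrative choices, not requirements):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class TopK(k: Int) extends UserDefinedAggregateFunction {
  // Input: a single value column.
  override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  // Buffer: K elements, kept sorted in descending order.
  override def bufferSchema: StructType = StructType(StructField("topK", ArrayType(LongType)) :: Nil)
  // Output: an array holding the (up to) K largest values.
  override def dataType: DataType = ArrayType(LongType)
  override def deterministic: Boolean = true

  // Init: fill the buffer with a sentinel smaller than any real value.
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.fill(k)(Long.MinValue)

  // Update: sorted insert of the incoming value into the K-element buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val cur = buffer.getSeq[Long](0)
      val v = input.getLong(0)
      if (v > cur.last) {
        val pos = cur.indexWhere(_ < v) // first spot smaller than the value
        buffer(0) = (cur.take(pos) :+ v) ++ cur.drop(pos).init
      }
    }
  }

  // Merge: combine the two buffers, keeping the K largest values (for
  // brevity this sketch re-sorts the 2K values instead of a linear merge).
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = (buffer1.getSeq[Long](0) ++ buffer2.getSeq[Long](0))
      .sorted(Ordering[Long].reverse)
      .take(k)

  // Evaluate: drop the sentinels and return the array.
  override def evaluate(buffer: Row): Any =
    buffer.getSeq[Long](0).filter(_ != Long.MinValue)
}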

The result would be an array of (up to) K elements per key. To turn it back into K rows per key, all you need to do is explode it.
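Hypothetical usage (the input DataFrame and the “key”/“value” column names are assumptions for illustration):

import org.apache.spark.sql.functions.explode

val topK = new TopK(10)
val perKey = input.groupBy("key").agg(topK(input("value")).as("topK"))
val flattened = perKey.select(perKey("key"), explode(perKey("topK")).as("value"))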

Assuming that K is small, computing the UDAF would be much faster than a full sort (it only ever sorts buffers of size K).

Assaf.
From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don’t even need a window function, you can 
> simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or 
reduce-side sorting).



-------
Regards,
Andy

On Tue, Jan 3, 2017 at 2:07 PM, Mendelson, Assaf 
<assaf.mendel...@rsa.com> wrote:
You can write a UDAF in which the buffer contains the top K and maintains it as values arrive. This means you don’t need to sort at all. Furthermore, in your example you don’t even need a window function; you can simply use groupBy and explode.
Of course, this is only relevant if k is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know it can sort the data locally and compute the local rank first. What it ends up doing is a full sort by the skewed keys, and this blew up the cluster since the keys are heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator => topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, especially with a Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's an obvious way that I missed.
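For illustration, a rough sketch of that per-partition pattern, assuming rdd is an RDD[Long] and topK is a hypothetical helper keeping the k largest values in a small bounded heap:

import scala.collection.mutable.PriorityQueue

def topK(k: Int)(it: Iterator[Long]): Iterator[Long] = {
  // With the reversed ordering, dequeue() evicts the smallest element,
  // so the heap never holds more than k values.
  val buf = PriorityQueue.empty[Long](Ordering[Long].reverse)
  it.foreach { v =>
    buf.enqueue(v)
    if (buf.size > k) buf.dequeue()
  }
  buf.iterator
}

// Local top-k per partition, then a final top-k over the small combined set.
val globalTop = rdd.mapPartitions(topK(10)).collect().sorted.reverse.take(10)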

-------
Regards,
Andy
