Assume you have a UDAF which looks like this:

- Input: the value
- Buffer: K elements
- Output: an array (which would hold the K elements)
- Init: initialize all elements to some irrelevant value (e.g. Int.MinValue)
- Update: scan the buffer for the first slot whose value is smaller than the current value, shift everything after it forward, and insert the current value (i.e. a sorted insert)
- Merge: a "merge sort" step between the two buffers
- Evaluate: turn the buffer into an array

Then run the UDAF on the groupBy. The result would be an array of (up to) K elements per key. To turn it back into K rows, all you need to do is explode it. Assuming that K is small, the UDAF calculation would be much faster than a full sort (it only ever sorts within a very small buffer of size K).

Assaf.

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don't even need a window function, you can
> simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or reduce-side sorting).

-------
Regards,
Andy

On Tue, Jan 3, 2017 at 2:07 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

You can write a UDAF in which the buffer contains the top K and manage it there. This means you don't need to sort at all. Furthermore, in your example you don't even need a window function; you can simply use groupBy and explode. Of course, this is only relevant if K is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in the Dataset world?
I have a snippet of code that filters the data down to the top-k, but with skewed keys:

    val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
    val rank = row_number().over(windowSpec)
    input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data locally and take the local rank first. What it ends up doing is a full sort by key, and this blew up the cluster since the keys are heavily skewed.

In the RDD world we can do something like:

    rdd.mapPartitions(iterator => topK(iterator))

but I can't really think of an obvious way to do this in the Dataset API, especially with a Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's an obvious way that I missed.

-------
Regards,
Andy
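For what it's worth, the buffer management Assaf describes up-thread (Update as a sorted insert into a K-element buffer, Merge as a merge of two sorted buffers, Evaluate returning the buffer as an array) can be sketched in plain Scala, independent of any Spark API. `TopKBuffer` and its `List[Int]` representation are illustrative choices for this sketch, not part of Spark:

```scala
// Sketch of the bounded top-K buffer described above (largest K values,
// kept in descending order). For small K, each per-row update touches at
// most K elements, so no full sort of the group is ever needed.
object TopKBuffer {
  // Update: sorted insert of `value` into the descending buffer,
  // keeping at most k elements.
  def update(buf: List[Int], value: Int, k: Int): List[Int] = {
    val (larger, rest) = buf.span(_ >= value)
    (larger ::: value :: rest).take(k)
  }

  // Merge: combine two buffers, keeping the k largest overall.
  // (A linear merge of the two sorted lists would avoid the re-sort;
  // for small K the difference is negligible.)
  def merge(a: List[Int], b: List[Int], k: Int): List[Int] =
    (a ::: b).sorted(Ordering[Int].reverse).take(k)

  // Evaluate: the buffer already is the top-K result.
  def evaluate(buf: List[Int]): Array[Int] = buf.toArray
}
```

Folding `update` over a group's values yields its top K, e.g. `List(5, 1, 9, 7, 3).foldLeft(List.empty[Int])((b, v) => TopKBuffer.update(b, v, 3))` gives `List(9, 7, 5)`. Wiring this into a Spark `UserDefinedAggregateFunction` (or a typed `Aggregator`) is then mostly boilerplate around these three operations.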