Hi Austin,

It's trivial to implement top-k in the RDD world, but I'd like to stay in
the Dataset API world instead of flip-flopping between the two APIs
(consistency, whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave
me is very similar to what I did - it doesn't work very well with a skewed
dataset :) (it has to perform the sort to work out the row numbers).

I've been toying with the UDAF idea, but the more code I write, the deeper
I find myself digging into developer-API land - not very ideal, to be
honest. Also, UDAF has no concept of sorting, so it gets messy really fast.
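For what it's worth, the direction I've been sketching is a typed
Aggregator along these lines. It's a rough, untested sketch with a made-up
Event schema, but it shows why aggregation feels like the right shape here:
the partial (map-side) phase caps the buffer at k rows per key per
partition, so a skewed key never forces a full sort.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Made-up schema, for illustration only.
case class Event(key: String, dateTime: Long, payload: String)

// Keeps the k events with the largest dateTime for each key. Because it
// is an aggregation, Spark runs a partial (map-side) phase first, so each
// task buffers at most k rows per key before the shuffle - no global
// sort, and skewed keys stay cheap.
class TopKByTime(k: Int) extends Aggregator[Event, Seq[Event], Seq[Event]] {
  def zero: Seq[Event] = Seq.empty

  // Fold one row into the buffer and trim back to k entries.
  // (A bounded priority queue would avoid the repeated sort.)
  def reduce(buf: Seq[Event], e: Event): Seq[Event] =
    (buf :+ e).sortBy(-_.dateTime).take(k)

  // Combine partial buffers coming from different partitions.
  def merge(a: Seq[Event], b: Seq[Event]): Seq[Event] =
    (a ++ b).sortBy(-_.dateTime).take(k)

  def finish(buf: Seq[Event]): Seq[Event] = buf

  // Kryo keeps the sketch short; a proper product encoder would be faster.
  def bufferEncoder: Encoder[Seq[Event]] = Encoders.kryo[Seq[Event]]
  def outputEncoder: Encoder[Seq[Event]] = Encoders.kryo[Seq[Event]]
}

// Usage (assuming a SparkSession named spark and a Dataset[Event]):
//   import spark.implicits._
//   val topk = events.groupByKey(_.key).agg(new TopKByTime(10).toColumn)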
-------
Regards,
Andy

On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:

> Andy,
>
> You might want to also check out the Algebird libraries from Twitter. They
> have topK and a lot of other helpful functions. I've used the Algebird topK
> successfully on very large data sets.
>
> You can also use Spark SQL to do a "poor man's" topK. This depends on how
> scrupulous you are about your topKs (I can expound on this, if needed).
>
> I obfuscated the field names before pasting this into email - I think I
> got them all consistently.
>
> Here's the meat of the topK part (found on SO, but I don't have a
> reference) - this one takes the top 4, hence "rowNum <= 4":
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        incomingCount
> FROM (SELECT time_bucket,
>              identifier1,
>              identifier2,
>              incomingCount,
>              ROW_NUMBER() OVER (PARTITION BY time_bucket,
>                                              identifier1
>                                 ORDER BY incomingCount DESC) AS rowNum
>       FROM tablename) tmp
> WHERE rowNum <= 4
> ORDER BY time_bucket, identifier1, rowNum
>
> The count and order by:
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        count(identifier2) AS incomingCount
> FROM tablename
> GROUP BY time_bucket,
>          identifier1,
>          identifier2
> ORDER BY time_bucket,
>          identifier1,
>          count(identifier2) DESC
>
> From: Andy Dang <nam...@gmail.com>
> Date: Tuesday, January 3, 2017 at 7:06 AM
> To: user <user@spark.apache.org>
> Subject: top-k function for Window
>
> Hi all,
>
> What's the best way to do top-k with windowing in the Dataset world?
>
> I have a snippet of code that filters the data to the top-k, but with
> skewed keys:
>
> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
> val rank = row_number().over(windowSpec)
>
> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>
> The problem with this code is that Spark doesn't know that it can sort
> the data locally and get the local rank first. What it ends up doing is
> performing a sort by key using the skewed keys, and this blew up the
> cluster since the keys are heavily skewed.
>
> In the RDD world we can do something like:
>
> rdd.mapPartitions(iterator -> topK(iterator))
>
> but I can't really think of an obvious way to do this in the Dataset API,
> especially with window functions. I guess some UserDefinedAggregateFunction
> would do, but I wonder if there's an obvious way that I missed.
>
> -------
> Regards,
> Andy
>
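P.S. For completeness, the rdd.mapPartitions(iterator -> topK(iterator))
idea from my original mail can be written with aggregateByKey, so the
top-k trimming happens map-side as well - again a rough, untested sketch
over made-up (key, value) types:

import org.apache.spark.rdd.RDD

// Per-key top-k without a global sort: aggregateByKey trims each buffer
// to k values map-side, then merges the small partial buffers across
// partitions.
def topKPerKey(rdd: RDD[(String, Long)], k: Int): RDD[(String, Seq[Long])] =
  rdd.aggregateByKey(Seq.empty[Long])(
    (buf, v) => (buf :+ v).sorted(Ordering[Long].reverse).take(k),
    (a, b)   => (a ++ b).sorted(Ordering[Long].reverse).take(k)
  )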