I don't know anything about windowing, or about avoiding the developer APIs... but here are two options.
A trivial implementation of top-k requires a total sort per group. This can
be done with Datasets. We do this using spark-sorted
(https://github.com/tresata/spark-sorted), but it's not hard to do it
yourself for Datasets either. For RDDs it's actually a little harder, I
think (if you want to avoid the assumption that each group fits in memory,
which I assume you do).

A perhaps more efficient implementation uses an aggregator. It is not hard
to adapt Algebird's top-k aggregator (SpaceSaver) for use as a Spark
Aggregator; it only requires a simple adapter class. We do this in-house as
well. Rough sketches of both approaches are appended at the very end of
this message, below the quoted thread.

I have to say, though, that I would recommend Spark 2.1.0 for this: Spark
2.0.x aggregator codegen is too buggy in my experience.

On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang <nam...@gmail.com> wrote:

> Hi Austin,
>
> It's trivial to implement top-k in the RDD world - however I would like
> to stay in the Dataset API world instead of flip-flopping between the two
> APIs (consistency, whole-stage codegen, etc).
>
> The Twitter library appears to support only RDDs, and the solution you
> gave me is very similar to what I did - it doesn't work very well with
> skewed datasets :) (it has to perform the sort to work out the row
> number).
>
> I've been toying with the UDAF idea, but the more I write the code the
> more I see myself digging deeper into developer API land - not very ideal
> to be honest. Also, UDAF doesn't have any concept of sorting, so it gets
> messy really fast.
>
> -------
> Regards,
> Andy
>
> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:
>
>> Andy,
>>
>> You might want to also check out the Algebird libraries from Twitter.
>> They have topK and a lot of other helpful functions. I've used the
>> Algebird topK successfully on very large data sets.
>>
>> You can also use Spark SQL to do a "poor man's" topK. This depends on
>> how scrupulous you are about your TopKs (I can expound on this, if
>> needed).
>>
>> I obfuscated the field names before pasting this into email - I think I
>> got them all consistently.
>>
>> Here's the meat of the TopK part (found on SO, but I don't have a
>> reference) - this one takes the top 4, hence "rowNum <= 4":
>>
>> SELECT time_bucket,
>>        identifier1,
>>        identifier2,
>>        incomingCount
>> FROM (SELECT time_bucket,
>>              identifier1,
>>              identifier2,
>>              incomingCount,
>>              ROW_NUMBER() OVER (PARTITION BY time_bucket,
>>                                              identifier1
>>                                 ORDER BY incomingCount DESC) as rowNum
>>       FROM tablename) tmp
>> WHERE rowNum <= 4
>> ORDER BY time_bucket, identifier1, rowNum
>>
>> The count and order by:
>>
>> SELECT time_bucket,
>>        identifier1,
>>        identifier2,
>>        count(identifier2) as myCount
>> FROM table
>> GROUP BY time_bucket,
>>          identifier1,
>>          identifier2
>> ORDER BY time_bucket,
>>          identifier1,
>>          count(identifier2) DESC
>>
>> From: Andy Dang <nam...@gmail.com>
>> Date: Tuesday, January 3, 2017 at 7:06 AM
>> To: user <user@spark.apache.org>
>> Subject: top-k function for Window
>>
>> Hi all,
>>
>> What's the best way to do top-k with Windowing in the Dataset world?
>>
>> I have a snippet of code that filters the data to the top-k, but with
>> skewed keys:
>>
>> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>> val rank = row_number().over(windowSpec)
>>
>> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>>
>> The problem with this code is that Spark doesn't know that it can sort
>> the data locally and get the local rank first. What it ends up doing is
>> performing a sort by key using the skewed keys, and this blew up the
>> cluster since the keys are heavily skewed.
>>
>> In the RDD world we can do something like:
>>
>> rdd.mapPartitions(iterator -> topK(iterator))
>>
>> but I can't really think of an obvious way to do this in the Dataset
>> API, especially with the Window functions. I guess some
>> UserDefinedAggregateFunction would do, but I wonder if there's an
>> obvious way that I missed.
>>
>> -------
>> Regards,
>> Andy
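(The sketches promised above - both are rough and untested, with made-up
names, so treat them as starting points rather than working code.)

First, the do-it-yourself Dataset version of exact top-k per group: group
by key and keep a bounded min-heap of size k while streaming through each
group, so only k rows per group are ever held in memory even when a key is
heavily skewed. The Record case class, its columns, and topKPerKey are
placeholders.

import scala.collection.mutable
import org.apache.spark.sql.Dataset

// One row per (key, value); we want the k largest values per key.
case class Record(key: String, value: Long)

def topKPerKey(ds: Dataset[Record], k: Int): Dataset[Record] = {
  import ds.sparkSession.implicits._
  ds.groupByKey(_.key).flatMapGroups { (_, rows) =>
    // Bounded min-heap of size k: the smallest retained value sits at the
    // head, so each incoming row is either dropped or replaces that minimum.
    val heap = mutable.PriorityQueue.empty[Record](Ordering.by((r: Record) => r.value).reverse)
    rows.foreach { r =>
      if (heap.size < k) heap.enqueue(r)
      else if (r.value > heap.head.value) { heap.dequeue(); heap.enqueue(r) }
    }
    heap.toList // the k survivors, in no particular order
  }
}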
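Second, a minimal sketch of the adapter class for Algebird's SpaceSaver,
assuming its SpaceSaver(capacity, item), ++ and topK(k) API from
algebird-core; the Kryo encoders are just the simplest thing that compiles,
not necessarily the fastest choice.

import com.twitter.algebird.SpaceSaver
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Approximate top-k per group: SpaceSaver wrapped as a Spark Aggregator.
// capacity bounds the sketch size, k is how many items to report (k < capacity).
class TopKAggregator(capacity: Int, k: Int)
    extends Aggregator[String, Option[SpaceSaver[String]], Seq[String]] {

  // SpaceSaver has no natural empty value, so the buffer is an Option.
  def zero: Option[SpaceSaver[String]] = None

  def reduce(buf: Option[SpaceSaver[String]], item: String): Option[SpaceSaver[String]] = {
    val one = SpaceSaver(capacity, item)
    Some(buf.map(_ ++ one).getOrElse(one))
  }

  def merge(a: Option[SpaceSaver[String]], b: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(x ++ y)
      case (None, y)          => y
      case (x, None)          => x
    }

  // topK returns (item, approximate count, exact?) tuples; keep just the items.
  def finish(buf: Option[SpaceSaver[String]]): Seq[String] =
    buf.map(_.topK(k).map(_._1)).getOrElse(Seq.empty)

  // SpaceSaver isn't a case class, so fall back to Kryo serialization.
  def bufferEncoder: Encoder[Option[SpaceSaver[String]]] =
    Encoders.kryo[Option[SpaceSaver[String]]]
  def outputEncoder: Encoder[Seq[String]] =
    Encoders.kryo[Seq[String]]
}

On a Dataset of (key, item) pairs you would then do something along the
lines of ds.groupByKey(_._1).mapValues(_._2).agg(new TopKAggregator(100, 10).toColumn)
(again untested, and only on 2.1.0 given the codegen issues mentioned above).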