Re: top-k function for Window
Thanks for all the suggestions. After toying with a bunch of examples I ended up taking the following approach:

- Convert the Dataset to an RDD
- Key by the columns I wanted
- Use aggregateByKey() to aggregate the data into a serializable structure backed by a bounded priority queue
- flatMap the result to get back an RDD
- Convert the RDD back to a Dataset<>, since it has the same schema

I tried the UDAF approach, but it appears that in order to use ArrayType I have to store the rows in InternalRow format. It's not obvious to me how to convert that InternalRow format back to the original Row, which the evaluate() method requires (so that I can explode the data back to the original schema). I actually got it working for a flat schema, but scrapped it in the end since it's not really what I wanted.

Not really a big fan of the RDD approach, but if anyone's got the UDAF approach working then please let me know :)

---
Regards,
Andy
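The per-key aggregation in the steps above can be sketched in plain Java. This is a minimal illustration, not Andy's actual code: a size-bounded `java.util.PriorityQueue` (which is Serializable) stands in for the bounded priority queue, and the Spark wiring is only indicated in comments.

```java
import java.util.PriorityQueue;

public class BoundedTopK {
    // seqOp for aggregateByKey: fold one value into a size-bounded min-heap,
    // so the heap always holds the k largest values seen so far for a key.
    static PriorityQueue<Long> add(PriorityQueue<Long> heap, long value, int k) {
        heap.offer(value);
        if (heap.size() > k) {
            heap.poll(); // evict the current smallest element
        }
        return heap;
    }

    // combOp for aggregateByKey: merge two partial heaps from different partitions.
    static PriorityQueue<Long> merge(PriorityQueue<Long> a, PriorityQueue<Long> b, int k) {
        for (long v : b) {
            add(a, v, k);
        }
        return a;
    }

    // The Spark pipeline would then be roughly (extractKey/extractValue are
    // hypothetical):
    //   dataset.toJavaRDD()
    //          .keyBy(row -> extractKey(row))
    //          .aggregateByKey(new PriorityQueue<Long>(),
    //                          (heap, row) -> add(heap, extractValue(row), k),
    //                          (a, b) -> merge(a, b, k))
    //          .values().flatMap(heap -> heap.iterator());
    // followed by converting the RDD back to a Dataset, since the schema
    // is unchanged.
}
```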
Re: top-k function for Window
What about https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF ?
Re: top-k function for Window
i assumed top-k of frequencies in one pass. if it's top-k by a known sorting/ordering then use a priority queue aggregator instead of spacesaver.
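For the frequency variant Koert mentions, a single pass over the data can be sketched in plain Java. Note this is an exact count (hash map, then a small min-heap), not the approximate SpaceSaver algorithm from Algebird he refers to, and it assumes the key cardinality fits in memory; all names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopKFrequencies {
    // One pass to count, then one pass over the counts with a k-bounded
    // min-heap to pick the k most frequent items.
    static List<String> topK(Iterable<String> items, int k) {
        Map<String, Long> counts = new HashMap<>();
        for (String item : items) {
            counts.merge(item, 1L, Long::sum);
        }
        // Min-heap ordered by count keeps only the k largest entries.
        PriorityQueue<Map.Entry<String, Long>> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll();
            }
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey()); // emitted in ascending frequency order
        }
        return result;
    }
}
```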
RE: top-k function for Window
Assume you have a UDAF which looks like this:

- Input: the value
- Buffer: K elements
- Output: an array (which holds the K elements)
- Init: initialize all elements to some irrelevant sentinel value (e.g. Int.MinValue)
- Update: scan the buffer for the first slot smaller than the current value, push everything after it forward, and put the value in (i.e. a sorted insert)
- Merge: a “merge sort” between the two buffers
- Evaluate: turn the buffer into an array

Then run the UDAF on the groupBy. The result would be an array of (up to) K elements per key. To turn it back into K rows, all you need to do is explode it.

Assuming K is small, the UDAF would be much faster than sorting the whole group (it only ever does sorted inserts into a very small buffer of K elements).

Assaf.
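Assaf's fixed-size buffer can be sketched in plain Java; the actual Spark UDAF plumbing (MutableAggregationBuffer, schemas) is omitted, `Integer.MIN_VALUE` stands in for the “irrelevant value”, and all names are made up for illustration.

```java
import java.util.Arrays;

public class TopKBuffer {
    static final int SENTINEL = Integer.MIN_VALUE; // marks an empty slot

    // Init: a buffer of K slots, all set to the sentinel.
    static int[] init(int k) {
        int[] buffer = new int[k];
        Arrays.fill(buffer, SENTINEL);
        return buffer;
    }

    // Update: sorted insert. Find the first slot smaller than the value,
    // shift everything from there one step right, and drop the last element.
    static void update(int[] buffer, int value) {
        for (int i = 0; i < buffer.length; i++) {
            if (value > buffer[i]) {
                System.arraycopy(buffer, i, buffer, i + 1, buffer.length - i - 1);
                buffer[i] = value;
                return;
            }
        }
    }

    // Merge: "merge sort" between two descending buffers, keeping the top K.
    static int[] merge(int[] a, int[] b) {
        int[] out = init(a.length);
        int i = 0, j = 0;
        for (int o = 0; o < out.length; o++) {
            out[o] = (j >= b.length || (i < a.length && a[i] >= b[j])) ? a[i++] : b[j++];
        }
        return out;
    }

    // Evaluate: strip the sentinels and return the (up to) K results.
    static int[] evaluate(int[] buffer) {
        int n = 0;
        while (n < buffer.length && buffer[n] != SENTINEL) n++;
        return Arrays.copyOf(buffer, n);
    }
}
```

Since the buffer stays sorted in descending order, both update and merge are linear in K, never in the group size.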
Re: top-k function for Window
i dont know anything about windowing or about not using developer apis...

but a trivial implementation of top-k requires a total sort per group. this can be done with dataset. we do this using spark-sorted (https://github.com/tresata/spark-sorted) but its not hard to do it yourself for datasets either. for rdds its actually a little harder i think (if you want to avoid the in-memory assumption, which i assume you do).

a perhaps more efficient implementation uses an aggregator. it is not hard to adapt algebird's top-k aggregator (spacesaver) to use as a spark aggregator. this requires a simple adapter class. we do this in-house as well. although i have to say i would recommend spark 2.1.0 for this. spark 2.0.x aggregator codegen is too buggy in my experience.
Re: top-k function for Window
Hi Austin,

It's trivial to implement top-k in the RDD world - however I would like to stay in the Dataset API world instead of flip-flopping between the two APIs (consistency, whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave me is very similar to what I did - it doesn't work very well with a skewed dataset :) (it has to perform the sort to work out the row number).

I've been toying with the UDAF idea, but the more code I write the deeper I see myself digging into developer-API land - not very ideal, to be honest. Also, UDAF doesn't have any concept of sorting, so it gets messy really fast.

---
Regards,
Andy
Re: top-k function for Window
Andy,

You might want to also check out the Algebird libraries from Twitter. They have topK and a lot of other helpful functions. I’ve used the Algebird topK successfully on very large data sets.

You can also use Spark SQL to do a “poor man’s” topK. This depends on how scrupulous you are about your topKs (I can expound on this, if needed).

I obfuscated the field names before pasting this into email – I think I got them all consistently.

Here’s the meat of the topK part (found on SO, but I don’t have a reference) – this one takes the top 4, hence “rowNum <= 4”:

SELECT time_bucket,
       identifier1,
       identifier2,
       incomingCount
FROM (SELECT time_bucket,
             identifier1,
             identifier2,
             incomingCount,
             ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
                                ORDER BY incomingCount DESC) AS rowNum
      FROM tablename) tmp
WHERE rowNum <= 4
ORDER BY time_bucket, identifier1, rowNum

The count and order by:

SELECT time_bucket,
       identifier1,
       identifier2,
       count(identifier2) AS myCount
FROM table
GROUP BY time_bucket,
         identifier1,
         identifier2
ORDER BY time_bucket,
         identifier1,
         count(identifier2) DESC
Re: top-k function for Window
> Furthermore, in your example you don’t even need a window function, you
> can simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or reduce-side sorting).

---
Regards,
Andy
RE: top-k function for Window
You can write a UDAF in which the buffer contains the top K and manage it. This means you don’t need to sort at all. Furthermore, in your example you don’t even need a window function, you can simply use groupBy and explode.

Of course, this is only relevant if K is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with windowing in the Dataset world?

I have a snippet of code that filters the data to the top k, but with skewed keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data locally and get the local rank first. What it ends up doing is performing a sort by key using the skewed keys, and this blew up the cluster since the keys are heavily skewed.

In the RDD world we can do something like:

rdd.mapPartitions(iterator -> topK(iterator))

but I can't really think of an obvious way to do this in the Dataset API, especially with the Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's an obvious way that I missed.

---
Regards,
Andy
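The topK(iterator) that Andy's mapPartitions sketch gestures at can be written in plain Java as below. This is only the local, per-partition step; since each partition's output is the top k of that partition alone, a final per-key combine (e.g. a reduce merging the partial results) is still needed for the global answer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class PartitionTopK {
    // Local top-k over one partition's iterator: a size-bounded min-heap
    // keeps the k largest values while streaming, so the partition is never
    // materialized in memory.
    static Iterator<Integer> topK(Iterator<Integer> it, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        while (it.hasNext()) {
            heap.offer(it.next());
            if (heap.size() > k) {
                heap.poll(); // evict the smallest of the k+1 candidates
            }
        }
        List<Integer> kept = new ArrayList<>(heap);
        return kept.iterator();
    }
}
```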