Re: top-k function for Window
What about https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF ?

Koert Kuipers <ko...@tresata.com> wrote on Wed, Jan 4, 2017 at 16:11:

> i assumed top-k of frequencies in one pass. if it's top-k by a known
> sorting/ordering then use a priority queue aggregator instead of SpaceSaver.
Re: top-k function for Window
i assumed top-k of frequencies in one pass. if it's top-k by a known sorting/ordering then use a priority queue aggregator instead of SpaceSaver.
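A bounded priority-queue Aggregator along the lines Koert suggests might look roughly like this. This is a minimal, untested sketch, not his in-house code: the Long value type, the class name PQTopK, and the kryo encoders are illustrative assumptions. Each partition keeps at most k values in a small min-heap, and merge combines heaps instead of sorting whole groups:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable.PriorityQueue

// Keeps the k largest Long values per group, exactly, in one pass.
class PQTopK(k: Int) extends Aggregator[Long, PriorityQueue[Long], Seq[Long]] {
  // Reversed ordering makes this a min-heap: the head is the smallest
  // retained value, which is the one to evict once we exceed k.
  def zero: PriorityQueue[Long] = PriorityQueue.empty[Long](Ordering[Long].reverse)

  def reduce(heap: PriorityQueue[Long], value: Long): PriorityQueue[Long] = {
    heap.enqueue(value)
    while (heap.size > k) heap.dequeue() // drop the smallest once over capacity
    heap
  }

  def merge(a: PriorityQueue[Long], b: PriorityQueue[Long]): PriorityQueue[Long] =
    b.foldLeft(a)(reduce)

  def finish(heap: PriorityQueue[Long]): Seq[Long] =
    heap.toSeq.sorted(Ordering[Long].reverse) // largest first

  // PriorityQueue has no built-in Spark encoder, so kryo is the easy way out.
  def bufferEncoder: Encoder[PriorityQueue[Long]] = Encoders.kryo[PriorityQueue[Long]]
  def outputEncoder: Encoder[Seq[Long]] = Encoders.kryo[Seq[Long]]
}

It would be wired up with something like ds.groupByKey(_._1).mapValues(_._2).agg(new PQTopK(10).toColumn) on a Dataset[(String, Long)] (mapValues on grouped datasets needs Spark 2.1, which fits Koert's version recommendation).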
RE: top-k function for Window
Assume you have a UDAF which looks like this:

- Input: the value
- Buffer: K elements
- Output: an array (which would hold the K elements)
- Init: initialize all elements to some irrelevant value (e.g. Int.MinValue)
- Update: scan the buffer for the first slot whose value is smaller than the current value, push everything from there forward, and insert the current value (i.e. a sorted insert)
- Merge: a “merge sort” between the two buffers
- Evaluate: turn the buffer into an array

Then run the UDAF on the groupBy. The result would be an array of (up to) K elements per key. To turn it back into K lines, all you need to do is explode it.

Assuming that K is small, the UDAF calculation would be much faster than the sorting (it only does sorted inserts into a very small K-element buffer).

Assaf.

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don’t even need a window function, you
> can simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or reduce-side sorting).

---
Regards,
Andy
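For concreteness, here is a minimal sketch of that recipe as a Spark 2.x UserDefinedAggregateFunction. It is untested and illustrative: the class name TopKAgg and the integer value type are assumptions, and for brevity it re-sorts the tiny at-most-(K+1)-element buffer instead of doing the literal in-place sorted insert described above (same effect on such a small buffer):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class TopKAgg(k: Int) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("top", ArrayType(IntegerType)) :: Nil)
  def dataType: DataType = ArrayType(IntegerType)
  def deterministic: Boolean = true

  // Start empty rather than with Int.MinValue sentinels; same effect.
  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Int]

  // Sorted insert: add the value, keep the buffer descending, cut back to k.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer(0) = (input.getInt(0) +: buffer.getSeq[Int](0)).sortBy(-_).take(k)

  // The "merge sort" step: combine two small sorted buffers, keep the top k.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = (buffer1.getSeq[Int](0) ++ buffer2.getSeq[Int](0)).sortBy(-_).take(k)

  def evaluate(buffer: Row): Any = buffer.getSeq[Int](0)
}

With it, something like df.groupBy($"key").agg(new TopKAgg(10)($"value").as("top")) yields one small array per key, ready to explode.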
Re: top-k function for Window
i don't know anything about windowing or about not using developer apis... but a trivial implementation of top-k requires a total sort per group. this can be done with a dataset. we do this using spark-sorted (https://github.com/tresata/spark-sorted), but it's not hard to do it yourself for datasets either. for rdds it's actually a little harder, i think (if you want to avoid the in-memory assumption, which i assume you do).

a perhaps more efficient implementation uses an aggregator. it is not hard to adapt algebird's top-k aggregator (SpaceSaver) for use as a spark aggregator. this requires a simple adapter class. we do this in-house as well, although i have to say i would recommend spark 2.1.0 for this: spark 2.0.x aggregator codegen is too buggy in my experience.
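The adapter class Koert mentions might look roughly like this. This is an untested sketch, not Tresata's code: the String item type, the capacity parameter, and the kryo encoders are assumptions, and as far as I recall SpaceSaver.topK returns (item, approximate count, guaranteed-in-top-k) triples in recent Algebird versions:

import com.twitter.algebird.SpaceSaver
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Approximate top-k by frequency: SpaceSaver tracks heavy hitters in bounded
// space. capacity is the number of counters kept and should comfortably exceed k.
class SpaceSaverTopK(k: Int, capacity: Int)
    extends Aggregator[String, Option[SpaceSaver[String]], Seq[String]] {

  // SpaceSaver has no empty instance, so the buffer is an Option filled lazily.
  def zero: Option[SpaceSaver[String]] = None

  def reduce(buf: Option[SpaceSaver[String]], item: String): Option[SpaceSaver[String]] = {
    val one = SpaceSaver(capacity, item)
    Some(buf.fold(one)(_ ++ one)) // ++ is SpaceSaver's semigroup combine
  }

  def merge(a: Option[SpaceSaver[String]], b: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(x ++ y)
      case _                  => a.orElse(b)
    }

  // topK yields (item, approximate count, guaranteed) triples; keep the items.
  def finish(buf: Option[SpaceSaver[String]]): Seq[String] =
    buf.toSeq.flatMap(_.topK(k).map(_._1))

  def bufferEncoder: Encoder[Option[SpaceSaver[String]]] = Encoders.kryo[Option[SpaceSaver[String]]]
  def outputEncoder: Encoder[Seq[String]] = Encoders.kryo[Seq[String]]
}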
Re: top-k function for Window
Hi Austin,

It's trivial to implement top-k in the RDD world - however, I would like to stay in the Dataset API world instead of flip-flopping between the two APIs (consistency, whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave me is very similar to what I did - it doesn't work very well with a skewed dataset :) (it has to perform the sort to work out the row number).

I've been toying with the UDAF idea, but the more code I write, the more I see myself digging deeper into developer API land - not very ideal, to be honest. Also, a UDAF doesn't have any concept of sorting, so it gets messy really fast.

---
Regards,
Andy
Re: top-k function for Window
Andy,

You might want to also check out the Algebird libraries from Twitter. They have topK and a lot of other helpful functions. I’ve used the Algebird topK successfully on very large data sets.

You can also use Spark SQL to do a “poor man’s” topK. This depends on how scrupulous you are about your topKs (I can expound on this, if needed).

I obfuscated the field names before pasting this into email – I think I got them all consistently.

Here’s the meat of the topK part (found on SO, but I don’t have a reference) – this one takes the top 4, hence “rowNum <= 4”:

SELECT time_bucket,
       identifier1,
       identifier2,
       incomingCount
FROM (SELECT time_bucket,
             identifier1,
             identifier2,
             incomingCount,
             ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
                                ORDER BY incomingCount DESC) AS rowNum
      FROM tablename) tmp
WHERE rowNum <= 4
ORDER BY time_bucket, identifier1, rowNum

The count and order by:

SELECT time_bucket,
       identifier1,
       identifier2,
       count(identifier2) AS myCount
FROM table
GROUP BY time_bucket,
         identifier1,
         identifier2
ORDER BY time_bucket,
         identifier1,
         count(identifier2) DESC
RE: top-k function for Window
You can write a UDAF in which the buffer contains the top K and manage it yourself. This means you don’t need to sort at all.

Furthermore, in your example you don’t even need a window function; you can simply use groupBy and explode. Of course, this is only relevant if K is small…
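As a rough illustration of that shape (the column names are invented, input is the frame from Andy's snippet, and TopKAgg refers to the UDAF sketched above; assumes a SparkSession named spark):

import org.apache.spark.sql.functions.explode
import spark.implicits._

val topK = new TopKAgg(10)
val result = input
  .groupBy($"skewedKey")                 // no window, so no per-key total sort
  .agg(topK($"value").as("topValues"))   // each group reduces to at most 10 values
  .select($"skewedKey", explode($"topValues").as("value"))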
top-k function for Window
Hi all,

What's the best way to do top-k with windowing in the Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data locally and take the local rank first. What it ends up doing is performing a sort by key using the skewed keys, and this blew up the cluster since the keys are heavily skewed.

In the RDD world we can do something like:

rdd.mapPartitions(iterator => topK(iterator))

but I can't really think of an obvious way to do this in the Dataset API, especially with a Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's an obvious way that I missed.

---
Regards,
Andy
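A concrete (illustrative, untested) version of that mapPartitions idea, assuming (key, value) pairs: each partition first reduces to its local top k per key, so the shuffle only moves at most k rows per key per partition instead of the full skewed groups. Note that the simple groupBy below buffers a whole partition in memory; a bounded heap per key would avoid that.

import org.apache.spark.rdd.RDD

def topKPerKey(rdd: RDD[(String, Long)], k: Int): RDD[(String, Long)] =
  rdd
    .mapPartitions { iter =>
      // Local pass: keep only the k largest values per key in this partition.
      iter.toSeq
        .groupBy { case (key, _) => key }
        .iterator
        .flatMap { case (_, rows) => rows.sortBy { case (_, v) => -v }.take(k) }
    }
    .groupByKey() // at most k candidates per key arrive from each partition
    .flatMap { case (key, values) =>
      values.toSeq.sortBy(v => -v).take(k).map(v => (key, v))
    }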