Re: top-k function for Window

2017-01-04 Thread Andy Dang
Thanks for all the suggestions.

So after toying with a bunch of examples, I ended up taking the following approach:
- Convert dataset to RDD
- Key by the columns I wanted
- Use aggregateByKey() to aggregate data into a serializable structure
backed by a bounded priority queue
- flatMap the result to get back an RDD
- Convert the RDD to Dataset<> since it's got the same schema
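
A rough sketch of that pipeline in Scala (the BoundedTopK helper, the keyCols parameter and the Ordering[Row] below are illustrative assumptions, not the original code):

import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, Row}

// serializable structure backed by a bounded priority queue: keeps at most k elements
class BoundedTopK[T](k: Int)(implicit ord: Ordering[T]) extends Serializable {
  private val heap = mutable.PriorityQueue.empty[T](ord.reverse) // min-heap of the current top k
  def add(elem: T): BoundedTopK[T] = {
    if (heap.size < k) heap.enqueue(elem)
    else if (ord.gt(elem, heap.head)) { heap.dequeue(); heap.enqueue(elem) }
    this
  }
  def merge(other: BoundedTopK[T]): BoundedTopK[T] = { other.heap.foreach(add); this }
  def toSeq: Seq[T] = heap.toSeq
}

// key by the chosen columns, aggregate into the bounded queue per key,
// flatMap back out, then rebuild a Dataset with the original schema
def topKPerKey(df: DataFrame, keyCols: Seq[String], k: Int)
              (implicit ord: Ordering[Row]): DataFrame = {
  val topped = df.rdd
    .keyBy(row => keyCols.map(c => row.getAs[Any](c)))
    .aggregateByKey(new BoundedTopK[Row](k))(_.add(_), _.merge(_))
    .flatMap { case (_, topk) => topk.toSeq }
  df.sparkSession.createDataFrame(topped, df.schema)
}

The Ordering[Row] decides what "top" means (e.g. by a timestamp or count column).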

I tried the UDAF approach, but it appears that in order to use ArrayType, I have to store the rows in the InternalRow format. It's not obvious to me how to convert this InternalRow format back to the original Row, which is required by the evaluate() method (so I can explode the data back to the original schema). I actually got it working for a flat schema, but scrapped it in the end since it's not really what I wanted.

Not really a big fan of the RDD approach, but if anyone's got the UDAF approach working then please let me know :)

---
Regards,
Andy


Re: top-k function for Window

2017-01-04 Thread Georg Heiler
What about
https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF


Re: top-k function for Window

2017-01-04 Thread Koert Kuipers
I assumed top-k of frequencies in one pass. If it's top-k by a known sorting/ordering, then use a priority-queue aggregator instead of SpaceSaver.


RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
Assume you have a UDAF which looks like this:

-  Input: The value

-  Buffer: K elements

-  Output: An array (which would have the K elements)

-  Init: Initialize all elements to some irrelevant value (e.g. Int.MinValue)

-  Update: Go over the buffer, find the spot where the current value belongs, push everything after it forward, and put the value in (i.e. a sorted insert)

-  Merge: “Merge sort” between the two buffers

-  Evaluate: Turn the buffer into an array
Then run the UDAF on the groupBy.

The result would be an array of (up to) K elements per key. To turn it back into K rows, all you need to do is explode it.

Assuming that K is small, the calculation of the UDAF would be much faster than the sorting (it only needs to sort within a very small buffer of K elements).
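
A minimal sketch of such a UDAF for a single numeric column (the TopK class name and the column names are illustrative and just echo the SQL example elsewhere in the thread; it also grows an empty list rather than pre-filling sentinel values):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._

class TopK(k: Int) extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("topk", ArrayType(DoubleType)) :: Nil)
  override def dataType: DataType = ArrayType(DoubleType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Double]                                       // Init: empty top-k list

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer(0) = insert(buffer.getSeq[Double](0), input.getDouble(0))  // Update: sorted insert

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer2.getSeq[Double](0).foldLeft(buffer1.getSeq[Double](0))(insert)  // Merge

  override def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)    // Evaluate: the array

  // keep only the largest k values, in descending order
  private def insert(topk: Seq[Double], v: Double): Seq[Double] =
    (v +: topk).sorted(Ordering[Double].reverse).take(k)
}

// usage: aggregate per group, then explode the array back into rows
// val topK = new TopK(4)
// df.groupBy("time_bucket", "identifier1")
//   .agg(topK(col("incomingCount")).as("topk"))
//   .withColumn("incomingCount", explode(col("topk")))
//   .drop("topk")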

Assaf.


Re: top-k function for Window

2017-01-03 Thread Koert Kuipers
I don't know anything about windowing or about not using developer APIs...

But a trivial implementation of top-k requires a total sort per group. This can be done with Dataset. We do this using spark-sorted (https://github.com/tresata/spark-sorted), but it's not hard to do it yourself for Datasets either. For RDDs it's actually a little harder, I think (if you want to avoid the in-memory assumption, which I assume you do).

A perhaps more efficient implementation uses an Aggregator. It is not hard to adapt Algebird's top-k aggregator (SpaceSaver) to use as a Spark Aggregator. This requires a simple adapter class. We do this in-house as well, although I have to say I would recommend Spark 2.1.0 for this. Spark 2.0.x Aggregator codegen is too buggy in my experience.
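
For illustration, a minimal sketch of the Aggregator route, using a plain bounded buffer in place of Algebird's SpaceSaver (the Record type, the TopKAgg name and k = 10 are assumptions, not the in-house adapter):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

case class Record(key: String, value: Long)

// keeps the k largest values per group; buffer and output are plain Seq[Long]
class TopKAgg(k: Int) extends Aggregator[Record, Seq[Long], Seq[Long]] {
  private def trim(xs: Seq[Long]): Seq[Long] = xs.sorted(Ordering[Long].reverse).take(k)
  def zero: Seq[Long] = Seq.empty
  def reduce(buf: Seq[Long], r: Record): Seq[Long] = trim(r.value +: buf)
  def merge(b1: Seq[Long], b2: Seq[Long]): Seq[Long] = trim(b1 ++ b2)
  def finish(buf: Seq[Long]): Seq[Long] = buf
  def bufferEncoder: Encoder[Seq[Long]] = Encoders.kryo[Seq[Long]]
  def outputEncoder: Encoder[Seq[Long]] = Encoders.kryo[Seq[Long]]
}

// usage, staying entirely in the Dataset API:
// val ds: Dataset[Record] = ...
// ds.groupByKey(_.key).agg(new TopKAgg(10).toColumn)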



Re: top-k function for Window

2017-01-03 Thread Andy Dang
Hi Austin,

It's trivial to implement top-k in the RDD world - however, I would like to stay in the Dataset API world instead of flip-flopping between the two APIs (consistency, whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave me is very similar to what I did - it doesn't work very well with a skewed dataset :) (it has to perform the sort to work out the row number).

I've been toying with the UDAF idea, but the more I write the code the more I see myself digging deeper into developer API land - not very ideal, to be honest. Also, UDAF doesn't have any concept of sorting, so it gets messy really fast.

---
Regards,
Andy



Re: top-k function for Window

2017-01-03 Thread HENSLEE, AUSTIN L
Andy,

You might want to also check out the Algebird libraries from Twitter. They have topK and a lot of other helpful functions. I’ve used the Algebird topK successfully on very large data sets.

You can also use Spark SQL to do a “poor man’s” topK. This depends on how scrupulous you are about your TopKs (I can expound on this, if needed).

I obfuscated the field names before pasting this into email – I think I got them all consistently.

Here’s the meat of the TopK part (found on SO, but I don’t have a reference) – 
this one takes the top 4, hence “rowNum <= 4”:

SELECT time_bucket,
       identifier1,
       identifier2,
       incomingCount
  FROM (SELECT time_bucket,
               identifier1,
               identifier2,
               incomingCount,
               ROW_NUMBER() OVER (PARTITION BY time_bucket,
                                               identifier1
                                      ORDER BY incomingCount DESC) as rowNum
          FROM tablename) tmp
 WHERE rowNum <= 4
 ORDER BY time_bucket, identifier1, rowNum

The count and order by:


SELECT time_bucket,
       identifier1,
       identifier2,
       count(identifier2) as myCount
  FROM table
 GROUP BY time_bucket,
          identifier1,
          identifier2
 ORDER BY time_bucket,
          identifier1,
          count(identifier2) DESC




Re: top-k function for Window

2017-01-03 Thread Andy Dang
> Furthermore, in your example you don’t even need a window function, you
> can simply use groupBy and explode

Can you clarify? You need to sort somehow (be it map-side sorting or
reduce-side sorting).



---
Regards,
Andy



RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
You can write a UDAF in which the buffer contains the top K and manage it yourself. This means you don’t need to sort at all. Furthermore, in your example you don’t even need a window function; you can simply use groupBy and explode.
Of course, this is only relevant if K is small…

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data locally and get the local rank first. What it ends up doing is performing a sort by key using the skewed keys, and this blew up the cluster since the keys are heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, especially with a Window function. I guess some UserDefinedAggregateFunction would do, but I wonder if there's an obvious way that I missed.

---
Regards,
Andy