You can also create a custom aggregation function, which might give better
performance than the dense_rank approach.

Consider the following example, which collects everything into a list:
import scala.collection.mutable

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

class CollectListFunction[T](val colType: DataType)
    extends UserDefinedAggregateFunction {

  // Input: a single column of the given type
  def inputSchema: StructType =
    new StructType().add("inputCol", colType)

  // Buffer: an array accumulating the values seen so far
  def bufferSchema: StructType =
    new StructType().add("outputCol", ArrayType(colType))

  // Result: an array of the input type
  def dataType: DataType = ArrayType(colType)

  def deterministic: Boolean = true

  // Start each group with an empty list
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, new mutable.ArrayBuffer[T])
  }

  // Append each non-null input value to the buffer
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val list = buffer.getSeq[T](0)
    if (!input.isNullAt(0)) {
      val value = input.getAs[T](0)
      buffer.update(0, list :+ value)
    }
  }

  // Concatenate the lists from two partial buffers
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getSeq[T](0) ++ buffer2.getSeq[T](0))
  }

  // Return the accumulated list
  def evaluate(buffer: Row): Any = {
    buffer.getSeq[T](0)
  }
}
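
A minimal usage sketch, assuming a DataFrame named posts with user_id and post_id columns (both names are just illustrative):

// Hypothetical usage: collect all post_ids per user into one array column.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

val collectPostIds = new CollectListFunction[Long](LongType)

val postsPerUser = posts
  .groupBy(col("user_id"))
  .agg(collectPostIds(col("post_id")).as("post_ids"))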

All you would need to do is modify it to contain only the top 100…
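
Roughly, that modification could sort and truncate the buffer in update and merge. A sketch only, untested; it assumes you add a maxSize: Int parameter and an implicit ord: Ordering[T] to the class, neither of which is in the version above:

  // Sketch: keep only the largest `maxSize` values in the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val list = buffer.getSeq[T](0) :+ input.getAs[T](0)
      buffer.update(0, list.sorted(ord.reverse).take(maxSize))
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val merged = buffer1.getSeq[T](0) ++ buffer2.getSeq[T](0)
    buffer1.update(0, merged.sorted(ord.reverse).take(maxSize))
  }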

From: burtonator2...@gmail.com [mailto:burtonator2...@gmail.com] On Behalf Of Kevin Burton
Sent: Sunday, September 11, 2016 6:33 AM
To: Karl Higley
Cc: user@spark.apache.org
Subject: Re: Selecting the top 100 records per group by?

Looks like you can do it with the dense_rank window function.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

I set up some basic records and it seems like it did the right thing.
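
A sketch of the window-function approach from that post, assuming a posts DataFrame with a user_id column and some score column to order by (both names are stand-ins):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

val w = Window.partitionBy("user_id").orderBy(col("score").desc)

val top100 = posts
  .withColumn("rank", dense_rank().over(w))
  .where(col("rank") <= 100)
  .drop("rank")

One caveat: dense_rank keeps ties, so a group can end up with more than 100 rows; row_number() would give exactly 100 per group.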

Now time to throw 50TB and 100 spark nodes at this problem and see what happens 
:)

On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton <bur...@spinn3r.com> wrote:
Ah.. might actually. I'll have to mess around with that.

On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley <kmhig...@gmail.com> wrote:
Would `topByKey` help?

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
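
A sketch of what using it might look like, on an RDD of (user_id, score) pairs (field names and types are just illustrative):

// topByKey comes from MLPairRDDFunctions via an implicit conversion on pair RDDs.
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._

val pairs = posts.rdd.map(row =>
  (row.getAs[Long]("user_id"), row.getAs[Double]("score")))

// RDD[(Long, Array[Double])]: the 100 largest scores per user, descending
val top100PerUser = pairs.topByKey(100)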

Best,
Karl

On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton <bur...@spinn3r.com> wrote:
I'm trying to figure out a way to group by and return the top 100 records in each group.

Something like:

SELECT TOP(100, user_id) FROM posts GROUP BY user_id;

But I can't really figure out the best way to do this...

There are FIRST and LAST aggregate functions, but they only return a single value per group.

I could do something like:

SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT 100;

But that limit is applied across ALL the records, not to each individual user.

The only other thing I can think of is to do a manual map reduce and then have 
the reducer only return the top 100 each time...

Would LOVE some advice here...

--
We’re hiring if you know of any awesome Java Devops or Linux Operations 
Engineers!

Founder/CEO Spinn3r.com
Location: San Francisco, CA
blog: http://burtonator.wordpress.com
… or check out my Google+ profile: https://plus.google.com/102718274791889610666/posts




