Hi Debasish, I found this test code in the map translation. Would it collect all products too?
+ val sortedProducts = products.toArray.sorted(ord.reverse)

Yours, Xuefeng Wu (吴雪峰)

> On Dec 2, 2014, at 1:33 AM, Debasish Das <debasish.da...@gmail.com> wrote:
>
> rdd.top collects it on the master...
>
> If you want topK for a key, run map/mapPartitions, use a bounded priority
> queue, and reduceByKey the queues.
>
> I experimented with topK from Algebird and a bounded priority queue wrapped
> over a Java priority queue (the Spark default)... the bounded priority
> queue is faster.
>
> Code example is here:
>
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3066
>
>> On Dec 1, 2014 6:46 AM, "Xuefeng Wu" <ben...@gmail.com> wrote:
>> Hi, I have a problem. It is easy in plain Scala, but I cannot take the
>> top N from an RDD as an RDD.
>>
>> There are 10,000 StudentScore records. Take the top 10 ages, then take the
>> top 10 from each age; the result is 100 records.
>>
>> The plain Scala code is below, but how can I do it with an RDD? RDD.take
>> returns an Array, not another RDD.
>>
>> Example Scala code:
>>
>> import scala.util.Random
>>
>> case class StudentScore(age: Int, num: Int, score: Int, name: Int)
>>
>> val scores = for {
>>   i <- 1 to 10000
>> } yield {
>>   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(),
>>     Random.nextInt())
>> }
>>
>> def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int):
>>     Seq[(Int, Seq[StudentScore])] = {
>>   val groupedScore = scores.groupBy(byKey)
>>     .map { case (_, _scores) =>
>>       (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)
>>     }.toSeq
>>   // sort descending so take(10) keeps the highest-scoring groups
>>   groupedScore.sortBy(-_._1).take(10)
>> }
>>
>> val topScores = for {
>>   (_, ageScores) <- takeTop(scores, _.age)
>>   (_, numScores) <- takeTop(ageScores, _.num)
>> } yield {
>>   numScores
>> }
>>
>> topScores.size
>>
>> --
>>
>> ~Yours, Xuefeng Wu (吴雪峰)
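
For what it's worth, the per-partition bounded-priority-queue merge Debasish describes can be sketched in plain Scala, without Spark, to show the mechanics. `BoundedPQ` below is a hypothetical name for illustration (Spark has its own internal BoundedPriorityQueue used by rdd.top); the `+=`/`++=` operations are exactly what you would run inside mapPartitions and reduceByKey respectively:

```scala
import scala.collection.mutable

// Minimal bounded priority queue: keeps only the k largest elements seen.
// A plain-Scala sketch of the structure used for distributed topK; names
// here (BoundedPQ, toSortedSeq) are illustrative, not a Spark API.
class BoundedPQ[A](k: Int)(implicit ord: Ordering[A]) {
  // Backing min-heap (reversed ordering), so the smallest retained
  // element sits at the head and is cheap to evict.
  private val heap = mutable.PriorityQueue.empty[A](ord.reverse)

  // Offer one element: the mapPartitions side of the job.
  def +=(elem: A): this.type = {
    if (heap.size < k) heap.enqueue(elem)
    else if (ord.gt(elem, heap.head)) { heap.dequeue(); heap.enqueue(elem) }
    this
  }

  // Merge another queue: the reduceByKey side of the job.
  def ++=(other: BoundedPQ[A]): this.type = {
    other.heap.foreach(this += _)
    this
  }

  // Retained elements, largest first.
  def toSortedSeq: Seq[A] = heap.toSeq.sorted(ord.reverse)
}

object TopKSketch {
  def main(args: Array[String]): Unit = {
    // Simulate two partitions' queues for one key, then merge them.
    val q1 = new BoundedPQ[Int](3)
    Seq(5, 1, 9, 3).foreach(q1 += _)
    val q2 = new BoundedPQ[Int](3)
    Seq(8, 2, 7, 6).foreach(q2 += _)
    println((q1 ++= q2).toSortedSeq) // top 3 across both "partitions"
  }
}
```

The point of the bound is that each partition ships at most k elements per key to the reducer, instead of collecting everything on the master the way rdd.top's Array result (or a groupBy) would.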