You can try something like: *val top10 = your_stream.mapPartitions(iter => iter.take(10))*

Note that mapPartitions hands the function an iterator over a single partition, so this takes up to 10 elements from each partition rather than a single global top 10.
Thanks
Best Regards

On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid> wrote:

> Hi,
>
> I am counting values in each window, finding the top values, and want to
> save only the 10 most frequent values of each window to HDFS rather than
> all the values.
>
> *eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a)
> -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)*
> *val counts = eegStreams(a).map(x => (math.round(x.toDouble),
> 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))*
> *val sortedCounts = counts.map(_.swap).transform(rdd =>
> rdd.sortByKey(false)).map(_.swap)*
> *//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" +
> rdd.take(10).mkString("\n")))*
> *sortedCounts.map(tuple => "%s,%s".format(tuple._1,
> tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/"
> + (a+1))*
>
> I can print the top 10 as in the commented-out line above.
>
> I have also tried
>
> *sortedCounts.foreachRDD{ rdd =>
> ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/"
> + (a+1))}*
>
> but I get the following error:
>
> *15/01/05 17:12:23 ERROR actor.OneForOneStrategy:
> org.apache.spark.streaming.StreamingContext*
> *java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext*
>
> Regards,
> Laeeq