Oh yeah. In that case you can simply repartition it into 1 partition and do mapPartitions.
val top10 = mystream.repartition(1).mapPartitions(rdd => rdd.take(10))

On 7 Jan 2015 21:08, "Laeeq Ahmed" <laeeqsp...@yahoo.com> wrote:

> Hi,
>
> I applied it as follows:
>
> eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>   Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
> val counts = eegStreams(a).map(x =>
>   math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
> val sortedCounts = counts.map(_.swap).transform(rdd =>
>   rdd.sortByKey(false)).map(_.swap)
> val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
> //val topCounts = sortedCounts.transform(rdd =>
>   ssc.sparkContext.makeRDD(rdd.take(10)))
> topCounts.map(tuple => "%s,%s".format(tuple._1,
>   tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
> topCounts.print()
>
> It gives the output with 10 extra values per window. I think take(10) runs
> on each partition of the RDD rather than on the RDD as a whole. I also
> tried the commented-out line. It gives the correct result, but at the
> start it gives a serialisation error:
>
> ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
> Output for the mapPartitions version (the values marked with asterisks
> look extra to me):
>
> 0,578
> -3,576
> 4,559
> -1,556
> 3,553
> -6,540
> 6,538
> -4,535
> 1,526
> 10,483
> *94,8*
> *-113,8*
> *-137,8*
> *-85,8*
> *-91,8*
> *-121,8*
> *114,8*
> *108,8*
> *93,8*
> *101,8*
> 1,128
> -8,118
> 3,112
> -4,110
> -13,108
> 4,108
> -3,107
> -10,107
> -6,106
> 8,105
> *76,6*
> *74,6*
> *60,6*
> *52,6*
> *70,6*
> *71,6*
> *-60,6*
> *55,6*
> *78,5*
> *64,5*
>
> and so on.
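Laeeq's diagnosis is right: mapPartitions applies its function to each partition independently, so `take(10)` yields up to 10 values *per partition*, not 10 overall. A minimal, Spark-free sketch of the semantics (the numbers are made up, and an RDD's partitions are modelled here as plain Scala iterators):

```scala
// Two "partitions" of a hypothetical RDD, modelled as iterators.
val partitions = Seq(
  (1 to 100).iterator,   // partition 0
  (101 to 200).iterator  // partition 1
)

// mapPartitions runs the function once per partition, so take(10)
// is applied twice here -- this is where the extra values come from.
val perPartition = partitions.flatMap(_.take(10)).toList
println(perPartition.size) // 20, not 10

// After repartition(1) there is only one partition, so take(10) is global.
val onePartition = Seq((1 to 200).iterator)
val global = onePartition.flatMap(_.take(10)).toList
println(global.size) // 10
```

This is also why `repartition(1)` fixes the count but serialises the whole window through a single partition; it is fine for small windows but worth keeping in mind for large ones.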
>
> Regards,
> Laeeq
>
>
> On Tuesday, January 6, 2015 9:06 AM, Akhil Das
> <ak...@sigmoidanalytics.com> wrote:
>
> You can try something like:
>
> val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
>
> Thanks
> Best Regards
>
> On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed
> <laeeqsp...@yahoo.com.invalid> wrote:
>
> Hi,
>
> I am counting the values in each window, finding the top values, and I
> want to save only the 10 most frequent values of each window to HDFS
> rather than all of them.
>
> eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>   Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
> val counts = eegStreams(a).map(x => (math.round(x.toDouble),
>   1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
> val sortedCounts = counts.map(_.swap).transform(rdd =>
>   rdd.sortByKey(false)).map(_.swap)
> //sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" +
>   rdd.take(10).mkString("\n")))
> sortedCounts.map(tuple => "%s,%s".format(tuple._1,
>   tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
>
> I can print the top 10 with the commented-out foreachRDD line above.
>
> I have also tried
>
> sortedCounts.foreachRDD{ rdd =>
>   ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(
>     "hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }
>
> but I get the following error:
>
> 15/01/05 17:12:23 ERROR actor.OneForOneStrategy:
> org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
> Regards,
> Laeeq