I'm glad you solved this issue, but I have a follow-up question for you. Wouldn't Akhil's solution be better for you after all? I run a similar computation where a large set of data gets reduced to a much smaller aggregate in each interval. If you do saveAsTextFiles without coalescing, I believe you'd get the same number of output files as you have partitions. So in the worst case, you'd end up with 10 partitions (if each item in your top 10 were in a different partition) and thus 10 output files per interval. That seems like it would be terrible for subsequent processing of these (as in, this is small-files-in-HDFS at its worst). But even with coalesce(1) you'd get one pretty small file every interval.
I'm curious what you think, because every time I come to a situation like this I end up using a store that supports appends (some sort of DB), but it's more code/work. So I'm curious about your experience of saving to files (unless you have a separate process to merge these chunks, of course).

On Wed, Jan 7, 2015 at 11:56 AM, Laeeq Ahmed <laeeqsp...@yahoo.com> wrote:

> Hi,
>
> It worked out as this:
>
> val topCounts = sortedCounts.transform(rdd =>
>   rdd.zipWithIndex().filter(x => x._2 < 10))
>
> Regards,
> Laeeq
>
> On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed
> <laeeqsp...@yahoo.com.INVALID> wrote:
>
> Hi Yana,
>
> I also think that
>
> val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
>
> will give the top 10 from each partition. I will try your code.
>
> Regards,
> Laeeq
>
> On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska
> <yana.kadiy...@gmail.com> wrote:
>
> My understanding is that
>
> val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
>
> would result in an RDD containing the top 10 entries per partition -- am I
> wrong?
>
> I am not sure if there is a more efficient way, but I think this would work:
>
> sortedCounts.zipWithIndex().filter(x => x._2 < 10).saveAsText....
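One small boundary note on the zipWithIndex + filter pattern: zipWithIndex assigns indices starting at 0, so `filter(x => x._2 <= 10)` keeps 11 rows, while `x._2 < 10` keeps exactly the top 10. This is easy to check with plain Scala collections (no Spark needed; `sorted` below just stands in for one window's sorted counts):

```scala
object TopNBoundary {
  def main(args: Array[String]): Unit = {
    // Stand-in for one window's counts, already sorted in descending order.
    val sorted = (100 to 1 by -1).toList
    val indexed = sorted.zipWithIndex // same index semantics as RDD.zipWithIndex()

    val withLte = indexed.filter { case (_, i) => i <= 10 } // indices 0..10
    val withLt  = indexed.filter { case (_, i) => i < 10 }  // indices 0..9

    println(withLte.size) // 11
    println(withLt.size)  // 10
  }
}
```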
On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid> wrote:

> Hi,
>
> I applied it as follows:
>
> eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>   Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
> val counts = eegStreams(a).map(x =>
>   math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
> val sortedCounts = counts.map(_.swap).transform(rdd =>
>   rdd.sortByKey(false)).map(_.swap)
> val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
> // val topCounts = sortedCounts.transform(rdd =>
> //   ssc.sparkContext.makeRDD(rdd.take(10)))
> topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
>   .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
> topCounts.print()
>
> It gives the output with 10 extra values. I think take(10) works on each
> partition of the RDD rather than on the RDD as a whole. I also tried the
> commented-out code. It gives the correct result, but at the start it gives
> a serialization error:
>
> ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
> Output for the mapPartitions version (the starred values look extra to me):
>
> 0,578
> -3,576
> 4,559
> -1,556
> 3,553
> -6,540
> 6,538
> -4,535
> 1,526
> 10,483
> *94,8*
> *-113,8*
> *-137,8*
> *-85,8*
> *-91,8*
> *-121,8*
> *114,8*
> *108,8*
> *93,8*
> *101,8*
> 1,128
> -8,118
> 3,112
> -4,110
> -13,108
> 4,108
> -3,107
> -10,107
> -6,106
> 8,105
> *76,6*
> *74,6*
> *60,6*
> *52,6*
> *70,6*
> *71,6*
> *-60,6*
> *55,6*
> *78,5*
> *64,5*
>
> and so on.
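The doubled output above is exactly what mapPartitions(_.take(10)) produces: the function runs once per partition, so with two partitions you get up to 20 rows per window instead of a global top 10. A plain-Scala sketch of that behaviour (grouped stands in for the partitioning; no Spark API involved, and the numbers are made up):

```scala
object PerPartitionTake {
  def main(args: Array[String]): Unit = {
    // Globally sorted "counts" for one window, split into two fake partitions.
    val sorted = (40 to 1 by -1).toList
    val partitions = sorted.grouped(20).toList // pretend numPartitions == 2

    // mapPartitions(_.take(10)) analogue: take(10) runs once per partition.
    val perPartition = partitions.flatMap(_.take(10))
    println(perPartition.size) // 20 -- ten rows from EACH partition

    // A global top 10 needs a single pass over the fully sorted data.
    println(sorted.take(10).size) // 10
  }
}
```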
> Regards,
> Laeeq

On Tuesday, January 6, 2015 9:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> You can try something like:
>
> val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
>
> Thanks
> Best Regards
>
> On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed
> <laeeqsp...@yahoo.com.invalid> wrote:
>
> Hi,
>
> I am counting values in each window, finding the top values, and I want to
> save only the 10 most frequent values of each window to HDFS rather than
> all the values.
>
> eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>   Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
> val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1))
>   .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
> val sortedCounts = counts.map(_.swap).transform(rdd =>
>   rdd.sortByKey(false)).map(_.swap)
> // sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" +
> //   rdd.take(10).mkString("\n")))
> sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
>   .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
>
> I can print the top 10 with the commented-out foreachRDD line above.
>
> I have also tried
>
> sortedCounts.foreachRDD { rdd =>
>   ssc.sparkContext.parallelize(rdd.take(10))
>     .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
> }
>
> but I get the following error:
>
> 15/01/05 17:12:23 ERROR actor.OneForOneStrategy:
> org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
> Regards,
> Laeeq
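As a side note: the NotSerializableException usually means the foreachRDD closure captured ssc; referring to the RDD's own context instead (rdd.sparkContext) is the commonly suggested workaround, though I haven't tested it against this exact code. Independent of Spark, the per-window logic being built here (count each rounded value, sort by count descending, keep the top N) can be sketched with plain Scala collections; the window data and N = 3 below are made up for illustration:

```scala
object TopCountsPerWindow {
  def main(args: Array[String]): Unit = {
    // One window's rounded amplitudes (made-up data).
    val window = List(3L, 1L, 3L, 2L, 3L, 2L, 1L, 3L, 4L)

    // countByValueAndWindow analogue: value -> occurrence count.
    val counts: Map[Long, Int] =
      window.groupBy(identity).map { case (v, vs) => (v, vs.size) }

    // Sort by count descending and keep the top 3.
    val top = counts.toSeq.sortBy { case (_, c) => -c }.take(3)

    println(top.head)      // (3,4): value 3 appeared 4 times
    println(top.map(_._2)) // counts in descending order
  }
}
```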