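You can try something like the following (mapPartitions hands the function
each partition's iterator, so this keeps up to 10 elements per partition):

    val top10 = your_stream.mapPartitions(iter => iter.take(10))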
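
To get exactly the top 10 of each window rather than 10 from every partition, one option is to do the take(10) inside transform and turn the result back into a small RDD. The sketch below assumes sortedCounts is the DStream[(Long, Int)] from the quoted code, already sorted by count in descending order. The names top10PerWindow, saveTop10 and outputPrefix are made up for illustration. Because it calls rdd.sparkContext instead of ssc.sparkContext, the closure no longer refers to the StreamingContext, which should also avoid the NotSerializableException quoted below.

```scala
import org.apache.spark.streaming.dstream.DStream

// Sketch: keep only the first 10 elements of each window's (already sorted) RDD.
// top10PerWindow, saveTop10 and outputPrefix are hypothetical names, not from the original code.
def top10PerWindow(sortedCounts: DStream[(Long, Int)]): DStream[(Long, Int)] =
  sortedCounts.transform { rdd =>
    // take(10) brings the first 10 elements to the driver, in the RDD's sorted order
    val top = rdd.take(10).toSeq
    // rdd.sparkContext is used instead of ssc.sparkContext, so the closure
    // does not capture the StreamingContext
    rdd.sparkContext.parallelize(top, 1)
  }

// Format each (value, count) pair as "value,count" and write one output directory per window.
def saveTop10(sortedCounts: DStream[(Long, Int)], outputPrefix: String): Unit =
  top10PerWindow(sortedCounts)
    .map { case (value, count) => s"$value,$count" }
    .saveAsTextFiles(outputPrefix)
```

Calling saveTop10(sortedCounts, prefix) with the same HDFS prefix used in the original saveAsTextFiles call would then write at most 10 lines per window.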


Thanks
Best Regards

On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid>
wrote:

> Hi,
>
> I am counting the values in each window and finding the most frequent
> ones. I want to save only the 10 most frequent values of each window to
> HDFS rather than all the values.
>
>     eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>       Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
>     val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1))
>       .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
>     val sortedCounts = counts.map(_.swap)
>       .transform(rdd => rdd.sortByKey(false)).map(_.swap)
>     //sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
>     sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
>       .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
>
> I can print the top 10 with the commented-out foreachRDD line above.
>
> I have also tried
>
>     sortedCounts.foreachRDD { rdd =>
>       ssc.sparkContext.parallelize(rdd.take(10))
>         .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
>     }
>
> but I get the following error.
>
>     15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
>     java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>
> Regards,
> Laeeq
>
>
