My understanding is that

*val top10 = your_stream.mapPartitions(rdd => rdd.take(10))*

would result in an RDD containing the top 10 entries per partition -- am I
wrong?
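
To make the concern concrete, here is a minimal plain-Scala sketch (no Spark required) that models an RDD's partitions as nested Seqs. The names PartitionTakeSketch, takePerPartition and takeGlobal are made up for illustration, and the sample data is hypothetical; it only mimics the shape of a sorted RDD that has been split across two partitions.

```scala
// Plain-Scala model of mapPartitions(_.take(n)) versus a global take(n).
// Each inner Seq stands in for one partition; this is an illustration, not Spark code.
object PartitionTakeSketch {
  // Mimics rdd.mapPartitions(iter => iter.take(n)): up to n elements from EACH partition.
  def takePerPartition[A](partitions: Seq[Seq[A]], n: Int): Seq[A] =
    partitions.flatMap(_.take(n))

  // Mimics rdd.take(n): the first n elements across all partitions, in order.
  def takeGlobal[A](partitions: Seq[Seq[A]], n: Int): Seq[A] =
    partitions.flatten.take(n)

  def main(args: Array[String]): Unit = {
    // Hypothetical (value, count) pairs sorted by descending count and
    // split into two partitions of 15 elements each.
    val partitions = Seq(
      (1 to 15).map(i => (i, 600 - i)),  // partition 0: high counts
      (16 to 30).map(i => (i, 36 - i))   // partition 1: low counts
    )
    println(takePerPartition(partitions, 10).size) // prints 20
    println(takeGlobal(partitions, 10).size)       // prints 10
  }
}
```

With two partitions the per-partition version returns 20 entries per batch, which would be consistent with the 10 extra low-count values in the output quoted below.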

I am not sure if there is a more efficient way but I think this would work:

sortedCounts.transform(rdd => rdd.zipWithIndex().filter(x => x._2 < 10)).saveAsText....
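
A rough sketch of how this might look as a reusable helper, which I have not run against the job in this thread. Since zipWithIndex is defined on RDDs rather than DStreams, it would go inside transform, and because its indices start at 0, keeping exactly ten entries means index < 10. The names TopNSketch, firstN, firstNViaTake and topN are hypothetical. The second helper is a variant of the take(10) approach that gets the SparkContext from the RDD itself; my assumption is that the NotSerializableException comes from the closure referencing ssc, so this may avoid it.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helpers; assumes the input is already sorted in the desired order.
object TopNSketch {
  // Keep the first n elements of the whole RDD (not n per partition).
  // zipWithIndex assigns 0-based indices in partition order.
  def firstN[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] =
    rdd.zipWithIndex()
      .filter { case (_, idx) => idx < n }
      .map { case (elem, _) => elem }

  // Alternative: pull the first n elements to the driver and re-distribute them
  // as a single-partition RDD. Uses rdd.sparkContext so that the closure does not
  // need to reference the StreamingContext.
  def firstNViaTake[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] =
    rdd.sparkContext.makeRDD(rdd.take(n).toSeq, 1)

  // Apply firstN to every batch of a sorted DStream.
  def topN[T: ClassTag](sorted: DStream[T], n: Int): DStream[T] =
    sorted.transform((rdd: RDD[T]) => firstN(rdd, n))
}
```

Under these assumptions, TopNSketch.topN(sortedCounts, 10) would replace the mapPartitions call, and the existing map and saveAsTextFiles steps could follow unchanged.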

On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid>
wrote:

> Hi,
>
> I applied it as follows:
>
>    eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
> Map(args(a) -> 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
> val counts = eegStreams(a).map(x =>
> math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
> val sortedCounts = counts.map(_.swap).transform(rdd =>
> rdd.sortByKey(false)).map(_.swap)
> val topCounts = sortedCounts.mapPartitions(rdd=>rdd.take(10))
> *//val topCounts = sortedCounts.transform(rdd =>
> ssc.sparkContext.makeRDD(rdd.take(10)))*
> topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
>   .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
> topCounts.print()
>
> It gives the output with 10 extra values. I think it works on each
> partition of each RDD rather than on the RDD as a whole. I also tried the
> commented code. It gives the correct result, but at the start it gives a
> serialisation error:
>
> ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
> Output for code in red: The values in green look extra to me.
>
> 0,578
> -3,576
> 4,559
> -1,556
> 3,553
> -6,540
> 6,538
> -4,535
> 1,526
> 10,483
> *94,8*
> *-113,8*
> *-137,8*
> *-85,8*
> *-91,8*
> *-121,8*
> *114,8*
> *108,8*
> *93,8*
> *101,8*
> 1,128
> -8,118
> 3,112
> -4,110
> -13,108
> 4,108
> -3,107
> -10,107
> -6,106
> 8,105
> *76,6*
> *74,6*
> *60,6*
> *52,6*
> *70,6*
> *71,6*
> *-60,6*
> *55,6*
> *78,5*
> *64,5*
>
> and so on.
>
> Regards,
> Laeeq
>
>
>
>   On Tuesday, January 6, 2015 9:06 AM, Akhil Das <
> ak...@sigmoidanalytics.com> wrote:
>
>
> You can try something like:
>
> *val top10 = your_stream.mapPartitions(rdd => rdd.take(10))*
>
>
> Thanks
> Best Regards
>
> On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid
> > wrote:
>
> Hi,
>
> I am counting the values in each window and finding the most frequent ones.
> I want to save only the 10 most frequent values of each window to HDFS
> rather than all the values.
>
> *eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a)
> -> 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)*
> *val counts = eegStreams(a).map(x => (math.round(x.toDouble),
> 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))*
> *val sortedCounts = counts.map(_.swap).transform(rdd =>
> rdd.sortByKey(false)).map(_.swap)*
> *//sortedCounts.foreachRDD(rdd =>println("\nTop 10 amplitudes:\n" +
> rdd.take(10).mkString("\n")))*
> *sortedCounts.map(tuple => "%s,%s".format(tuple._1,
> tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/"
> + (a+1))*
>
> I can print the top 10 with the commented-out foreachRDD line above.
>
> I have also tried
>
> *sortedCounts.foreachRDD{ rdd =>
> ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/"
> + (a+1))} *
>
> but I get the following error.
>
> *15/01/05 17:12:23 ERROR actor.OneForOneStrategy:
> org.apache.spark.streaming.StreamingContext*
> *java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext*
>
> Regards,
> Laeeq
>
>
>
>
>
>
