Hi,
It worked out like this:
// zipWithIndex is 0-based, so "< 10" keeps exactly the first 10 elements
val topCounts = sortedCounts.transform(rdd => rdd.zipWithIndex().filter(x => x._2 < 10))

Regards,
Laeeq

     On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed 
<laeeqsp...@yahoo.com.INVALID> wrote:
   

Hi Yana,
I also think that
val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
will give the top 10 from each partition. I will try your code.
Regards,
Laeeq

     On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska 
<yana.kadiy...@gmail.com> wrote:
   

 My understanding is that 
val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
would result in an RDD containing the top 10 entries per partition -- am I 
wrong?
I am not sure if there is a more efficient way, but I think this would work
(indices from zipWithIndex start at 0, so "< 10" keeps ten entries):
sortedCounts.zipWithIndex().filter(x => x._2 < 10).saveAsText....
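
To make the difference concrete, here is a minimal sketch in plain Scala (no Spark needed). It models an RDD as a sequence of partitions, which is an assumption made only for illustration; the names TopNDemo, takePerPartition and takeGlobal are hypothetical and not part of any Spark API. It compares taking n elements from every partition with numbering the elements across all partitions and keeping the first n.

```scala
object TopNDemo {
  // Hypothetical stand-in for a sorted RDD: a sequence of partitions whose
  // concatenation is in the desired (descending) order.
  type Partitioned[A] = Seq[Seq[A]]

  // Mimics mapPartitions(_.take(n)): every partition contributes up to n elements.
  def takePerPartition[A](parts: Partitioned[A], n: Int): Seq[A] =
    parts.flatMap(_.take(n))

  // Mimics zipWithIndex().filter(_._2 < n): indices run across all partitions
  // and start at 0, so "< n" keeps exactly n elements.
  def takeGlobal[A](parts: Partitioned[A], n: Int): Seq[A] =
    parts.flatten.zipWithIndex.filter { case (_, idx) => idx < n }.map(_._1)

  def main(args: Array[String]): Unit = {
    // 30 values in descending order, split over two partitions of 15 each.
    val parts: Partitioned[Int] = Seq(30 to 16 by -1, 15 to 1 by -1)

    println(takePerPartition(parts, 10).size) // 20: ten from each partition
    println(takeGlobal(parts, 10).size)       // 10: the overall first ten
  }
}
```

With two partitions the per-partition version returns twice as many values as asked for, which matches the "10 extra values" reported elsewhere in this thread.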

On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid> 
wrote:

Hi,
I applied it as follows:
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives output with 10 extra values. I think it works on each partition of
the RDD rather than on the RDD as a whole. I also tried the commented-out
code. It gives the correct result, but at the start it gives a serialisation
error:
ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Output for the code in red (the values in green look extra to me):
0,578 -3,576 4,559 -1,556 3,553 -6,540 6,538 -4,535 1,526 10,483
94,8 -113,8 -137,8 -85,8 -91,8 -121,8 114,8 108,8 93,8 101,8
1,128 -8,118 3,112 -4,110 -13,108 4,108 -3,107 -10,107 -6,106 8,105
76,6 74,6 60,6 52,6 70,6 71,6 -60,6 55,6 78,5 64,5
and so on.
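
On the serialisation error: a likely cause is that the closure refers to ssc. Windowed operations get checkpointed, and whatever the closure captures is serialized along with the DStream graph, while StreamingContext itself is not serializable. Below is a minimal sketch, assuming Spark Streaming 1.x is on the classpath and that the input stream is already sorted per batch, as sortedCounts is here. The object and method names (TopNPerBatch, firstNPerBatch) are hypothetical. It reaches the SparkContext through the RDD instead of through ssc, so the closure no longer captures the StreamingContext.

```scala
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream

object TopNPerBatch {
  // Hypothetical helper: keeps the first n elements of every batch RDD.
  // Assumes each RDD is already sorted in the desired order.
  def firstNPerBatch[T: ClassTag](sorted: DStream[T], n: Int): DStream[T] =
    sorted.transform { rdd =>
      // rdd.sparkContext is obtained from the RDD itself, so nothing here
      // references the StreamingContext (ssc).
      rdd.sparkContext.parallelize(rdd.take(n).toSeq, numSlices = 1)
    }
}
```

Under those assumptions it would be used as val topCounts = TopNPerBatch.firstNPerBatch(sortedCounts, 10). Note that take(n) brings the n elements to the driver on every batch, which is cheap for n = 10 but worth keeping in mind for larger n.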
Regards,
Laeeq
 

     On Tuesday, January 6, 2015 9:06 AM, Akhil Das 
<ak...@sigmoidanalytics.com> wrote:
   

 You can try something like:

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))

Thanks
Best Regards
On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid> 
wrote:

Hi,
I am counting the values in each window and sorting them by frequency. I want
to save only the 10 most frequent values of each window to HDFS, rather than
all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

I can print the top 10 as shown above in red.
I have also tried
sortedCounts.foreachRDD { rdd =>
  ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}

but I get the following error:
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Regards,
Laeeq



    



    

   
