I'm glad you solved this issue, but I have a follow-up question for you.
Wouldn't Akhil's solution be better for you after all? I run a similar
computation where a large set of data gets reduced to a much smaller
aggregate in an interval. If you do saveAsText without coalescing, I
believe you'd get the same number of files as you have partitions. So in
the worst case you'd end up with 10 partitions (if each item in your top 10
was in a different partition) and thus 10 output files. It seems to me this
would be horrible for subsequent processing of these (as in, this is the
small-files problem in HDFS at its worst). But even with coalesce(1) you'd
get one pretty small file on every interval.
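
To make that concrete, here's a minimal, untested sketch of what I mean
(reusing your topCounts stream and output path from below):

// hedged sketch: coalesce each batch's RDD to a single partition so that
// every interval writes one output file instead of one per partition
topCounts.transform(rdd => rdd.coalesce(1))
  .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a + 1))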

I'm curious what you think, because every time I come to a situation like
this I end up using a store that supports appends (some sort of DB), but
that's more code/work. So I'm curious about your experience of saving to
files (unless you have a separate process to merge these chunks, of
course).

On Wed, Jan 7, 2015 at 11:56 AM, Laeeq Ahmed <laeeqsp...@yahoo.com> wrote:

> Hi,
>
> It worked out as this.
>
> val topCounts = sortedCounts.transform(rdd =>
>   rdd.zipWithIndex().filter(x => x._2 < 10))
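>
> Since zipWithIndex leaves each element as a ((value, count), index) pair, I
> drop the index before saving -- something like:
>
> // hedged sketch: strip the index, then format and save as before
> topCounts.map(_._1).map(t => "%s,%s".format(t._1, t._2))
>   .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a + 1))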
>
> Regards,
> Laeeq
>
>
>   On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed
> <laeeqsp...@yahoo.com.INVALID> wrote:
>
>
> Hi Yana,
>
> I also think that
> *val top10 = your_stream.mapPartitions(rdd => rdd.take(10))*
>
>
> will give the top 10 from each partition. I will try your code.
>
> Regards,
> Laeeq
>
>
>   On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska <
> yana.kadiy...@gmail.com> wrote:
>
>
> My understanding is that
>
> *val top10 = your_stream.mapPartitions(rdd => rdd.take(10))*
>
> would result in an RDD containing the top 10 entries per partition -- am I
> wrong?
>
> I am not sure if there is a more efficient way but I think this would work:
>
> *sortedCounts.zipWithIndex().filter(x => x._2 < 10).saveAsText....*
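>
> On a DStream you'd presumably have to wrap that in transform, since
> zipWithIndex is an RDD method -- an untested sketch:
>
> // hedged sketch: per-batch top 10, keeping only the values
> val top10 = sortedCounts.transform(rdd =>
>   rdd.zipWithIndex().filter(_._2 < 10).map(_._1))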
>
> On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid
> > wrote:
>
> Hi,
>
> I applied it as follows:
>
> eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>   Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
> val counts = eegStreams(a).map(x => math.round(x.toDouble))
>   .countByValueAndWindow(Seconds(4), Seconds(4))
> val sortedCounts = counts.map(_.swap)
>   .transform(rdd => rdd.sortByKey(false)).map(_.swap)
> val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
> *// val topCounts = sortedCounts.transform(rdd =>
> //   ssc.sparkContext.makeRDD(rdd.take(10)))*
> topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
>   .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
> topCounts.print()
>
> It gives the output with 10 extra values. I think it works on the
> partitions of each rdd rather than on the rdd as a whole. I also tried the
> commented code. It gives the correct result, but at the start it gives a
> serialisation error:
>
> ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
> java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
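>
> I guess this is because ssc is referenced inside the transform closure,
> which pulls the whole StreamingContext into the serialized closure.
> Perhaps getting the SparkContext from the RDD itself would avoid it
> (untested sketch):
>
> // hedged sketch: use the RDD's own SparkContext instead of capturing ssc
> val topCounts = sortedCounts.transform(rdd =>
>   rdd.sparkContext.makeRDD(rdd.take(10)))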
>
> Output for the code in red (the mapPartitions version); the values in
> green look extra to me.
>
> 0,578
> -3,576
> 4,559
> -1,556
> 3,553
> -6,540
> 6,538
> -4,535
> 1,526
> 10,483
> *94,8*
> *-113,8*
> *-137,8*
> *-85,8*
> *-91,8*
> *-121,8*
> *114,8*
> *108,8*
> *93,8*
> *101,8*
> 1,128
> -8,118
> 3,112
> -4,110
> -13,108
> 4,108
> -3,107
> -10,107
> -6,106
> 8,105
> *76,6*
> *74,6*
> *60,6*
> *52,6*
> *70,6*
> *71,6*
> *-60,6*
> *55,6*
> *78,5*
> *64,5*
>
> and so on.
>
> Regards,
> Laeeq
>
>
>
>   On Tuesday, January 6, 2015 9:06 AM, Akhil Das <
> ak...@sigmoidanalytics.com> wrote:
>
>
> You can try something like:
>
> *val top10 = your_stream.mapPartitions(rdd => rdd.take(10))*
>
>
> Thanks
> Best Regards
>
> On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid
> > wrote:
>
> Hi,
>
> I am counting values in each window to find the top values, and I want to
> save only the 10 most frequent values of each window to HDFS rather than
> all of them.
>
> *eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
>   Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)*
> *val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1))
>   .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))*
> *val sortedCounts = counts.map(_.swap)
>   .transform(rdd => rdd.sortByKey(false)).map(_.swap)*
> *// sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" +
> //   rdd.take(10).mkString("\n")))*
> *sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
>   .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))*
>
> I can print the top 10 as above in red.
>
> I have also tried
>
> *sortedCounts.foreachRDD { rdd =>
>   ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(
>     "hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
> }*
>
> but I get the following error.
>
> *15/01/05 17:12:23 ERROR actor.OneForOneStrategy:
> org.apache.spark.streaming.StreamingContext*
> *java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext*
>
> Regards,
> Laeeq
>
