Never mind, it seems an executor-level mutable map is not recommended, as stated in http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
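
For the archives, here is a rough sketch of what avoiding the executor-level map could look like: the per-batch sums are done with reduceByKey and the running totals are kept in Spark-managed state via updateStateByKey instead of a global map. The socket source, checkpoint path, master setting and field positions below are placeholders for illustration, not the actual setup:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CoverageAggregation {
  // Element-wise sum of two count lists, padding the shorter one with zeros,
  // e.g. addCounts(List(1, 2, 3), List(5, 5, 5)) == List(6, 7, 8)
  def addCounts(a: List[Int], b: List[Int]): List[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CoverageAggregation").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/coverage-checkpoint") // required by updateStateByKey

    // Hypothetical source; substitute the real input DStream here
    val lines = ssc.socketTextStream("localhost", 9999)

    // "t1, file1, 1, 1, 1" -> ("file1", List(1, 1, 1))
    val parsed = lines.map { line =>
      val arr = line.split(",").map(_.trim)
      (arr(1), arr.drop(2).map(_.toInt).toList)
    }

    // Per-batch aggregation: reduceByKey combines count lists on the
    // executors without any shared mutable map
    val perBatch = parsed.reduceByKey(addCounts)

    // Running totals across batches, held in Spark-managed state rather
    // than a global map on each executor
    val totals = perBatch.updateStateByKey[List[Int]] { (batchSums, state) =>
      Some(batchSums.foldLeft(state.getOrElse(Nil))(addCounts))
    }

    totals.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
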
On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:

> Thanks for your reply, Jatin. I changed my parsing logic to what you
> suggested:
>
> def parseCoverageLine(str: String) = {
>   val arr = str.split(",")
>   ...
>   ...
>   (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
> }
>
> Then in the grouping, can I use a global hash map per executor / partition
> to aggregate the results?
>
> val globalMap: Map[String, List[Int]] = Map()
> val coverageDStream = inputStream.map(parseCoverageLine)
> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>   // if the key exists in the global map, append the result, else add a new key
>
>   // globalMap
>   // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
> })
>
> Thanks,
> Vinti
>
> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Hello Vinti,
>>
>> One way to get this done is to split your input line into a key and value
>> tuple, and then you can simply use groupByKey and handle the values the way
>> you want. For example:
>>
>> Assuming you have already split the values into a 5-tuple:
>>
>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>   .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>
>> I hope that helps.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have input lines like the ones below:
>>>
>>> *Input*
>>> t1, file1, 1, 1, 1
>>> t1, file1, 1, 2, 3
>>> t1, file2, 2, 2, 2, 2
>>> t2, file1, 5, 5, 5
>>> t2, file2, 1, 1, 2, 2
>>>
>>> and I want to produce the output below, which is a vertical addition
>>> of the corresponding numbers:
>>>
>>> *Output*
>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>
>>> I am in a Spark Streaming context and I am having a hard time trying to
>>> figure out how to group by file name.
>>>
>>> It seems like I will need something like the line below, but I am not
>>> sure how to get to the correct syntax. Any inputs will be helpful.
>>>
>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>
>>> I know how to do the vertical sum of an array of given numbers, but I am
>>> not sure how to feed that function to the group-by.
>>>
>>> def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>>   counts.toList.transpose.map(_.sum)
>>> }
>>>
>>> ~Thanks,
>>> Vinti
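
For completeness, a minimal per-batch sketch of the group-by-file-name plus transpose idea from the original question. The stream name inputStream (a DStream[String]) and the field positions are assumptions based on the sample input; for large groups, reduceByKey with an element-wise add is usually preferable to groupByKey:

// ("file1", List(1, 1, 1)) pairs grouped by file name, then summed vertically
val summed = inputStream
  .map { line =>
    val arr = line.split(",").map(_.trim)
    (arr(1), arr.drop(2).map(_.toInt).toList)
  }
  .groupByKey()
  .mapValues(lists => lists.toList.transpose.map(_.sum))

// With the sample input arriving in a single batch this prints:
//   (file1, List(7, 8, 9))      // 1+1+5, 1+2+5, 1+3+5
//   (file2, List(3, 3, 4, 4))   // 2+1, 2+1, 2+2, 2+2
summed.print()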