Thanks for your reply, Jatin. I changed my parsing logic to what you suggested:

def parseCoverageLine(str: String) = {
  val arr = str.split(",")
  ...
  ...
  (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
}

Then in the grouping, can I use a global hash map per executor / partition to aggregate the results?

val globalMap: Map[String, List[Int]] = Map()
val coverageDStream = inputStream.map(parseCoverageLine)
coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
  // if the key exists in the global map, append the result, else add a new key
  // globalMap
  // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
})

Thanks,
Vinti

On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:

> Hello Vinti,
>
> One way to get this done is to split your input line into a key and value
> tuple; then you can simply use groupByKey and handle the values the way
> you want. For example:
>
> Assuming you have already split the values into a 5-tuple:
> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>   .groupByKey()
>   .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
>
> I hope that helps.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have input lines like the ones below:
>>
>> *Input*
>> t1, file1, 1, 1, 1
>> t1, file1, 1, 2, 3
>> t1, file2, 2, 2, 2, 2
>> t2, file1, 5, 5, 5
>> t2, file2, 1, 1, 2, 2
>>
>> and I want to produce the output rows below, which are a vertical
>> addition of the corresponding numbers.
>>
>> *Output*
>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>
>> I am in a Spark Streaming context and I am having a hard time trying to
>> figure out how to group by file name.
>>
>> It seems like I will need to use something like the line below, but I am
>> not sure how to get the syntax right. Any input will be helpful.
>>
>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>
>> I know how to do the vertical sum of a list of number rows, but I am not
>> sure how to feed that function to the group by.
>>
>> import scala.collection.mutable.ArrayBuffer
>>
>> def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>   counts.toList.transpose.map(_.sum)
>> }
>>
>> ~Thanks,
>> Vinti
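A minimal sketch of one way to handle the global-map question, assuming the lines look like the *Input* sample in the thread (test id, file name, then integer counts, comma separated). Instead of a shared mutable map, it keys each line by file name and combines values with an element-wise sum, the kind of function `reduceByKey` expects. A mutable map captured in that closure is serialized and copied to each task, so updates made on executors are not merged back into the driver's copy, while `reduceByKey` already combines the values for each key across partitions. `parseCoverageLine` below is a hypothetical rewrite (it drops the test id, unlike the version in the thread), and `sumElementwise` and `aggregate` are hypothetical helpers; plain Scala collections stand in for the DStream so the sketch runs without Spark.

```scala
// Hypothetical helpers, not code from this thread. Plain Scala collections
// stand in for the DStream so the sketch runs without Spark.
object CoverageSketch {

  // Parse "t1, file1, 1, 2, 3" into ("file1", List(1, 2, 3)).
  // Assumes: field 0 = test id (dropped), field 1 = file name,
  // remaining fields = integer counts, all comma separated.
  def parseCoverageLine(line: String): (String, List[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt).toList)
  }

  // Vertical (element-wise) sum of two count lists. zipAll pads the
  // shorter list with 0, so rows of different lengths keep their extra values.
  def sumElementwise(a: List[Int], b: List[Int]): List[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  // Local stand-in for: inputStream.map(parseCoverageLine).reduceByKey(sumElementwise)
  def aggregate(lines: Seq[String]): Map[String, List[Int]] =
    lines.map(parseCoverageLine)
      .groupBy(_._1)
      .map { case (file, rows) => file -> rows.map(_._2).reduce(sumElementwise(_, _)) }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "t1, file1, 1, 1, 1",
      "t1, file1, 1, 2, 3",
      "t1, file2, 2, 2, 2, 2",
      "t2, file1, 5, 5, 5",
      "t2, file2, 1, 1, 2, 2"
    )
    aggregate(input).toSeq.sortBy(_._1).foreach { case (file, counts) =>
      println(s"$file : ${counts.mkString("[", ", ", "]")}")
    }
    // prints:
    // file1 : [7, 8, 9]
    // file2 : [3, 3, 4, 4]
  }
}
```

This gives per-batch totals. If the totals need to keep accumulating across batches, Spark Streaming's stateful operations (`updateStateByKey`, or `mapWithState` in Spark 1.6) are the usual tool, and both expect checkpointing to be enabled.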