Thanks for your reply, Jatin. I changed my parsing logic to what you
suggested:

    def parseCoverageLine(str: String) = {
      val arr = str.split(",")
      ...
      ...
      (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
    }
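
For reference, a self-contained version of the parser might look like the sketch below. It is only a sketch under some assumptions: each line is comma-separated with optional spaces, the first field is the test name, the second is the file name, and every remaining field is an integer. Unlike the snippet above, it keys the record by file name (the field the output is grouped on) and keeps the counts as a separate `List[Int]`. If `count` holds Ints, `arr(1) :: count.toList` produces a `List[Any]`, which makes element-wise summing awkward later. The name `parseCoverageLineByFile` is hypothetical.

```scala
// Hypothetical helper: parses "t1, file1, 1, 2, 3" into ("file1", List(1, 2, 3)).
// Assumes comma-separated fields: test name, file name, then integer counts.
def parseCoverageLineByFile(str: String): (String, List[Int]) = {
  val arr = str.split(",").map(_.trim)            // split, then strip the space after each comma
  val file = arr(1)                               // key by file name, the field we group on
  val counts = arr.drop(2).map(_.toInt).toList    // remaining fields are the counts
  (file, counts)
}
```

For example, `parseCoverageLineByFile("t1, file2, 2, 2, 2, 2")` returns `("file2", List(2, 2, 2, 2))`.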

Then, in the grouping step, can I use a global hash map per executor/partition
to aggregate the results?

    import scala.collection.mutable

    val globalMap = mutable.Map[String, List[Int]]()
    val coverageDStream = inputStream.map(parseCoverageLine)
    coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
      // if the key exists in the global map, append the result; else add a new key

      // globalMap
      // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
      ???  // placeholder: merge logic not written yet
    })
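
On the global-map idea: in Spark, a map defined on the driver and mutated inside a closure is serialized to each executor as a copy, so updates made there are not sent back to the driver (see "Understanding closures" in the Spark RDD Programming Guide). A more common approach is to key each record by file and let `reduceByKey` merge the count lists pairwise. The sketch below assumes file-keyed `List[Int]` values; `addCounts` and `simulateReduceByKey` are hypothetical names, and the local function only mimics what `reduceByKey` does within one batch so the merge logic can be checked without a cluster. Note that keying by `arr(0)` (the test name) would make `reduceByKey` combine per test rather than per file.

```scala
// Hypothetical merge function: element-wise sum of two count lists.
// zipAll pads the shorter list with 0, so lists of different lengths
// keep their trailing counts (plain zip would truncate them).
val addCounts: (List[Int], List[Int]) => List[Int] =
  (a, b) => a.zipAll(b, 0, 0).map { case (x, y) => x + y }

// Local stand-in for reduceByKey on a single batch, for checking the logic only.
// In Spark Streaming the rough equivalent would be:
//   inputStream.map(someFileKeyedParser).reduceByKey(addCounts)
def simulateReduceByKey(records: Seq[(String, List[Int])]): Map[String, List[Int]] =
  records.groupBy(_._1).map { case (file, rs) => file -> rs.map(_._2).reduce(addCounts) }
```

On a DStream, `reduceByKey` combines values within each batch interval only; accumulating totals across batches would need a stateful operation such as `updateStateByKey`, which requires checkpointing to be enabled.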

Thanks,
Vinti

On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
wrote:

> Hello Vinti,
>
> One way to get this done is to split your input line into a key and value
> tuple, and then you can simply use reduceByKey (or groupByKey) to combine
> the values the way you want. For example:
>
> Assuming you have already split the values into a 5-tuple:
> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>          .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>
> I hope that helps.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have input lines like the following:
>>
>> *Input*
>> t1, file1, 1, 1, 1
>> t1, file1, 1, 2, 3
>> t1, file2, 2, 2, 2, 2
>> t2, file1, 5, 5, 5
>> t2, file2, 1, 1, 2, 2
>>
>> I want to produce output like the rows below, where each entry is the
>> vertical (element-wise) sum of the corresponding numbers for that file.
>>
>> *Output*
>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>
>> I am working in a Spark Streaming context, and I am having a hard time
>> figuring out how to group by file name.
>>
>> It seems like I will need to use something like the line below, but I am
>> not sure of the correct syntax. Any input would be helpful.
>>
>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>
>> I know how to compute the vertical sum of a list of number arrays, but I am
>> not sure how to feed that function to the groupBy.
>>
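>>   import scala.collection.mutable.ArrayBuffer
>>
>>   // Column-wise sum: transpose the rows, then sum each column.
>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>       counts.toList.transpose.map(_.sum)
>>   }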
>>
>> ~Thanks,
>> Vinti
>>
>
>
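
To connect the original question (feeding `compute_counters` to a group-by) with the sample data, the sketch below runs the whole pipeline on plain Scala collections: parse each line, key by file, group, and apply the same transpose-and-sum to each group. The names `sampleLines`, `verticalSums` and `sumByFile` are hypothetical. The Spark Streaming counterpart would be roughly `myDStream.map(parse).groupByKey().mapValues(verticalSums)`, where `groupByKey` gives each file an `Iterable[List[Int]]`; `reduceByKey` with an element-wise sum gets the same result while combining values on each partition before the shuffle.

```scala
// Sample input from the original message.
val sampleLines = Seq(
  "t1, file1, 1, 1, 1",
  "t1, file1, 1, 2, 3",
  "t1, file2, 2, 2, 2, 2",
  "t2, file1, 5, 5, 5",
  "t2, file2, 1, 1, 2, 2"
)

// Same idea as compute_counters: transpose the rows, then sum each column.
// transpose assumes every row for a given file has the same length.
def verticalSums(counts: Iterable[List[Int]]): List[Int] =
  counts.toList.transpose.map(_.sum)

// Local stand-in for: stream.map(parse).groupByKey().mapValues(verticalSums)
def sumByFile(lines: Seq[String]): Map[String, List[Int]] =
  lines
    .map { line =>
      val fields = line.split(",").map(_.trim)
      (fields(1), fields.drop(2).map(_.toInt).toList)  // (file, counts)
    }
    .groupBy(_._1)
    .map { case (file, rows) => file -> verticalSums(rows.map(_._2)) }
```

Applied to `sampleLines`, `sumByFile` yields file1 -> [7, 8, 9] and file2 -> [3, 3, 4, 4], matching the expected output in the original message.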
