Never mind. It seems an executor-level mutable map is not recommended, as
explained in
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
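
For reference, a minimal sketch of the aggregation without any shared mutable
state, written against plain Scala collections so it runs without Spark. The
names VerticalSum, parseLine, sumCounts and aggregate are hypothetical, the
input format is taken from the sample lines quoted below, and padding shorter
rows with zeros is an assumption of this sketch:

```scala
// Hypothetical helper object; not part of Spark or of the code quoted below.
object VerticalSum {
  // Parse "t1, file1, 1, 1, 1" into (file, counts). The test id is dropped
  // because the desired output is keyed by file name only.
  def parseLine(line: String): (String, List[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt).toList)
  }

  // Element-wise ("vertical") sum of two count lists.
  // Assumption: if the lengths differ, the shorter list is padded with zeros.
  def sumCounts(a: List[Int], b: List[Int]): List[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  // Group the parsed lines by file name and fold each group with sumCounts.
  def aggregate(lines: Seq[String]): Map[String, List[Int]] =
    lines.map(parseLine)
      .groupBy(_._1)
      .map { case (file, pairs) => file -> pairs.map(_._2).reduce(sumCounts) }
}
```

On a DStream the same two functions should plug in as
inputStream.map(VerticalSum.parseLine).reduceByKey(VerticalSum.sumCounts),
which aggregates within each batch. Keeping totals across batches would need a
stateful operation such as updateStateByKey instead of a global map.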

On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Thanks for your reply Jatin. I changed my parsing logic to what you
> suggested:
>
>     def parseCoverageLine(str: String) = {
>       val arr = str.split(",")
>       ...
>       ...
>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>     }
>
> Then, in the grouping step, can I use a global hash map per executor /
> partition to aggregate the results?
>
> val globalMap = scala.collection.mutable.Map[String, List[Int]]()
> val coverageDStream = inputStream.map(parseCoverageLine)
>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>     // if exists in global map, append result else add new key
>
>     // globalMap
>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
> })
>
> Thanks,
> Vinti
>
> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Hello Vinti,
>>
>> One way to get this done is you split your input line into key and value
>> tuple and then you can simply use groupByKey and handle the values the way
>> you want. For example:
>>
>> Assuming you have already split the values into a 5 tuple:
>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>          .groupByKey()
>>          .mapValues(_.reduce((r1, r2) =>
>>            (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
>>
>> I hope that helps.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have input lines like below
>>>
>>> *Input*
>>> t1, file1, 1, 1, 1
>>> t1, file1, 1, 2, 3
>>> t1, file2, 2, 2, 2, 2
>>> t2, file1, 5, 5, 5
>>> t2, file2, 1, 1, 2, 2
>>>
>>> and I want to produce the output below, which is the element-wise
>>> (vertical) sum of the corresponding numbers for each file.
>>>
>>> *Output*
>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>
>>> I am in a Spark Streaming context and I am having a hard time trying to
>>> figure out the way to group by file name.
>>>
>>> It seems like I will need to use something like the code below, but I am
>>> not sure how to get the correct syntax. Any input would be helpful.
>>>
>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>
>>> I know how to do the vertical sum of a collection of number lists, but I
>>> am not sure how to feed that function to the group by.
>>>
>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>       counts.toList.transpose.map(_.sum)
>>>   }
>>>
>>> ~Thanks,
>>> Vinti
>>>
>>
>>
>
