Yeah, I tried reduceByKey and was able to get it working. Thanks, Ayan and Jatin.
For reference, the final solution:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // Create a StreamingContext, the main entry point for all streaming functionality.
    val ssc = new StreamingContext(sc, Seconds(2))
    val inputStream = ssc.socketTextStream("hostname", 9999)
    inputStream
      .map(line => {
        // Input looks like "t1, file1, 1, 2, 3": key on the file name, keep the counts.
        val splitLines = line.split(",").map(_.trim)
        (splitLines(1), splitLines.drop(2).map(_.toInt))
      })
      .reduceByKey((first, second) => {
        // Element-wise sum; transpose assumes both arrays have the same length.
        List(first, second).transpose.map(_.sum).toArray
      })
      .foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Nothing runs until the streaming context is started.
    ssc.start()
    ssc.awaitTermination()
}
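
For anyone who wants to check the reduction without a cluster, below is a minimal plain-Scala sketch of the same per-key, element-wise sum, run on the sample input from the original question. The names VerticalSumSketch, parseLine and sumColumns are made up for illustration and are not part of the job above. It also swaps transpose for zipAll, which pads the shorter array with zeros; that padding is an assumption about how rows of unequal length should be handled, not something settled in this thread.

```scala
// Plain Scala, no Spark required. All names here are hypothetical.
object VerticalSumSketch {
  // "t1, file1, 1, 2, 3" -> ("file1", Array(1, 2, 3)); the test id in column 0 is dropped.
  def parseLine(line: String): (String, Array[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt))
  }

  // Element-wise sum of two count arrays.
  // Assumption: a missing position counts as 0 (zipAll pads the shorter array).
  def sumColumns(a: Array[Int], b: Array[Int]): Array[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "t1, file1, 1, 1, 1",
      "t1, file1, 1, 2, 3",
      "t1, file2, 2, 2, 2, 2",
      "t2, file1, 5, 5, 5",
      "t2, file2, 1, 1, 2, 2"
    )

    // Local stand-in for reduceByKey: group by file name, then reduce each group.
    val totals: Map[String, Array[Int]] =
      input.map(parseLine)
        .groupBy(_._1)
        .map { case (file, rows) =>
          file -> rows.map(_._2).reduce((a, b) => sumColumns(a, b))
        }

    totals.toSeq.sortBy(_._1).foreach { case (file, sums) =>
      println(s"$file: ${sums.mkString("[", ", ", "]")}")
    }
    // prints:
    // file1: [7, 8, 9]
    // file2: [3, 3, 4, 4]
  }
}
```

In the streaming job, a function like sumColumns could be passed to reduceByKey in place of the transpose-based lambda, since it is associative and commutative.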


Regards,
Vinti

On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <guha.a...@gmail.com> wrote:

> I believe the best way would be to use the reduceByKey operation.
>
> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> You will need to do a collect and update a global map if you want to.
>>
>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>          .groupByKey()
>>          .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
>>          .foreachRDD(rdd => {
>>            rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
>>          })
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Never mind, it seems an executor-level mutable map is not recommended,
>>> as stated in
>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>
>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>> suggested:
>>>>
>>>>     def parseCoverageLine(str: String) = {
>>>>       val arr = str.split(",")
>>>>       ...
>>>>       ...
>>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>>     }
>>>>
>>>> Then, in the grouping, can I use a global hash map per executor /
>>>> partition to aggregate the results?
>>>>
>>>> val globalMap: Map[String, List[Int]] = Map()
>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>     // if exists in global map, append result else add new key
>>>>
>>>>     // globalMap
>>>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>> })
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>>>> wrote:
>>>>
>>>>> Hello Vinti,
>>>>>
>>>>> One way to do this is to split each input line into a (key, value)
>>>>> tuple and then use groupByKey, handling the values however you want.
>>>>> For example:
>>>>>
>>>>> Assuming you have already split the values into a 5-tuple:
>>>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>>>          .groupByKey()
>>>>>          .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
>>>>>
>>>>> I hope that helps.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Jatin Kumar | Rocket Scientist
>>>>> +91-7696741743 m
>>>>>
>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>> vinti.u...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have input lines like the ones below:
>>>>>>
>>>>>> *Input*
>>>>>> t1, file1, 1, 1, 1
>>>>>> t1, file1, 1, 2, 3
>>>>>> t1, file2, 2, 2, 2, 2
>>>>>> t2, file1, 5, 5, 5
>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>
>>>>>> and I want to produce output like the rows below, which is a vertical
>>>>>> (element-wise) addition of the corresponding numbers.
>>>>>>
>>>>>> *Output*
>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>
>>>>>> I am in a Spark Streaming context and I am having a hard time
>>>>>> figuring out how to group by file name.
>>>>>>
>>>>>> It seems like I will need to use something like the line below, but I
>>>>>> am not sure of the correct syntax. Any input will be helpful.
>>>>>>
>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>
>>>>>> I know how to do the vertical sum of a given array of numbers, but I am
>>>>>> not sure how to feed that function to the group by.
>>>>>>
>>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>>       counts.toList.transpose.map(_.sum)
>>>>>>   }
>>>>>>
>>>>>> ~Thanks,
>>>>>> Vinti
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
