>
> Yeah, I tried reduceByKey and it worked. Thanks, Ayan and Jatin.
> For reference, here is the final solution:
>
> def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("HBaseStream")
>     val sc = new SparkContext(conf)
>     // Create a StreamingContext, the main entry point for all streaming functionality
>     val ssc = new StreamingContext(sc, Seconds(2))
>     val inputStream = ssc.socketTextStream("hostname", 9999)
>     val parsedDstream = inputStream
>       .map(line => {
>         val splitLines = line.split(",")
>         // Key by file name (trimmed, since fields are separated by ", "); the rest are counts
>         (splitLines(1).trim, splitLines.slice(2, splitLines.length).map(_.trim.toInt))
>       })
>       .reduceByKey((first, second) => {
>         val listOfArrays = ArrayBuffer(first, second)
>         listOfArrays.toList.transpose.map(_.sum).toArray
>       })
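>       .foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
>     // Nothing is processed until the streaming context is started
>     ssc.start()
>     ssc.awaitTermination()
> }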
>
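For anyone reading along, the element-wise sum used inside reduceByKey can be tried without a cluster. Below is a minimal sketch in plain Scala, run against the sample lines from the original question. The names ColumnSumSketch, parse and sumColumns are made up for illustration, and groupBy plus reduce on a local Seq only stands in for what reduceByKey does per key on a DStream. It uses zipAll rather than transpose so that rows of different lengths are padded with zeros instead of failing, which is an assumption about the desired behavior.

```scala
object ColumnSumSketch {
  // Element-wise sum of two count arrays; zipAll pads the shorter one with 0.
  def sumColumns(a: Array[Int], b: Array[Int]): Array[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  // Parse "test, file, n1, n2, ..." into (file, counts).
  def parse(line: String): (String, Array[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt))
  }

  // Sample input copied from the original question.
  val input: Seq[String] = Seq(
    "t1, file1, 1, 1, 1",
    "t1, file1, 1, 2, 3",
    "t1, file2, 2, 2, 2, 2",
    "t2, file1, 5, 5, 5",
    "t2, file2, 1, 1, 2, 2"
  )

  // Local stand-in for reduceByKey: group rows by file, then fold each group.
  val totals: Map[String, Array[Int]] =
    input.map(parse).groupBy(_._1).map { case (file, rows) =>
      file -> rows.map(_._2).reduce(sumColumns)
    }

  def main(args: Array[String]): Unit =
    totals.toSeq.sortBy(_._1).foreach { case (file, counts) =>
      println(s"$file: ${counts.mkString("[", ", ", "]")}")
    }
  // file1: [7, 8, 9]
  // file2: [3, 3, 4, 4]
}
```

The same sumColumns function could be passed directly to reduceByKey, since it is associative and commutative.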
>
> Regards,
> Vinti
>
> On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> I believe the best way would be to use reduceByKey operation.
>>
>> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> If you want a global map, you will need to collect the results and update it on the driver.
>>>
>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>          .groupByKey()
>>>          .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
>>>          .foreachRDD(rdd => {
>>>            rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
>>>          })
>>>
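The "update global map" step above could look roughly like the following. This is only a sketch in plain Scala with no Spark dependency: DriverSideTotals, globalTotals, addCounts and mergeBatch are hypothetical names, and the counts are modelled as Array[Int] rather than the 3-tuples in the snippet above. It assumes each batch has already been reduced to one (file, counts) pair per file before being merged.

```scala
import scala.collection.mutable

object DriverSideTotals {
  // Running totals per file name, kept in a single JVM (hypothetical name).
  val globalTotals: mutable.Map[String, Array[Int]] = mutable.Map.empty

  // Element-wise sum; the shorter array is padded with zeros.
  def addCounts(a: Array[Int], b: Array[Int]): Array[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  // Merge one batch of (file, counts) pairs into the running totals.
  def mergeBatch(batch: Iterable[(String, Array[Int])]): Unit =
    batch.foreach { case (file, counts) =>
      val merged = globalTotals.get(file) match {
        case Some(existing) => addCounts(existing, counts)
        case None           => counts
      }
      globalTotals(file) = merged
    }
}
```

In a streaming job, something like mergeBatch(rdd.collect()) would be called inside foreachRDD, whose body runs on the driver. That only makes sense when each batch's reduced result is small enough to collect.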
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com
>>> > wrote:
>>>
>>>> Never mind, it seems an executor-level mutable map is not recommended,
>>>> as stated in
>>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>>
>>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com
>>>> > wrote:
>>>>
>>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>>> suggested:
>>>>>
>>>>>     def parseCoverageLine(str: String) = {
>>>>>       val arr = str.split(",")
>>>>>       ...
>>>>>       ...
>>>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>>>     }
>>>>>
>>>>> Then, in the grouping step, can I use a global hash map per executor /
>>>>> partition to aggregate the results?
>>>>>
>>>>> val globalMap: Map[String, List[Int]] = Map()
>>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>>     // if exists in global map, append result else add new key
>>>>>
>>>>>     // globalMap
>>>>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>>> })
>>>>>
>>>>> Thanks,
>>>>> Vinti
>>>>>
>>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com
>>>>> > wrote:
>>>>>
>>>>>> Hello Vinti,
>>>>>>
>>>>>> One way to do this is to split each input line into a (key, value)
>>>>>> tuple, then use groupByKey and combine the values however you want.
>>>>>> For example:
>>>>>>
>>>>>> Assuming you have already split the values into a 5-tuple:
>>>>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>>>>          .groupByKey()
>>>>>>          .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
>>>>>>
>>>>>> I hope that helps.
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Jatin Kumar | Rocket Scientist
>>>>>> +91-7696741743 m
>>>>>>
>>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>>> vinti.u...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have input lines like the ones below:
>>>>>>>
>>>>>>> *Input*
>>>>>>> t1, file1, 1, 1, 1
>>>>>>> t1, file1, 1, 2, 3
>>>>>>> t1, file2, 2, 2, 2, 2
>>>>>>> t2, file1, 5, 5, 5
>>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>>
>>>>>>> and I want to produce output like the rows below, which is a vertical
>>>>>>> (column-wise) sum of the corresponding numbers, grouped by file name.
>>>>>>>
>>>>>>> *Output*
>>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>>
>>>>>>> I am working in a Spark Streaming context and am having a hard time
>>>>>>> figuring out how to group by file name.
>>>>>>>
>>>>>>> It seems I will need something like the line below, but I am not sure
>>>>>>> of the correct syntax. Any input would be helpful.
>>>>>>>
>>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>>
>>>>>>> I know how to compute the vertical sum of a given array of numbers,
>>>>>>> but I am not sure how to feed that function to the group by.
>>>>>>>
>>>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>>>       counts.toList.transpose.map(_.sum)
>>>>>>>   }
>>>>>>>
>>>>>>> ~Thanks,
>>>>>>> Vinti
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
