Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Yes, I tried reduceByKey and was able to do it. Thanks, Ayan and Jatin.
For reference, the final solution:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // Create a StreamingContext, the main entry point for all streaming
  // functionality.
  val ssc = new StreamingContext(sc, Seconds(2))
  // The port argument is missing in the archived message.
  val inputStream = ssc.socketTextStream("hostname", )
  val parsedDstream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      // Key by file name (field 1); the remaining fields are the counts.
      (splitLines(1).trim, splitLines.slice(2, splitLines.length).map(_.trim.toInt))
    })
    .reduceByKey((first, second) => {
      // Column-wise sum of the two count arrays for the same key.
      val listOfArrays = ArrayBuffer(first, second)
      listOfArrays.toList.transpose.map(_.sum).toArray
    })
    .foreachRDD(rdd => rdd.foreach(Blaher.blah))
}


Regards,
Vinti
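
The reduce step in the solution above can be checked without a cluster. What follows is only a minimal sketch on local Scala collections, assuming the sample input from the original question; the names ColumnSumCheck, sumColumns, parse and totals are illustrative and do not come from the thread. It uses zip instead of ArrayBuffer/transpose, and groupBy plus reduce stands in for reduceByKey.

```scala
// Plain-Scala check of the per-key, column-wise sum; no Spark required.
object ColumnSumCheck {
  // Same contract as the function passed to reduceByKey: combine two
  // count arrays for one key into one array of element-wise sums.
  def sumColumns(first: Array[Int], second: Array[Int]): Array[Int] = {
    require(first.length == second.length, "rows for one key must have equal length")
    first.zip(second).map { case (a, b) => a + b }
  }

  // Mirrors the map step: key by the file name, keep the counts.
  def parse(line: String): (String, Array[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt))
  }

  // Local stand-in for map + reduceByKey.
  def totals(lines: Seq[String]): Map[String, Array[Int]] =
    lines.map(line => parse(line))
      .groupBy(_._1)
      .map { case (file, rows) =>
        file -> rows.map(_._2).reduce((a, b) => sumColumns(a, b))
      }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "t1, file1, 1, 1, 1",
      "t1, file1, 1, 2, 3",
      "t1, file2, 2, 2, 2, 2",
      "t2, file1, 5, 5, 5",
      "t2, file2, 1, 1, 2, 2"
    )
    totals(lines).toSeq.sortBy(_._1).foreach { case (file, sums) =>
      println(s"$file: ${sums.mkString("[", ", ", "]")}")
    }
    // prints:
    // file1: [7, 8, 9]
    // file2: [3, 3, 4, 4]
  }
}
```

In the streaming job itself, nothing is processed until ssc.start() is called, normally followed by ssc.awaitTermination().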


Re: Stream group by

2016-02-21 Thread ayan guha
I believe the best way would be to use the reduceByKey operation.

-- 
Best Regards,
Ayan Guha


Re: Stream group by

2016-02-21 Thread Jatin Kumar
If you want a global map, you will need to collect the results and update
the map on the driver.

myDStream.map(record => (record._2, (record._3, record._4, record._5)))
  .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
  .foreachRDD(rdd => {
    rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
  })

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m
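
The "update global map" step above is left as a placeholder in the message. One possible shape for it, as a sketch only: the object GlobalTotals and the method mergeBatch are hypothetical names, and the values are assumed to be count arrays (as in the original question) rather than the 3-tuples used in the snippet. The map lives on the driver and is updated once per batch with the already-reduced pairs, for example the result of rdd.collect().

```scala
import scala.collection.mutable

object GlobalTotals {
  // Driver-side running totals, keyed by file name.
  val totals: mutable.Map[String, Array[Int]] = mutable.Map.empty

  // Merge one batch of already-reduced (file, counts) pairs into the totals.
  // Note: zip truncates to the shorter array if the lengths differ.
  def mergeBatch(batch: Seq[(String, Array[Int])]): Unit =
    batch.foreach { case (file, counts) =>
      val merged = totals.get(file) match {
        case Some(existing) => existing.zip(counts).map { case (a, b) => a + b }
        case None           => counts.clone()
      }
      totals.update(file, merged)
    }
}
```

Because collect() brings every record of the batch to the driver, this only suits results that are small after the per-key reduction.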


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Never mind, it seems that an executor-level mutable map is not recommended,
as stated in
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Thanks for your reply, Jatin. I changed my parsing logic to what you
suggested:

def parseCoverageLine(str: String) = {
  val arr = str.split(",")
  ...
  ...
  (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
}

Then, in the grouping, can I use a global hash map per executor / partition
to aggregate the results?

val globalMap: Map[String, List[Int]] = Map()
val coverageDStream = inputStream.map(parseCoverageLine)
coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
  // if the key exists in the global map, append the result; else add a new key

  // globalMap
  // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
})

Thanks,
Vinti
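
A map referenced from inside the reduceByKey closure is serialized and shipped with each task, so every task would update its own copy and the driver's map would stay unchanged. If the goal is a running total across batches, Spark Streaming's updateStateByKey keeps per-key state instead; it takes an update function of the shape (Seq[V], Option[S]) => Option[S]. The sketch below shows only such a function in plain Scala, assuming count arrays as values; RunningTotals and updateTotals are hypothetical names, not from the thread.

```scala
object RunningTotals {
  // Receives the values seen for one key in the current batch plus the
  // previous state for that key (if any), and returns the new state.
  def updateTotals(newRows: Seq[Array[Int]],
                   state: Option[Array[Int]]): Option[Array[Int]] = {
    val all = state.toSeq ++ newRows
    if (all.isEmpty) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }
}
```

On a DStream of (file, counts) pairs this could be passed as updateStateByKey(RunningTotals.updateTotals _). Stateful operations of this kind require a checkpoint directory to be set with ssc.checkpoint(...).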


Re: Stream group by

2016-02-21 Thread Jatin Kumar
Hello Vinti,

One way to do this is to split each input line into a (key, value) tuple;
you can then simply use reduceByKey and combine the values the way you
want. For example:

Assuming you have already split the values into a 5-tuple:
myDStream.map(record => (record._2, (record._3, record._4, record._5)))
  .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))

I hope that helps.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari 
wrote:

> Hello,
>
> I have input lines like the following:
>
> *Input*
> t1, file1, 1, 1, 1
> t1, file1, 1, 2, 3
> t1, file2, 2, 2, 2, 2
> t2, file1, 5, 5, 5
> t2, file2, 1, 1, 2, 2
>
> and I want to produce output like the rows below, which is a vertical
> (column-wise) addition of the corresponding numbers for each file.
>
> *Output*
> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>
> I am in a Spark Streaming context, and I am having a hard time figuring
> out how to group by file name.
>
> It seems I will need to use something like the line below, but I am not
> sure how to get the syntax right. Any input would be helpful.
>
> myDStream.foreachRDD(rdd => rdd.groupBy())
>
> I know how to do the vertical sum of a given array of numbers, but I am not
> sure how to feed that function to the group by.
>
>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>     counts.toList.transpose.map(_.sum)
>   }
>
> ~Thanks,
> Vinti
>
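
The helper in the question can be fed to a group-by directly. A minimal sketch on local Scala collections, assuming the rows are already parsed into (file, counts) pairs; GroupByFile, computeCounters and sumByFile are illustrative names, not from the thread. On an RDD the grouping step would be rdd.groupBy(_._1) or groupByKey, although reduceByKey is usually preferable because it combines values within each partition before the shuffle.

```scala
import scala.collection.mutable.ArrayBuffer

object GroupByFile {
  // The helper from the question: column-wise sums of equal-length rows.
  // transpose throws if the rows have different lengths.
  def computeCounters(counts: ArrayBuffer[List[Int]]): List[Int] =
    counts.toList.transpose.map(_.sum)

  // Group parsed rows by file name, then hand each group to the helper.
  def sumByFile(rows: Seq[(String, List[Int])]): Map[String, List[Int]] =
    rows.groupBy(_._1).map { case (file, group) =>
      file -> computeCounters(ArrayBuffer(group.map(_._2): _*))
    }
}
```

With the sample input from the question, sumByFile yields List(7, 8, 9) for file1 and List(3, 3, 4, 4) for file2.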