Yeah, I tried with reduceByKey and was able to do it. Thanks Ayan and Jatin. The core of the solution is the column-wise merge inside reduceByKey; a quick standalone check of that merge step is below, followed by the final solution for reference.
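A minimal sketch of the merge logic on its own, runnable in the Scala REPL (the two sample arrays here are made up):

    import scala.collection.mutable.ArrayBuffer

    // Column-wise addition of two count arrays, the same operation the
    // reduceByKey in the solution below performs for each pair of values.
    val first  = Array(1, 2, 3)
    val second = Array(5, 5, 5)
    val merged = ArrayBuffer(first, second).toList.transpose.map(_.sum).toArray
    // merged: Array(6, 7, 8)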
Final solution:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.ArrayBuffer

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(2))
      val inputStream = ssc.socketTextStream("hostname", 9999)
      inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1).trim, splitLines.slice(2, splitLines.length).map(_.trim.toInt))
        })
        .reduceByKey((first, second) => {
          // column-wise sum of the two count arrays
          val listOfArrays = ArrayBuffer(first, second)
          listOfArrays.toList.transpose.map(_.sum).toArray
        })
        .foreachRDD(rdd => rdd.foreach(Blaher.blah))
      // the job does not actually run until the context is started
      ssc.start()
      ssc.awaitTermination()
    }

Regards,
Vinti

On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <guha.a...@gmail.com> wrote:

> I believe the best way would be to use the reduceByKey operation.
>
> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> You will need to do a collect and update a global map if you want to.
>>
>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>   .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>   .foreachRDD(rdd => {
>>     rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
>>   })
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Never mind; it seems an executor-level mutable map is not recommended,
>>> as stated in
>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>
>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for your reply, Jatin. I changed my parsing logic to what you
>>>> suggested:
>>>>
>>>> def parseCoverageLine(str: String) = {
>>>>   val arr = str.split(",")
>>>>   ...
>>>>   ...
>>>>   (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
>>>> }
>>>>
>>>> Then, in the grouping, can I use a global hash map per executor /
>>>> partition to aggregate the results?
>>>>
>>>> val globalMap: Map[String, List[Int]] = Map()
>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>   // if the key exists in the global map, append the result; else add a new key
>>>>
>>>>   // globalMap
>>>>   // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>> })
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>>>> wrote:
>>>>
>>>>> Hello Vinti,
>>>>>
>>>>> One way to get this done is to split your input line into a key and
>>>>> value tuple and then simply use reduceByKey, combining the values the
>>>>> way you want. For example:
>>>>>
>>>>> Assuming you have already split the values into a 5-tuple:
>>>>>
>>>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>>>   .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>>>>
>>>>> I hope that helps.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Jatin Kumar | Rocket Scientist
>>>>> +91-7696741743 m
>>>>>
>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>> vinti.u...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have input lines like the ones below:
>>>>>>
>>>>>> *Input*
>>>>>> t1, file1, 1, 1, 1
>>>>>> t1, file1, 1, 2, 3
>>>>>> t1, file2, 2, 2, 2, 2
>>>>>> t2, file1, 5, 5, 5
>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>
>>>>>> and I want to produce the output below, which is a vertical
>>>>>> (column-wise) addition of the corresponding numbers:
>>>>>>
>>>>>> *Output*
>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>
>>>>>> I am in a Spark Streaming context, and I am having a hard time
>>>>>> figuring out how to group by file name.
>>>>>>
>>>>>> It seems like I will need to use something like the line below, but I
>>>>>> am not sure how to get the syntax right. Any input would be helpful.
>>>>>>
>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>
>>>>>> I know how to do the vertical sum of an array of given numbers, but I
>>>>>> am not sure how to feed that function to the grouping:
>>>>>>
>>>>>> def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>>>>>   counts.toList.transpose.map(_.sum)
>>>>>> }
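>>>>>>
>>>>>> For the sample input above, this helper can be sanity-checked by hand
>>>>>> (a hypothetical REPL session, assuming ArrayBuffer is imported):
>>>>>>
>>>>>> // file1 rows: (1,1,1), (1,2,3), (5,5,5)
>>>>>> compute_counters(ArrayBuffer(List(1, 1, 1), List(1, 2, 3), List(5, 5, 5)))
>>>>>> // res0: List[Int] = List(7, 8, 9)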
>>>>>>
>>>>>> ~Thanks,
>>>>>> Vinti
>
> --
> Best Regards,
> Ayan Guha