Re: Stream group by
Yeah, I tried reduceByKey and was able to do it. Thanks, Ayan and Jatin.

For reference, the final solution:

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(2))
      val inputStream = ssc.socketTextStream("hostname", )
      val parsedDstream = inputStream
        .map(line => {
          // key: file name (second field); value: the remaining fields as Int counts
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
        })
        .reduceByKey((first, second) => {
          // element-wise sum of the two count arrays
          val listOfArrays = ArrayBuffer(first, second)
          listOfArrays.toList.transpose.map(_.sum).toArray
        })
        .foreachRDD(rdd => rdd.foreach(Blaher.blah))
    }

Regards,
Vinti

On Sun, Feb 21, 2016 at 2:22 PM, ayan guha wrote:
> I believe the best way would be to use the reduceByKey operation.
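The reduce step in the final solution can be tried without a cluster. The following is only a sketch using plain Scala collections, not Spark: `VerticalSum`, `sumColumns`, `parse`, and `reduceBatch` are hypothetical names, `groupBy` followed by `reduce` stands in for `reduceByKey` on a single batch, and the file name is trimmed here (the posted code does not trim it).

```scala
object VerticalSum {
  // Element-wise ("vertical") sum of two equally long count arrays.
  // It has the same (V, V) => V shape as the function passed to reduceByKey.
  def sumColumns(first: Array[Int], second: Array[Int]): Array[Int] = {
    require(first.length == second.length, "count arrays must have the same length")
    first.zip(second).map { case (a, b) => a + b }
  }

  // Parse "test, file, n1, n2, ..." into (file, counts).
  // Assumption: fields are trimmed, unlike the posted map step.
  def parse(line: String): (String, Array[Int]) = {
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt))
  }

  // Local stand-in for reduceByKey on one batch: group by key, then reduce each group.
  def reduceBatch(lines: Seq[String]): Map[String, List[Int]] =
    lines.map(parse)
      .groupBy(_._1)
      .map { case (file, pairs) =>
        file -> pairs.map(_._2).reduce((a, b) => sumColumns(a, b)).toList
      }
}
```

Applied to the five sample lines from the original question, `VerticalSum.reduceBatch` gives 7, 8, 9 for file1 and 3, 3, 4, 4 for file2. Because the function is associative and commutative, the order in which values are combined does not affect the result.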
Re: Stream group by
I believe the best way would be to use the reduceByKey operation.

On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <jku...@rocketfuelinc.com.invalid> wrote:
> You will need to do a collect and update a global map if you want to.
>
>     myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>       .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>       .foreachRDD(rdd => {
>         rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
>       })

--
Best Regards,
Ayan Guha
Re: Stream group by
You will need to do a collect and update a global map if you want to.

    myDStream.map(record => (record._2, (record._3, record._4, record._5)))
      .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
      .foreachRDD(rdd => {
        // collect() brings this batch's results back to the driver
        rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
      })

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari wrote:
> Never mind, it seems an executor-level mutable map is not recommended, as
> stated in
> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
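The "update global map" step in this reply is left as a placeholder. One possible shape for it is sketched below under stated assumptions: `RunningTotals`, `mergeBatch`, and `snapshot` are hypothetical names, the map lives on the driver only, and each batch has already been reduced to one entry per file before it is merged.

```scala
import scala.collection.mutable

// Driver-side running totals, keyed by file name (hypothetical helper).
final class RunningTotals {
  private val totals = mutable.Map.empty[String, Vector[Int]]

  // Merge one batch of already-reduced (file, counts) pairs into the totals.
  // In the streaming job this would receive the result of rdd.collect().
  // Assumption: counts for a given file always have the same length;
  // zip would silently truncate to the shorter vector otherwise.
  def mergeBatch(batch: Seq[(String, Vector[Int])]): Unit =
    batch.foreach { case (file, counts) =>
      val updated = totals.get(file) match {
        case Some(previous) => previous.zip(counts).map { case (a, b) => a + b }
        case None           => counts
      }
      totals.update(file, updated)
    }

  // Immutable copy of the current totals.
  def snapshot: Map[String, Vector[Int]] = totals.toMap
}
```

Since `collect()` moves every record of the batch to the driver, this only scales when the number of distinct files per batch is small.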
Re: Stream group by
Never mind, it seems an executor-level mutable map is not recommended, as stated in
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/

On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari wrote:
> Thanks for your reply, Jatin. I changed my parsing logic to what you
> suggested:
>
>     def parseCoverageLine(str: String) = {
>       val arr = str.split(",")
>       ...
>       ...
>       (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
>     }
>
> Then, in the grouping, can I use a global hash map per executor / partition
> to aggregate the results?
>
>     val globalMap: Map[String, List[Int]] = Map()
>     val coverageDStream = inputStream.map(parseCoverageLine)
>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>       // if exists in global map, append result else add new key
>
>       // globalMap
>       // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>     })
>
> Thanks,
> Vinti
Re: Stream group by
Thanks for your reply, Jatin. I changed my parsing logic to what you suggested:

    def parseCoverageLine(str: String) = {
      val arr = str.split(",")
      ...
      ...
      (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
    }

Then, in the grouping, can I use a global hash map per executor / partition to aggregate the results?

    val globalMap: Map[String, List[Int]] = Map()
    val coverageDStream = inputStream.map(parseCoverageLine)
    coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
      // if exists in global map, append result else add new key

      // globalMap
      // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
    })

Thanks,
Vinti

On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar wrote:
> Hello Vinti,
>
> One way to get this done is to split each input line into a key and value
> tuple; then you can simply use groupByKey and handle the values the way
> you want. For example:
>
> Assuming you have already split the values into a 5-tuple:
>
>     myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>       .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>
> I hope that helps.
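The parsing function in this message keys each record by the test name and folds the file name into the value list, whereas the goal stated in the thread is to aggregate per file. A sketch of one alternative follows; `CoverageRecord`, `CoverageParser`, and `toPair` are hypothetical names, and returning `None` for malformed lines is an assumption, not something the thread specifies.

```scala
import scala.util.Try

// Hypothetical record type; the name and fields are illustrative only.
final case class CoverageRecord(test: String, file: String, counts: List[Int])

object CoverageParser {
  // Parse "test, file, n1, n2, ..." into a typed record.
  // Returns None for lines with too few fields or non-numeric counts.
  def parse(line: String): Option[CoverageRecord] = {
    val fields = line.split(",").map(_.trim).toList
    fields match {
      case test :: file :: rest if rest.nonEmpty =>
        Try(rest.map(_.toInt)).toOption.map(CoverageRecord(test, file, _))
      case _ => None
    }
  }

  // Key by file name so that a later per-key reduction aggregates per file, not per test.
  def toPair(record: CoverageRecord): (String, List[Int]) =
    (record.file, record.counts)
}
```

In a stream, `flatMap` over `CoverageParser.parse` would drop malformed lines instead of failing the batch, since an `Option` can be flattened like a collection.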
Re: Stream group by
Hello Vinti,

One way to get this done is to split each input line into a key and value tuple; then you can simply use groupByKey and handle the values the way you want. For example:

Assuming you have already split the values into a 5-tuple:

    myDStream.map(record => (record._2, (record._3, record._4, record._5)))
      .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))

I hope that helps.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari wrote:
> Hello,
>
> I have input lines like the ones below:
>
> *Input*
> t1, file1, 1, 1, 1
> t1, file1, 1, 2, 3
> t1, file2, 2, 2, 2, 2
> t2, file1, 5, 5, 5
> t2, file2, 1, 1, 2, 2
>
> I want to produce output like the rows below, which are a vertical
> (element-wise) addition of the corresponding numbers:
>
> *Output*
> "file1" : [ 1+1+5, 1+2+5, 1+3+5 ]
> "file2" : [ 2+1, 2+1, 2+2, 2+2 ]
>
> I am in a Spark Streaming context and I am having a hard time figuring out
> how to group by file name.
>
> It seems I will need to use something like the line below, but I am not
> sure how to get the syntax right. Any input will be helpful.
>
>     myDStream.foreachRDD(rdd => rdd.groupBy())
>
> I know how to do the vertical sum of an array of given numbers, but I am
> not sure how to feed that function to the group by.
>
>     def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>       counts.toList.transpose.map(_.sum)
>     }
>
> ~Thanks,
> Vinti
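One caveat on the snippet in this reply: groupByKey on a pair DStream does not accept a combining function (its overloads take no argument, a partition count, or a partitioner), so a two-argument function like the one shown belongs with reduceByKey, or with a reduction applied to each group afterwards. The difference can be sketched with plain Scala collections rather than Spark; `GroupThenReduce`, `add`, `groupThenReduce`, and `reduceDirectly` are hypothetical names.

```scala
object GroupThenReduce {
  type Counts = (Int, Int, Int)

  // The combining function from the reply: component-wise sum of two 3-tuples.
  def add(r1: Counts, r2: Counts): Counts =
    (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)

  // groupByKey style: collect every value for a key first, then reduce each group.
  def groupThenReduce(pairs: Seq[(String, Counts)]): Map[String, Counts] =
    pairs.groupBy(_._1).map { case (key, group) =>
      key -> group.map(_._2).reduce((a, b) => add(a, b))
    }

  // reduceByKey style: fold each value into a running result per key,
  // without ever materializing the full group.
  def reduceDirectly(pairs: Seq[(String, Counts)]): Map[String, Counts] =
    pairs.foldLeft(Map.empty[String, Counts]) { case (acc, (key, value)) =>
      acc.updated(key, acc.get(key).map(add(_, value)).getOrElse(value))
    }
}
```

Both functions return the same map. The second form mirrors why reduceByKey is usually preferred in Spark: values can be combined within each partition before the shuffle, so less data moves across the network.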