Re: FlatMapValues
cool let me adapt that. thanks a ton

regards
sanjay

From: Sean Owen
To: Sanjay Subramanian
Cc: "user@spark.apache.org"
Sent: Monday, January 5, 2015 3:19 AM
Subject: Re: FlatMapValues
Re: FlatMapValues
For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient.

On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen wrote:
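Sean's suggestion can be tried out with plain Scala collections standing in for the RDD, since List#flatMap and grouped behave the same way here (the sample rows are made up):

```scala
// Plain-Scala sketch of the flatMap approach: split each line, keep the first
// field as the id, group the remaining fields into (value, version) pairs,
// and emit one (id, value) tuple per pair.
val input = List(
  "025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10",
  "025005,Arthritis,8.10,Injection site oedema,8.10"
)

val pairs = input.flatMap { line =>
  val tokens = line.split(',')
  val id = tokens(0)
  val valueVersionPairs = tokens.tail.grouped(2) // Iterator[Array[String]]
  valueVersionPairs.map(pair => (id, pair(0)))   // keep only the value
}

pairs.foreach { case (id, value) => println(s"$id,$value") }
```

On an RDD the shape is identical; only the List becomes the RDD[String] loaded from the file.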
Re: FlatMapValues
OK, this is how I solved it. Not elegant at all, but it works and I need to move ahead at this time. Converting to a pair RDD is now not required.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
    (fields(0)+","+fields(1)+"\t"+
     fields(0)+","+fields(3)+"\t"+
     fields(0)+","+fields(5)+"\t"+
     fields(0)+","+fields(7)+"\t"+
     fields(0)+","+fields(9))
  }
  else {
    ""
  }
}).flatMap(str => str.split('\t'))
  .filter(line => line.toString.length() > 0)
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

From: Sanjay Subramanian
To: Hitesh Khamesra
Cc: Kapil Malik ; Sean Owen ; "user@spark.apache.org"
Sent: Thursday, January 1, 2015 12:39 PM
Subject: Re: FlatMapValues
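The string-concatenation workaround can be sketched with plain Scala collections standing in for the RDD. Toy rows are used here, and since they carry no interleaved version columns, tail/mkString stands in for the hand-written concatenation of fields 1, 3, 5, 7 and 9:

```scala
// Each good row becomes one tab-separated string of "id,value" entries,
// which is then split apart; bad rows become "" and are filtered out.
val rows = List("1,red,blue,green", "2,yellow,violet,pink", "bad")

val out = rows.map(_.split(','))
  .map { fields =>
    if (fields.length >= 4) fields.tail.map(v => fields(0) + "," + v).mkString("\t")
    else ""
  }
  .flatMap(_.split('\t'))
  .filter(_.nonEmpty)

out.foreach(println)
```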
Re: FlatMapValues
thanks let me try that out

From: Hitesh Khamesra
To: Sanjay Subramanian
Cc: Kapil Malik ; Sean Owen ; "user@spark.apache.org"
Sent: Thursday, January 1, 2015 9:46 AM
Subject: Re: FlatMapValues
Re: FlatMapValues
How about this: apply flatMap per line, and in that function parse each line and return all the columns as per your need.

On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian <sanjaysubraman...@yahoo.com.invalid> wrote:
Re: FlatMapValues
hey guys

Some of u may care :-) but this is just to give u a background on where I am going with this. I have an iOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using Hadoop, and currently the search is based on Lucene. I am re-architecting the data layer by replacing Hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data.

@Kapil, sorry, but flatMapValues is being reported as undefined.

To give u a complete picture of the code (it's inside IntelliJ but that's only for testing; the real code runs on spark-shell on my cluster):

https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume the dataset is

025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

then with this present version of the code, the flatMap works but only gives me values

Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is

025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks

sanjay

From: Kapil Malik
To: Sean Owen ; Sanjay Subramanian
Cc: "user@spark.apache.org"
Sent: Wednesday, December 31, 2014 9:35 AM
Subject: RE: FlatMapValues
RE: FlatMapValues
Hi Sanjay,

Oh yes .. on flatMapValues: it's defined in PairRDDFunctions, and you need to import org.apache.spark.SparkContext._ to use it
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions)

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues
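Since flatMapValues lives in PairRDDFunctions, a "not a member of RDD[(String, String)]" compile error usually just means the implicit conversion is not in scope (in Spark 1.x, import org.apache.spark.SparkContext._). Its behaviour can be mimicked on a plain List of pairs, as a sketch with made-up data:

```scala
// flatMapValues(f) is equivalent to flatMap { case (k, v) => f(v).map(k -> _) }:
// the key is kept, and each element produced from the value becomes its own pair.
val pairRdd = List(
  ("025003", "Delirium\tHypokinesia\tHypotonia"),
  ("025005", "Arthritis\tInjection site oedema")
)

val exploded = pairRdd.flatMap { case (id, skus) =>
  skus.split('\t').map(sku => (id, sku))
}

exploded.foreach(println)
```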
Re: FlatMapValues
From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 value2 value3] so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play.

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian wrote:
> My understanding is as follows
>
> STEP 1 (This would create a pair RDD)
> ===
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> })
>
> STEP 2
> ===
> Since the previous step created a pair RDD, I thought the flatMapValues method would
> be applicable.
> But the code does not even compile, saying that flatMapValues is not
> applicable to RDD :-(
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> }).flatMapValues(skus =>
>   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> SUMMARY
> ===
> When a dataset looks like the following
>
> 1,red,blue,green
> 2,yellow,violet,pink
>
> I want to output the following, and I am asking how do I do that? Perhaps my
> code is 100% wrong. Please correct me and educate me :-)
>
> 1,red
> 1,blue
> 1,green
> 2,yellow
> 2,violet
> 2,pink
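The type problem Sean describes can be reproduced with plain Scala collections too: the two branch types widen to their least upper bound, so the element type is no longer a tuple. A minimal sketch with toy data (the same inference applies to the RDD's element type):

```scala
// Mixed branches: one returns (String, String), the other String, so the
// compiler infers the common supertype (Serializable, as reported in the
// thread) -- no pair-specific methods are available on such a collection.
val mixed = List("1,red", "bad").map { s =>
  val fields = s.split(',')
  if (fields.length >= 2) (fields(0), fields(1))
  else ""
}

// Returning a tuple in both branches keeps the element type (String, String),
// and the placeholder empty-key pairs can then be filtered out.
val pairsOnly = List("1,red", "bad").map { s =>
  val fields = s.split(',')
  if (fields.length >= 2) (fields(0), fields(1))
  else ("", "")
}.filter(_._1.nonEmpty)
```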
Re: FlatMapValues
Hey Kapil, Fernando

Thanks for your mail.

[1] Fernando, if I don't use an "if" logic inside the "map", then if I have lines of input data with fewer fields than I am expecting, I get an ArrayIndexOutOfBoundsException. So the "if" is to safeguard against that.

[2] Kapil, I am sorry I did not clarify. Yes, my code DID NOT compile, saying that flatMapValues is not defined. In fact, when I used your snippet, the code still does not compile:

Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]
    }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

My pom.xml looks like this

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

[3] To summarize, all I want is this: when a dataset looks like the following

1,red,blue,green
2,yellow,violet,pink

I want to output the following, and am currently not able to:

1,red
1,blue
1,green
2,yellow
2,violet
2,pink

thanks

regards
sanjay

From: Fernando O.
To: Kapil Malik
Cc: Sanjay Subramanian ; "user@spark.apache.org"
Sent: Wednesday, December 31, 2014 6:06 AM
Subject: Re: FlatMapValues
Re: FlatMapValues
Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter and then apply map.

On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik wrote:
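Fernando's filter-then-map ordering can be sketched the same way with plain Scala collections and toy rows (in Spark these would be the corresponding RDD transformations):

```scala
// Filter out short rows and the header row first; the map then needs no if at all.
val rows = List("1,red,blue,green", "VAERS_ID,x,y,z", "short")

val cleaned = rows.map(_.split(','))
  .filter(f => f.length >= 4 && !f(0).contains("VAERS_ID"))
  .map(f => (f(0), f.tail.mkString("\t")))
```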
RE: FlatMapValues
Hi Sanjay,

I tried running your code on the spark shell piece by piece:

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]], so far so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable], PROBLEM

I was not even able to apply flatMapValues, since the RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised that your code compiled at all.

The following changes to your snippet make it work as intended:

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets. If you want to get rid of them, better do another map operation to map each pair to a String.
Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows, but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks

sanjay
Re: FlatMapValues
Why don't you push "\n" instead of "\t" in your first transformation
[(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))]
and then do saveAsTextFile?

-Raghavendra

On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian wrote:
> hey guys
>
> My dataset is like this
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10
>
> Intended output is
> ==
> 025126,Chills
> 025126,Injection site oedema
> 025126,Injection site reaction
> 025126,Malaise
> 025126,Myalgia
>
> My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
>   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> thanks
>
> sanjay
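Raghavendra's idea, joining the exploded values with "\n" so saveAsTextFile would write one pair per physical line, can be sketched without Spark on a plain string; the class and method names are illustrative:

```java
public class NewlineJoin {
    // Turn one "id,term1,score1,term2,score2,..." line into a block
    // "id,term1\nid,term2\n...", keeping the terms and dropping the scores.
    public static String explode(String line) {
        String[] tokens = line.split(",");
        String id = tokens[0];
        StringBuilder out = new StringBuilder();
        for (int i = 1; i < tokens.length; i += 2) {
            if (out.length() > 0) out.append('\n');
            out.append(id).append(',').append(tokens[i]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(explode("025126,Chills,8.10,Malaise,8.10"));
    }
}
```

When such a block is written out as text, each embedded "\n" starts a new line, which is the effect Raghavendra is after.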
FlatMapValues
hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks

sanjay
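The transformation being asked for (filter out short or header lines first, then emit one "id,term" record per term) can be sketched on plain Java collections; everything here is an illustrative stand-in for the RDD pipeline:

```java
import java.util.ArrayList;
import java.util.List;

public class ExplodeDataset {
    // For each valid "id,term1,score1,...,term5,score5" line (at least 11
    // fields, not the VAERS_ID header), emit one "id,term" string per term.
    public static List<String> explode(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            // Filter first, instead of returning "" from inside a map.
            if (fields.length < 11 || fields[0].contains("VAERS_ID")) {
                continue;
            }
            // Terms sit at the odd indices 1, 3, 5, 7, 9.
            for (int i = 1; i <= 9; i += 2) {
                out.add(fields[0] + "," + fields[i]);
            }
        }
        return out;
    }
}
```

The filter-then-explode ordering is what keeps the element type uniform, which is the crux of the compile error discussed above.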
Re: Type problem in Java when using flatMapValues
Damn, you're right, I wasn't looking at it properly. I was confused by IntelliJ, I guess. Many thanks!

On 2014-10-02 19:02, Sean Owen wrote:

Eh, is it not that you are mapping the values of an RDD whose keys are Tuple2<String, String>s, but expecting the keys to be Strings? That's about what the compiler is saying too.

On Thu, Oct 2, 2014 at 4:15 PM, Robin Keunen wrote:

Hi all,

I successfully implemented my algorithm in Scala but my team wants it in Java. I have a problem with generics, can anyone help me?

I have a first JavaPairRDD with a structure like ((ean, key), [from, to, value])

ean and key are Strings
from and to are DateTimes
value is a Double

JavaPairRDD<Tuple2<String, String>, List<Object>> eanKeyTsParameters = javaRDD.mapToPair( ... );

Then I try to do flatMapValues to apply the GenerateTimeSeries function; it takes the from, to and value to generate a List<Tuple2<Long, Double>>. Here is the error I get when compiling:

error: incompatible types: no instance(s) of type variable(s) U exist so that JavaPairRDD<Tuple2<String, String>, U> conforms to JavaPairRDD<String, Tuple2<Long, Double>>

Here is what IntelliJ tells me:

flatMapValues( Function<List<Object>, Iterable<U>> ) in JavaPairRDD cannot be applied to Transformations.GenerateTimeSeries

Here is the problematic transformation:

JavaPairRDD<String, Tuple2<Long, Double>> keyLongDouble =
    eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());

And here is the Function:

import org.apache.spark.api.java.function.Function;
[...]

public class Transformations {
  public static class GenerateTimeSeries
      implements Function<List<Object>, Iterable<Tuple2<Long, Double>>> {

    @Override
    public Iterable<Tuple2<Long, Double>> call(List<Object> args) {
      DateTime start = (DateTime) args.get(0);
      DateTime end = (DateTime) args.get(1);
      Double value = (Double) args.get(2);
      int granularity = 24*60*60*1000; // 1 day

      return AggregationUtils.createTimeSeries(start, end, value, granularity);
    }
  }
}

Any idea?
Thanks

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Type problem in Java when using flatMapValues
Eh, is it not that you are mapping the values of an RDD whose keys are Tuple2<String, String>s, but expecting the keys to be Strings? That's about what the compiler is saying too.

On Thu, Oct 2, 2014 at 4:15 PM, Robin Keunen wrote:
> Hi all,
>
> I successfully implemented my algorithm in Scala but my team wants it in Java. I have a problem with generics, can anyone help me?
>
> I have a first JavaPairRDD with a structure like ((ean, key), [from, to, value])
>
> ean and key are Strings
> from and to are DateTimes
> value is a Double
>
> JavaPairRDD<Tuple2<String, String>, List<Object>> eanKeyTsParameters =
>     javaRDD.mapToPair( ... );
>
> Then I try to do flatMapValues to apply the GenerateTimeSeries function; it takes the from, to and value to generate a List<Tuple2<Long, Double>>. Here is the error I get when compiling:
>
> error: incompatible types: no instance(s) of type variable(s) U exist so that JavaPairRDD<Tuple2<String, String>, U> conforms to JavaPairRDD<String, Tuple2<Long, Double>>
>
> Here is what IntelliJ tells me:
> flatMapValues( Function<List<Object>, Iterable<U>> ) in JavaPairRDD cannot be applied to Transformations.GenerateTimeSeries
>
> Here is the problematic transformation:
>
> JavaPairRDD<String, Tuple2<Long, Double>> keyLongDouble =
>     eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());
>
> And here is the Function:
>
> import org.apache.spark.api.java.function.Function;
> [...]
>
> public class Transformations {
>   public static class GenerateTimeSeries
>       implements Function<List<Object>, Iterable<Tuple2<Long, Double>>> {
>
>     @Override
>     public Iterable<Tuple2<Long, Double>> call(List<Object> args) {
>       DateTime start = (DateTime) args.get(0);
>       DateTime end = (DateTime) args.get(1);
>       Double value = (Double) args.get(2);
>       int granularity = 24*60*60*1000; // 1 day
>
>       return AggregationUtils.createTimeSeries(start, end, value, granularity);
>     }
>   }
> }
>
> Any idea?
>
> Thanks
>
> --
> Robin Keunen
> Software Engineer
> robin.keu...@lampiris.be
> www.lampiris.be
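Sean's diagnosis, that flatMapValues only transforms values and therefore cannot change the key type, can be illustrated with a toy plain-Java version of the operation; this is a stand-in sketch, not Spark's API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FlatMapValuesSketch {
    // A toy flatMapValues: each pair's key passes through untouched, so
    // the result's key type is exactly the input's key type K.
    public static <K, V, U> List<Map.Entry<K, U>> flatMapValues(
            List<Map.Entry<K, V>> pairs, Function<V, Iterable<U>> f) {
        List<Map.Entry<K, U>> out = new ArrayList<>();
        for (Map.Entry<K, V> p : pairs) {
            for (U u : f.apply(p.getValue())) {
                out.add(new SimpleEntry<>(p.getKey(), u));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys are (ean, key) pairs, mirroring the ((ean, key), values) RDD;
        // a result declared with plain String keys could never typecheck.
        List<Map.Entry<Map.Entry<String, String>, List<Long>>> input = List.of(
                new SimpleEntry<>(new SimpleEntry<>("ean1", "k1"), List.of(1L, 2L)));
        List<Map.Entry<Map.Entry<String, String>, Long>> result =
                flatMapValues(input, vs -> vs);
        System.out.println(result.size()); // 2
    }
}
```

The fix in the thread follows directly: declare keyLongDouble with the same key type as eanKeyTsParameters.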
Type problem in Java when using flatMapValues
Hi all,

I successfully implemented my algorithm in Scala but my team wants it in Java. I have a problem with generics, can anyone help me?

I have a first JavaPairRDD with a structure like ((ean, key), [from, to, value])

* ean and key are Strings
* from and to are DateTimes
* value is a Double

JavaPairRDD<Tuple2<String, String>, List<Object>> eanKeyTsParameters =
    javaRDD.mapToPair( ... );

Then I try to do flatMapValues to apply the GenerateTimeSeries function; it takes the from, to and value to generate a List<Tuple2<Long, Double>>. Here is the error I get when compiling:

error: incompatible types: no instance(s) of type variable(s) U exist so that JavaPairRDD<Tuple2<String, String>, U> conforms to JavaPairRDD<String, Tuple2<Long, Double>>

Here is what IntelliJ tells me:

flatMapValues( Function<List<Object>, Iterable<U>> ) in JavaPairRDD cannot be applied to Transformations.GenerateTimeSeries

Here is the problematic transformation:

JavaPairRDD<String, Tuple2<Long, Double>> keyLongDouble =
    eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());

And here is the Function:

import org.apache.spark.api.java.function.Function;
[...]

public class Transformations {
  public static class GenerateTimeSeries
      implements Function<List<Object>, Iterable<Tuple2<Long, Double>>> {

    @Override
    public Iterable<Tuple2<Long, Double>> call(List<Object> args) {
      DateTime start = (DateTime) args.get(0);
      DateTime end = (DateTime) args.get(1);
      Double value = (Double) args.get(2);
      int granularity = 24*60*60*1000; // 1 day

      return AggregationUtils.createTimeSeries(start, end, value, granularity);
    }
  }
}

Any idea?

Thanks

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be