Re: FlatMapValues
For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient.

On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen so...@cloudera.com wrote:

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples. Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out. However it's still not quite what you want. Your input is basically [key value1 value2 value3], so you want to flatMap that to (key,value1) (key,value2) (key,value3). flatMapValues does not come into play.

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com wrote:

My understanding is as follows.

STEP 1 (This would create a pair RDD)
===

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ""
  }
})

STEP 2
===

Since the previous step created a pair RDD, I thought the flatMapValues method would be applicable. But the code does not even compile, saying that flatMapValues is not applicable to RDD :-(

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ""
  }
}).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

SUMMARY
===

When a dataset looks like the following

1,red,blue,green
2,yellow,violet,pink

I want to output the following and I am asking how do I do that? Perhaps my code is 100% wrong.
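Sean's suggested parse can be sketched on plain Scala collections (a List standing in for the RDD; map/flatMap behave the same way here). Each line is "id,key1,val1,key2,val2,...", so after dropping the id we take the first element of every pair of tokens:

```scala
// Sketch of the suggested parse, using a List in place of the RDD.
val inputLines = List(
  "1,red,8.10,blue,8.10,green,8.10",
  "2,yellow,8.10,violet,8.10,pink,8.10"
)

val pairs = inputLines.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2) // Iterator over Array(key, value)
  val keys = keyValuePairs.map(_(0))         // keep only the key of each pair
  keys.map(key => (id, key))                 // re-attach the line's id
}
// pairs: List((1,red), (1,blue), (1,green), (2,yellow), (2,violet), (2,pink))
```

Note that one pass over each line produces all the output pairs directly, which is why this avoids the intermediate pair RDD entirely.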
Please correct me and educate me :-)

1,red
1,blue
1,green
2,yellow
2,violet
2,pink

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: FlatMapValues
cool let me adapt that. thanks a ton
regards
sanjay

From: Sean Owen so...@cloudera.com
To: Sanjay Subramanian sanjaysubraman...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, January 5, 2015 3:19 AM
Subject: Re: FlatMapValues

For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient.
Re: FlatMapValues
OK this is how I solved it. Not elegant at all, but it works and I need to move ahead at this time. Converting to a pair RDD is now not required.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
    (fields(0)+","+fields(1)+"\t"+fields(0)+","+fields(3)+"\t"+fields(0)+","+fields(5)+"\t"+fields(0)+","+fields(7)+"\t"+fields(0)+","+fields(9))
  } else {
    ""
  }
}).flatMap(str => str.split('\t')).filter(line => line.toString.length() > 0).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

From: Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID
To: Hitesh Khamesra hiteshk...@gmail.com
Cc: Kapil Malik kma...@adobe.com; Sean Owen so...@cloudera.com; user@spark.apache.org
Sent: Thursday, January 1, 2015 12:39 PM
Subject: Re: FlatMapValues

thanks let me try that out

From: Hitesh Khamesra hiteshk...@gmail.com
To: Sanjay Subramanian sanjaysubraman...@yahoo.com
Cc: Kapil Malik kma...@adobe.com; Sean Owen so...@cloudera.com; user@spark.apache.org
Sent: Thursday, January 1, 2015 9:46 AM
Subject: Re: FlatMapValues

How about this: apply flatMap per line, and in that function parse each line and return all the columns as per your need.

On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.invalid wrote:

hey guys

Some of u may care :-) but this is just to give u a background on where I am going with this. I have an iOS medical side effects app called MedicalSideFx. I built the entire underlying data layer aggregation using Hadoop, and currently the search is based on Lucene. I am re-architecting the data layer by replacing Hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines sidefx data.

@Kapil, sorry but flatMapValues is being reported as undefined. To give u a complete picture of the code (it's inside IntelliJ but that's only for testing... the real code runs on spark-shell on my cluster):
https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume the dataset as

025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

then in this present version of the code, the flatMap works but only gives me values

Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is

025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks
sanjay
Re: FlatMapValues
How about this: apply flatMap per line, and in that function parse each line and return all the columns as per your need.
Re: FlatMapValues
thanks let me try that out

From: Hitesh Khamesra hiteshk...@gmail.com
To: Sanjay Subramanian sanjaysubraman...@yahoo.com
Cc: Kapil Malik kma...@adobe.com; Sean Owen so...@cloudera.com; user@spark.apache.org
Sent: Thursday, January 1, 2015 9:46 AM
Subject: Re: FlatMapValues

How about this: apply flatMap per line, and in that function parse each line and return all the columns as per your need.
FlatMapValues
hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks
sanjay
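For reference, the intended expansion can be reproduced on plain Scala collections (a List standing in for the RDD; field positions as in the question, with term names at odd indices 1, 3, 5, 7, 9):

```scala
// Minimal sketch of the intended output, using a List in place of the RDD.
val reacLines = List(
  "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
)

val output = reacLines
  .map(_.split(','))
  // guard against short lines and the header row
  .filter(fields => fields.length >= 11 && !fields(0).contains("VAERS_ID"))
  // emit "id,term" for each of the odd-indexed term fields
  .flatMap(fields => List(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i)))
// output: List("025126,Chills", "025126,Injection site oedema", ...)
```

Filtering first keeps every branch of the pipeline at one type, which is exactly what the flatMapValues version above fails to do.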
Re: FlatMapValues
Why don't you push "\n" instead of "\t" in your first transformation [(fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))] and then do saveAsTextFile?

-Raghavendra
RE: FlatMapValues
Hi Sanjay,

I tried running your code on spark shell piece by piece -

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2)
// r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(','))
// RDD[Array[String]] - so far, so good

val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
    // Returns a pair (String, String), good
  } else {
    ""
    // Returns a String, bad
  }
})
// RDD[Serializable] - PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly. The following changes in your snippet make it work as intended -

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets "(" and ")". If you want to get rid of them, better do another map operation to map the pair to a String.

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues
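Kapil's pattern (return an empty pair for bad lines so both branches have type (String, String), filter the empty keys out, then split the value) can be mimicked on plain Scala collections; here the final flatMap plays the role of flatMapValues:

```scala
// Sketch on plain collections; a List stands in for the RDD.
val lines = List(
  "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10",
  "VAERS_ID,some,header,row"
)

val expanded = lines
  .map(_.split(','))
  .map { fields =>
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      (fields(0), fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9))
    else
      ("", "") // sentinel keeps the element type at (String, String)
  }
  .filter(pair => pair._1.length > 0) // drop the sentinels
  .flatMap { case (id, skus) => skus.split('\t').map(sku => (id, sku)) }
// expanded: List((025126,Chills), (025126,Injection site oedema), ...)
```

The sentinel-plus-filter step is what restores a single element type; without it the map produces a mix of pairs and strings, so the collection is typed too loosely for any pair operation.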
Re: FlatMapValues
Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter and then apply map.
Re: FlatMapValues
Hey Kapil, Fernando

Thanks for your mail.

[1] Fernando, if I don't use an if logic inside the map, then for lines of input data that have fewer fields than I am expecting, I get an ArrayIndexOutOfBounds exception. So the if is to safeguard against that.

[2] Kapil, I am sorry I did not clarify. Yes, my code DID NOT compile, saying that flatMapValues is not defined. In fact when I used your snippet, the code still does not compile:

Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]
    }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
                                            ^

My pom.xml looks like this

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

[3] To summarize, all I want is the following conversion.

SUMMARY
===
when a dataset looks like the following

1,red,blue,green
2,yellow,violet,pink

I want to output the following and am currently not able to:

1,red
1,blue
1,green
2,yellow
2,violet
2,pink

thanks
regards
sanjay

From: Fernando O. fot...@gmail.com
To: Kapil Malik kma...@adobe.com
Cc: Sanjay Subramanian sanjaysubraman...@yahoo.com; user@spark.apache.org
Sent: Wednesday, December 31, 2014 6:06 AM
Subject: Re: FlatMapValues

Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter and then apply map.
RE: FlatMapValues
Hi Sanjay,

Oh yes .. on flatMapValues: it's defined in PairRDDFunctions, and you need to import org.apache.spark.SparkContext._ to use them
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions)

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues
Re: Type problem in Java when using flatMapValues
Damn, you're right, I wasn't looking at it properly. I was confused by IntelliJ I guess. Many thanks!

On 2014-10-02 19:02, Sean Owen wrote:

Eh, is it not that you are mapping the values of an RDD whose keys are Tuple2<String,String>s, but expecting the keys are Strings? That's also about what the compiler is saying too.

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
Type problem in Java when using flatMapValues
Hi all,

I successfully implemented my algorithm in Scala but my team wants it in Java. I have a problem with Generics, can anyone help me?

I have a first JavaPairRDD with a structure like ((ean, key), [from, to, value])

* ean and key are String
* from and to are DateTime
* value is a Double

JavaPairRDD<Tuple2<String, String>, List<Serializable>> eanKeyTsParameters = javaRDD.mapToPair( ... );

Then I try to do flatMapValues to apply the GenerateTimeSeries Function; it takes the from, to and value to generate a List<Tuple2<Long, Double>>.

Here is the error I get when compiling:

error: incompatible types: no instance(s) of type variable(s) U exist so that JavaPairRDD<Tuple2<String,String>,U> conforms to JavaPairRDD<String,Tuple2<Long,Double>>

Here is what IntelliJ tells me:

flatMapValues( Function<List<Serializable>, Iterable<U>> ) in JavaPairRDD cannot be applied to Transformations.GenerateTimeSeries

Here is the problematic transformation:

JavaPairRDD<String, Tuple2<Long, Double>> keyLongDouble =
    eanKeyTsParameters.flatMapValues(new Transformations.GenerateTimeSeries());

And here is the Function:

import org.apache.spark.api.java.function.Function;
[...]
public class Transformations {
  public static class GenerateTimeSeries
      implements Function<List<Serializable>, Iterable<Tuple2<Long, Double>>> {
    @Override
    public Iterable<Tuple2<Long, Double>> call(List<Serializable> args) {
      DateTime start = (DateTime) args.get(0);
      DateTime end   = (DateTime) args.get(1);
      Double value   = (Double) args.get(2);
      int granularity = 24*60*60*1000; // 1 day
      return AggregationUtils.createTimeSeries(start, end, value, granularity);
    }
  }
}

Any idea?

Thanks

--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
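Since the algorithm already works in Scala, the types involved can be sketched at the collections level. Note the names here are hypothetical stand-ins: createTimeSeries mimics AggregationUtils.createTimeSeries (one point per day), and java.time.LocalDate stands in for DateTime. The point is that a flatMapValues-style expansion preserves the key type, so the result is keyed by (String, String), not by String, which is exactly what the Java declaration was contradicting:

```scala
import java.time.LocalDate

// Hypothetical stand-in for AggregationUtils.createTimeSeries:
// one (epochDay, value) point per day between start and end, inclusive.
def createTimeSeries(start: LocalDate, end: LocalDate, value: Double): List[(Long, Double)] =
  Iterator.iterate(start)(_.plusDays(1))
    .takeWhile(!_.isAfter(end))
    .map(d => (d.toEpochDay, value))
    .toList

// ((ean, key), (from, to, value)) -- keys are (String, String) pairs.
val eanKeyTsParameters: List[((String, String), (LocalDate, LocalDate, Double))] =
  List((("ean1", "k1"), (LocalDate.of(2014, 10, 1), LocalDate.of(2014, 10, 3), 2.5)))

// flatMapValues-style expansion: the (String, String) key is carried through
// unchanged, so the result is keyed by (String, String), NOT by String.
val keyLongDouble: List[((String, String), (Long, Double))] =
  eanKeyTsParameters.flatMap { case (k, (from, to, v)) =>
    createTimeSeries(from, to, v).map(point => (k, point))
  }
```

In the Java version the fix is the same: declare the result as JavaPairRDD keyed by the Tuple2 key type, not by String.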