Hey Kapil, Fernando,

Thanks for your mail.

[1] Fernando, if I don't use "if" logic inside the "map", then input lines that have fewer fields than I am expecting throw an ArrayIndexOutOfBoundsException. So the "if" is there to safeguard against that.

[2] Kapil, I am sorry I did not clarify. Yes, my code DID NOT compile; the compiler says that flatMapValues is not defined. In fact, when I used your snippet, the code still does not compile:

Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]
    }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
                                            ^

My pom.xml looks like this:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

[3] To summarize, all I want is the following conversion.

SUMMARY
=======
When a dataset looks like the following:

1,red,blue,green
2,yellow,violet,pink

I want to output the following, and I am currently not able to:

1,red
1,blue
1,green
2,yellow
2,violet
2,pink

thanks
regards
sanjay

From: Fernando O. <fot...@gmail.com>
To: Kapil Malik <kma...@adobe.com>
Cc: Sanjay Subramanian <sanjaysubraman...@yahoo.com>; "user@spark.apache.org" <user@spark.apache.org>
Sent: Wednesday, December 31, 2014 6:06 AM
Subject: Re: FlatMapValues

Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter and then apply map.

On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik <kma...@adobe.com> wrote:

Hi Sanjay,

I tried running your code on spark shell piece by piece:

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]]: so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  } else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable]: PROBLEM

I was not even able to apply flatMapValues, since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised that your code compiled at all.

The following changes in your snippet make it work as intended:

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ("","")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets ( and ). If you want to get rid of them, better do another map operation to map pair to String.

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.

************************************************************************
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
************************************************************************

thanks

sanjay
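
For reference, here is a minimal sketch of the conversion asked for in this thread, written against plain Scala collections so it can be checked without a Spark cluster. The object name ReactionPairs and the helper explode are hypothetical names chosen for illustration and do not appear in the thread. The field indices and the length and header checks are copied from the snippets above. It emits "id,reaction" strings directly, so no placeholder pair and no surrounding brackets are produced.

```scala
object ReactionPairs {
  // Hypothetical helper: turns one input line into zero or more "id,reaction" lines.
  // Lines that are too short, or that carry the VAERS_ID header, yield nothing,
  // which replaces the if/else placeholder plus the later filter.
  def explode(line: String): Seq[String] = {
    val fields = line.split(',')
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      // Same positions as in the thread: fields 1, 3, 5, 7 and 9 hold the reactions.
      Seq(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i))
    else
      Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10",
      "VAERS_ID,too,short" // malformed line: skipped instead of throwing
    )
    // A Seq stands in for the RDD[String] here.
    lines.flatMap(explode).foreach(println)
  }
}
```

Assuming reacRdd is an RDD[String] as in the thread, the same helper could presumably be passed to the ordinary flatMap, as in reacRdd.flatMap(ReactionPairs.explode), which sidesteps flatMapValues altogether. Separately, in a compiled Spark 1.2 project the pair-RDD methods such as flatMapValues are supplied by an implicit conversion that is brought into scope with import org.apache.spark.SparkContext._ (the Spark shell imports it automatically). A missing import is therefore one plausible cause of the "not a member of RDD[(String, String)]" error, though that is an assumption, since the thread does not show the file's imports.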