Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter first and then apply map.
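A minimal sketch of that filter-then-map shape, run on a local Scala collection rather than an RDD (plain collections support the same map/filter/flatMap combinators, so the pipeline reads the same); the sample line and field layout are taken from Sanjay's mail, the rest is illustrative:

```scala
// Local stand-in for the RDD[String]; the combinator chain below would
// look identical on an RDD.
val lines = Seq(
  "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
)

val pairs = lines
  .map(_.split(','))
  // filter FIRST, so every element that survives has the right shape...
  .filter(fields => fields.length >= 11 && !fields(0).contains("VAERS_ID"))
  // ...then map/flatten: emit one (id, symptom) pair per odd-indexed field
  .flatMap(fields => Seq(1, 3, 5, 7, 9).map(i => (fields(0), fields(i))))

pairs.foreach { case (id, symptom) => println(s"$id,$symptom") }
// 025126,Chills
// 025126,Injection site oedema
// ...
```

Because the filter runs before the map, the map body never needs an else branch, which is exactly what forced the RDD[Serializable] problem in the original code.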
On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik <kma...@adobe.com> wrote:
> Hi Sanjay,
>
> I tried running your code on the Spark shell piece by piece:
>
> // Setup
> val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
> val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
> val lines = Array[String](line1, line2)
>
> val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with
>
> val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
> val r3 = r2.map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
>   }
>   else {
>     "" // Returns a String, bad
>   }
> }) // RDD[Serializable] – PROBLEM
>
> I was not even able to apply flatMapValues, since the RDD passed to it is an RDD[Serializable] and not a pair RDD. I am surprised that your code compiled at all.
>
> The following changes to your snippet make it work as intended:
>
> reacRdd.map(line => line.split(','))
>   .map(fields => {
>     if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>       (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>     }
>     else {
>       ("", "")
>     }
>   })
>   .filter(pair => pair._1.length() > 0)
>   .flatMapValues(skus => skus.split('\t'))
>   .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> Please note that this still saves lines like (025126,Chills), i.e. with opening and closing brackets. If you want to get rid of them, better do another map operation to map each pair to a String.
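That last map Kapil suggests (to drop the tuple brackets from the saved output) might look like the sketch below; it is shown on a local collection standing in for the pair RDD, since saveAsTextFile writes each element via its toString:

```scala
// Stand-in for the (id, symptom) pair RDD after flatMapValues.
// A tuple's toString is "(025126,Chills)" – brackets included.
val pairs = Seq(("025126", "Chills"), ("025126", "Malaise"))

// Mapping each pair to a plain String yields "025126,Chills" instead,
// so the text file no longer carries the surrounding brackets.
val flatLines = pairs.map { case (id, symptom) => id + "," + symptom }

flatLines.foreach(println)
// 025126,Chills
// 025126,Malaise
```

On the real RDD this map would simply be chained before saveAsTextFile.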
>
> Kapil
>
> From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
> Sent: 31 December 2014 13:42
> Cc: user@spark.apache.org
> Subject: FlatMapValues
>
> hey guys
>
> My dataset is like this
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10
>
> Intended output is
> ==================
> 025126,Chills
> 025126,Injection site oedema
> 025126,Injection site reaction
> 025126,Malaise
> 025126,Myalgia
>
> My code is as follows, but the flatMapValues does not work even after I have created the pair RDD.
> ************************************************************************
> reacRdd.map(line => line.split(','))
>   .map(fields => {
>     if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>       (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>     }
>     else {
>       ""
>     }
>   })
>   .filter(line => line.toString.length() > 0)
>   .flatMapValues(skus => skus.split('\t'))
>   .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
> ************************************************************************
>
> thanks
>
> sanjay