Hi Sanjay, I tried running your code in the Spark shell piece by piece:
// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)
val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with
val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] - so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  } else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] - PROBLEM

I was not even able to apply flatMapValues, since the RDD passed to it is an RDD[Serializable], not a pair RDD. The two branches of the if/else return different types, a (String, String) tuple and a String, so Scala infers their closest common supertype, Serializable, and flatMapValues is only available on RDDs of key/value pairs. I am surprised your code compiled at all.

The following changes to your snippet make it work as intended. The else branch now returns an empty pair ("", "") instead of "", so both branches are pairs, and the filter checks the key:

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this still saves lines like (025126,Chills), i.e. wrapped in parentheses, because saveAsTextFile writes each element's toString. If you want to get rid of them, add another map that turns each pair into a String such as "025126,Chills" before saving.
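For illustration, here is a minimal sketch of the same pipeline on plain Scala collections instead of RDDs (an assumption made so it runs without Spark; the object name, the helpers parse and toLines, and the header row are hypothetical). Rather than the ("", "") placeholder plus filter, the parsing step returns an Option, so both branches share the type Option[(String, String)] and flatMap discards the rejected rows. The last map turns each pair into an "id,reaction" string, so no parentheses appear in the output.

```scala
object FlatMapValuesSketch {
  // Sample row from the thread, plus a made-up header row and a short row
  // (both hypothetical) to exercise the two rejection conditions.
  val sample: Seq[String] = Seq(
    "VAERS_ID,R1,V1,R2,V2,R3,V3,R4,V4,R5,V5",
    "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10",
    "025128,Chills,8.10"
  )

  // Hypothetical helper. Both branches have type Option[(String, String)],
  // so the element type stays a pair. In the original if/else the branches
  // were a tuple and a String, which only share supertypes such as
  // Serializable, so the pair type was lost.
  def parse(line: String): Option[(String, String)] = {
    val fields = line.split(',')
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      Some((fields(0), Seq(fields(1), fields(3), fields(5), fields(7), fields(9)).mkString("\t")))
    else
      None
  }

  // Hypothetical helper mirroring the Spark chain on a local Seq:
  // flatMap(parse) replaces map + filter, the second flatMap does what
  // flatMapValues does for each pair, and the last map formats each line.
  def toLines(lines: Seq[String]): Seq[String] =
    lines
      .flatMap(line => parse(line))
      .flatMap { case (id, reactions) => reactions.split('\t').toSeq.map(r => (id, r)) }
      .map { case (id, reaction) => s"$id,$reaction" }

  def main(args: Array[String]): Unit =
    toLines(sample).foreach(println)
}
```

On an RDD the same shape should carry over as reacRdd.flatMap(line => parse(line)).flatMapValues(skus => skus.split('\t')), because the flatMap result is again an RDD of pairs, followed by the same formatting map before saveAsTextFile.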
Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
************************************************************************
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  } else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
************************************************************************

thanks
sanjay
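The parsing step in the snippet above reads reactions from the fixed positions 1, 3, 5, 7 and 9, and the fields.length >= 11 check discards any row with fewer than five reaction/version pairs. Note that split also drops trailing empty strings, so a row whose last slots are empty ends up shorter than 11 fields too. If some rows can carry fewer pairs or empty slots (an assumption; the sample rows in this thread all have five), a variant that walks the fields two at a time avoids the fixed indices. This is a minimal sketch in plain Scala; the object and method names are hypothetical.

```scala
object ReactionPairs {
  // Hypothetical variant of the parsing step: emit (id, reaction) for every
  // reaction/version pair after the ID instead of reading fixed indices.
  def reactions(line: String): Seq[(String, String)] = {
    val fields = line.split(',').toSeq
    // Skip the header row, and guard against a line that split into nothing.
    if (fields.isEmpty || fields.head.contains("VAERS_ID")) Seq.empty
    else {
      val id = fields.head
      fields.tail
        .grouped(2)               // (reaction, version) groups; an odd trailing field forms a group of one
        .map(group => group.head) // keep the reaction name, drop the version
        .filter(_.nonEmpty)       // skip empty reaction slots
        .map(reaction => (id, reaction))
        .toList
    }
  }

  def main(args: Array[String]): Unit =
    reactions("025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10")
      .foreach { case (id, reaction) => println(s"$id,$reaction") }
}
```

On an RDD[String], reacRdd.flatMap(line => reactions(line)) should yield the (id, reaction) pairs directly, with no need for the tab-joined intermediate value or flatMapValues.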