Hi Sanjay,

I tried running your code in the Spark shell piece by piece:

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]]: so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    // Returns a pair (String, String), good
    (fields(0), (fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9)))
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable]: PROBLEM, the common supertype of a pair and a String

I was not even able to apply flatMapValues, because the filtered RDD passed to it
is an RDD[Serializable], not a pair RDD. I am surprised your code compiled at
all.
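For illustration, here is a minimal local sketch (plain Scala collections, no Spark) of one way to keep both branches the same type: return an Option and flatMap over it, so bad lines simply disappear instead of needing a placeholder. The helper name parse and the header line are hypothetical; this is a swapped-in technique, not what your snippet does.

```scala
object ParseLine {
  // Hypothetical helper (not Spark API). Both branches return
  // Option[(String, String)], so the compiler does not widen to Serializable.
  def parse(line: String): Option[(String, String)] = {
    val fields = line.split(',')
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      Some((fields(0), Seq(1, 3, 5, 7, 9).map(i => fields(i)).mkString("\t")))
    else
      None // header or short line: dropped by flatMap below
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "VAERS_ID,SYMPTOM1,SYMPTOMVERSION1", // hypothetical header line
      "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
    )
    // This type annotation only compiles because every element really is a pair.
    val pairs: Seq[(String, String)] = lines.flatMap(line => parse(line))
    pairs.foreach(println)
  }
}
```

In the Spark shell, the same idea should give a pair RDD directly, e.g. reacRdd.flatMap(line => parse(line)), with no separate filter step.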


The following changes to your snippet make it work as intended:


reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9)))
  }
  else {
    ("", "") // An empty pair, so both branches are (String, String)
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
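To see what the filter and flatMapValues steps do to the pairs, here is a local stand-in written against plain Scala collections so it runs without a cluster. flatMapValuesLocal is a hypothetical helper, not part of Spark; it is only meant to mirror PairRDDFunctions.flatMapValues, which keeps each key and emits one pair per element returned for its value.

```scala
object FlatMapValuesDemo {
  // Hypothetical local equivalent of flatMapValues: for each (key, value),
  // emit (key, u) for every u in f(value); the key is left unchanged.
  def flatMapValuesLocal[K, V, U](pairs: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
    pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

  def main(args: Array[String]): Unit = {
    val mapped = Seq(
      ("025126", "Chills\tInjection site oedema\tInjection site reaction\tMalaise\tMyalgia"),
      ("025127", "Chills\tInjection site oedema\tInjection site reaction\tMalaise\tMyalgia"),
      ("", "") // what the else branch produces for a header or short line
    )
    // Same predicate as the corrected snippet: drop the empty placeholder pairs.
    val filtered = mapped.filter(pair => pair._1.length() > 0)
    // Prints ten lines such as (025126,Chills)
    flatMapValuesLocal(filtered)(skus => skus.split('\t').toSeq).foreach(println)
  }
}
```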

Please note that this still saves lines like (025126,Chills), i.e. wrapped in
parentheses. If you want to get rid of them, add another map operation that
converts each pair to a String before saving.
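A minimal sketch of that extra map step, on local data: toCsvLine is a hypothetical helper that formats a pair as id,reaction, matching the intended output lines.

```scala
object FormatPairs {
  // Hypothetical helper: format an (id, reaction) pair as "id,reaction"
  // so the saved text has no tuple parentheses.
  def toCsvLine(pair: (String, String)): String = pair match {
    case (id, reaction) => id + "," + reaction
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(("025126", "Chills"), ("025126", "Injection site oedema"))
    pairs.map(toCsvLine).foreach(println)
  }
}
```

In the Spark snippet, the equivalent should be inserting .map { case (id, reaction) => id + "," + reaction } between flatMapValues and saveAsTextFile.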

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset looks like this:

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia


My code is as follows, but flatMapValues does not work even after I have
created the pair RDD.

************************************************************************

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

************************************************************************

thanks

sanjay
