Hi Sanjay,

Putting an if inside a map sounds like a bad idea; it seems like you actually
want to filter first and then apply map.
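
For instance, a filter-then-map version of the snippet from the mail below might look like this (just a sketch, reusing the reacRdd and outFile names from Sanjay's code):

```scala
// Sketch only: assumes reacRdd is the original RDD[String] and outFile
// is the output name, as in the snippet quoted below.
reacRdd
  .map(line => line.split(','))
  .filter(fields => fields.length >= 11 && !fields(0).contains("VAERS_ID"))
  // columns 1,3,5,7,9 hold the reaction terms; emit one "id,reaction" line each
  .flatMap(fields => Seq(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i)))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
```

Because no branch changes the element type, every intermediate RDD keeps a precise type, and there is no need for the tab join/split round trip at all.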

On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik <kma...@adobe.com> wrote:

>  Hi Sanjay,
>
>
>
> I tried running your code on spark shell piece by piece –
>
>
>
> // Setup
>
> val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
>
> val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
>
> val lines = Array[String](line1, line2)
>
>
>
> val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to
> begin with
>
>
>
> val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far,
> so good
>
> val r3 = r2.map(fields => {
>
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
> // Returns a pair (String, String), good
>
>   }
>
>   else {
>
>     "" // Returns a String, bad
>
>   }
>
>   }) // RDD[Serializable] – PROBLEM
>
>
>
> I was not even able to apply flatMapValues since the filtered RDD passed
> to it is RDD[Serializable] and not a pair RDD. I am surprised how your code
> compiled correctly.
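>
> (That it compiles is actually expected Scala behaviour: when the two
> branches of an if/else return different types, the compiler infers their
> least upper bound. Here (String, String) and String share only
> Serializable, hence RDD[Serializable]. A minimal illustration, with a
> hypothetical parse helper, no Spark needed:

```scala
// The branches return (String, String) and String, so the compiler
// infers their common supertype for the whole expression -- on Scala 2.x
// that is java.io.Serializable, not (String, String).
def parse(ok: Boolean) =
  if (ok) ("025126", "Chills") else ""
```

> The same inference happens inside the map closure, which is why the
> result is not a pair RDD and flatMapValues is unavailable.)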
>
>
>
>
>
> The following changes in your snippet make it work as intended -
>
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ("", "")
>   }
> }).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
>   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
>
>
> Please note that this still saves lines like (025126,Chills), i.e. with
> opening and closing brackets, because saveAsTextFile uses the tuple's
> toString. If you want to get rid of them, better do another map operation
> to turn each pair into a String before saving.
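>
> (That last map could be sketched like this, continuing the corrected
> pipeline above with the same names:

```scala
// Continuing the pipeline above: turn each (id, reaction) pair into a
// plain "id,reaction" line before saving, so no brackets are written.
  .flatMapValues(skus => skus.split('\t'))
  .map { case (id, reaction) => id + "," + reaction }
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
```

> which writes exactly the 025126,Chills style lines from the intended
> output.)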
>
>
>
> Kapil
>
>
>
> From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
> Sent: 31 December 2014 13:42
> Cc: user@spark.apache.org
> Subject: FlatMapValues
>
>
>
> hey guys
>
>
>
> My dataset is like this
>
>
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10
>
>
>
> Intended output is
>
> ==================
>
> 025126,Chills
>
> 025126,Injection site oedema
>
> 025126,Injection site reaction
>
> 025126,Malaise
>
> 025126,Myalgia
>
>
>
> My code is as follows, but the flatMapValues does not work even after I
> have created the pair RDD.
>
> ************************************************************************
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
>   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> ************************************************************************
>
>
>
> thanks
>
>
>
> sanjay
>
