Thanks Sean, that works, and I have started joining this MappedRDD to another one
I have. I need to internalize the use of map versus flatMap; thinking in Hadoop
Map Reduce Java terms often blinds me :-)
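For the archives, here is roughly the shape of that join. This is a minimal
sketch only; otherPairedRDD is a hypothetical stand-in for my second RDD,
assumed to also be keyed on the same String column:

val jobOrderPairs = mjpJobOrderRDD.flatMap { line =>
  val tokens = line.split("\t")
  if (tokens.length == 164 && tokens(23) != null) Some((tokens(23), tokens(7))) else None
}

// otherPairedRDD is hypothetical: any RDD[(String, V)] works here.
// join matches the two RDDs by key, pairing each key with both values,
// so the result is RDD[(String, (String, String))]
val joined = jobOrderPairs.join(otherPairedRDD)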
From: Sean Owen <so...@cloudera.com>
To: Sanjay Subramanian <sanjaysubraman...@yahoo.com>
Cc: Cheng Lian <lian.cs....@gmail.com>; Jorge Lopez-Malla <jlop...@stratio.com>; "user@spark.apache.org" <user@spark.apache.org>
Sent: Wednesday, January 28, 2015 11:44 AM
Subject: Re: MappedRDD signature

I think it's clear if you format your function reasonably:

mjpJobOrderRDD.map(line => {
  val tokens = line.split("\t")
  if (tokens.length == 164 && tokens(23) != null) {
    (tokens(23), tokens(7))
  }
})

When the condition is false, the function returns nothing (Unit); when it is
true, it returns a tuple. The closest common supertype of Unit and
(String, String) is Any, so that is the inferred element type. If you just
mean to output a result in some cases and not others, use flatMap with
Some and None:

mjpJobOrderRDD.flatMap { line =>
  val tokens = line.split("\t")
  if (tokens.length == 164 && tokens(23) != null) {
    Some((tokens(23), tokens(7)))
  } else {
    None
  }
}
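
To see the same inference in isolation, here is a small plain-Scala sketch
(REPL-style; nothing here is Spark-specific):

// No else branch: the false case yields Unit, and the closest common
// supertype of Unit and (String, String) is Any
val x = if (scala.util.Random.nextBoolean()) ("a", "b")
// x: Any

// With Some/None both branches are Options, so the type stays precise
val y = if (scala.util.Random.nextBoolean()) Some(("a", "b")) else None
// y: Option[(String, String)] -- flatMap then flattens the Options away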



On Wed, Jan 28, 2015 at 7:37 PM, Sanjay Subramanian
<sanjaysubraman...@yahoo.com.invalid> wrote:
> hey guys
>
> I am not following why this happens
>
> DATASET
> =======
> Tab separated values (164 columns)
>
> Spark command 1
> ================
> val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => {
>   val tokens = line.split("\t")
>   (tokens(23), tokens(7))
> })
> mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[18] at map at <console>:14
>
>
> Spark command 2
> ================
> val mjpJobOrderRDD = sc.textFile("/data/cdr/cdr_mjp_joborder_raw")
> scala> val mjpJobOrderColsPairedRDD = mjpJobOrderRDD.map(line => {
>   val tokens = line.split("\t")
>   if (tokens.length == 164 && tokens(23) != null) { (tokens(23), tokens(7)) }
> })
> mjpJobOrderColsPairedRDD: org.apache.spark.rdd.RDD[Any] = MappedRDD[19] at map at <console>:14
>
>
> In the second case above, why does it say org.apache.spark.rdd.RDD[Any]
> and not org.apache.spark.rdd.RDD[(String, String)]?
>
>
> thanks
>
> sanjay
>


  
