I don't believe that works since your map function does not return a
value for lines shorter than 13 tokens. You should use flatMap and
Some/None. (You probably want to not parse the string 5 times too.)

val demoRddFilterMap = demoRddFilter.flatMap { line =>
  val tokens = line.split('$')
  if (tokens.length >= 13) {
    val parsed = tokens(0) + "~" + tokens(5) + "~" + tokens(11) + "~"
+ tokens(12)
    Some(parsed)
  } else {
    None
  }
}

On Wed, Dec 24, 2014 at 4:35 PM, Sanjay Subramanian
<sanjaysubraman...@yahoo.com.invalid> wrote:
> DOH Looks like I did not have enough coffee before I asked this :-)
> I added the if statement...
>
> var demoRddFilter = demoRdd.filter(line =>
> !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
> !line.contains("primaryid$caseid$caseversion"))
> var demoRddFilterMap = demoRddFilter.map(line => {
>   if (line.split('$').length >= 13){
>     line.split('$')(0) + "~" + line.split('$')(5) + "~" +
> line.split('$')(11) + "~" + line.split('$')(12)
>   }
> })
>
>
> ________________________________
> From: Sanjay Subramanian <sanjaysubraman...@yahoo.com.INVALID>
> To: "user@spark.apache.org" <user@spark.apache.org>
> Sent: Wednesday, December 24, 2014 8:28 AM
> Subject: How to identify erroneous input record ?
>
> hey guys
>
> One of my input records has an problem that makes the code fail.
>
> var demoRddFilter = demoRdd.filter(line =>
> !line.contains("ISR$CASE$I_F_COD$FOLL_SEQ") ||
> !line.contains("primaryid$caseid$caseversion"))
>
> var demoRddFilterMap = demoRddFilter.map(line => line.split('$')(0) + "~" +
> line.split('$')(5) + "~" + line.split('$')(11) + "~" + line.split('$')(12))
>
> demoRddFilterMap.saveAsTextFile("/data/aers/msfx/demo/" + outFile)
>
>
> This is possibly happening because perhaps one input record may not have 13
> fields.
>
> If this were Hadoop mapper code , I have 2 ways to solve this
>
> 1. test the number of fields of each line before applying the map function
>
> 2. enclose the mapping function in a try catch block so that the mapping
> function only fails for the erroneous record
>
> How do I implement 1. or 2. in the Spark code ?
>
> Thanks
>
>
> sanjay
>
>
>
>
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to