Surround it with a try..catch where it's complaining about the null pointer to avoid the job failing. What is happening here is that you are returning null from the parse function, and the following operation then works on that null, which causes the job to fail.
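To skip a bad record entirely (rather than returning null), one option is to use flatMapToPair instead of mapToPair and return an empty collection for lines that don't parse, so they simply disappear from the output instead of becoming null tuples in the shuffle. Here is a minimal sketch of that idea in plain Java streams, with no Spark dependency; the parse helper here is hypothetical and just mimics a parser that can fail:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Sketch of the "skip bad records" pattern. In the Spark job the
// analogous move would be flatMapToPair returning an empty Iterable
// for unparseable lines, never null.
public class SkipBadRecords {

    // Hypothetical parser: returns Optional.empty() for lines it
    // cannot parse, instead of null.
    static Optional<Integer> parse(String line) {
        try {
            return Optional.of(Integer.parseInt(line.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty(); // skip, don't return null
        }
    }

    static List<Integer> parseAll(List<String> lines) {
        // flatMap silently drops the empties, just as an empty
        // Iterable from flatMapToPair drops a record in Spark.
        return lines.stream()
                    .flatMap(l -> parse(l).stream())
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseAll(Arrays.asList("1", "oops", "3")));
    }
}
```

In the actual job this would mean implementing PairFlatMapFunction and returning Collections.emptyList() where the current code returns null.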
Thanks

Best Regards

On Mon, Apr 11, 2016 at 12:51 PM, saurabh guru <saurabh.g...@gmail.com> wrote:

> Trying to run the following causes a NullPointerException. While I thought
> Spark should have been able to handle null, apparently it is not able to.
> What could I return in place of null? What other ways could I approach
> this? There are times I would want to just skip parsing and proceed to the
> next record; how do I handle that?
>
>         filtered.mapToPair(new Parse(imprDetailMarshal))
>                 .reduceByKey(new TimeAndCount())
>                 .foreachRDD(new ImpressionDetailLogPub(indexNamePrefix, indexType, imprDetailMarshal));
>     }
>
> *Where Parse is:*
>
>     private static class Parse implements PairFunction<String, ImpressionDetailRecord, TimeNcount> {
>
>         private static final long serialVersionUID = -5060508551208900848L;
>         private static final DateTimeFormatter FORMATTER =
>                 DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss ZZ");
>         private final ImprDetailMarshal imprDetailMarshal;
>
>         public Parse(ImprDetailMarshal imprDetailMarshal) {
>             this.imprDetailMarshal = imprDetailMarshal;
>         }
>
>         @Override
>         public Tuple2<ImpressionDetailRecord, TimeNcount> call(String arg0) throws Exception {
>             ImpressionDetailRecordHolder recordHolder = imprDetailMarshal.parse(arg0);
>             if (recordHolder != null) {
>                 return new Tuple2<ImpressionDetailRecord, TimeNcount>(
>                         recordHolder.getImpressionDetailRecord(), recordHolder.getTimeCount());
>             }
>             return null;
>         }
>     }
>
> java.lang.NullPointerException
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> --
> Thanks,
> Saurabh
>
> :)