Surround the parse call with a try..catch where it's complaining about the
null pointer, so that a bad record does not fail the whole job. What is
happening here is that your call() returns null, and the operation that
follows (the shuffle behind reduceByKey, as the stack trace shows) then
works on that null, which causes the task, and hence the job, to fail.
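
If you would rather skip a record entirely instead of returning null,
change mapToPair to flatMapToPair and return an empty collection for
records you want to drop. A minimal sketch, assuming the Spark 1.x Java
API (where PairFlatMapFunction returns an Iterable) and reusing your
ImprDetailMarshal, ImpressionDetailRecordHolder, and TimeNcount types:

    import java.util.Collections;

    import org.apache.spark.api.java.function.PairFlatMapFunction;

    import scala.Tuple2;

    private static class Parse
            implements PairFlatMapFunction<String, ImpressionDetailRecord, TimeNcount> {

        private final ImprDetailMarshal imprDetailMarshal;

        public Parse(ImprDetailMarshal imprDetailMarshal) {
            this.imprDetailMarshal = imprDetailMarshal;
        }

        @Override
        public Iterable<Tuple2<ImpressionDetailRecord, TimeNcount>> call(String line) {
            try {
                ImpressionDetailRecordHolder holder = imprDetailMarshal.parse(line);
                if (holder != null) {
                    // One output pair for a record that parsed cleanly.
                    return Collections.singletonList(
                            new Tuple2<ImpressionDetailRecord, TimeNcount>(
                                    holder.getImpressionDetailRecord(),
                                    holder.getTimeCount()));
                }
            } catch (Exception e) {
                // Log and fall through: a record that fails to parse is
                // skipped instead of failing the task.
            }
            // Empty result: nothing for this record enters the shuffle,
            // so reduceByKey never sees a null.
            return Collections.<Tuple2<ImpressionDetailRecord, TimeNcount>>emptyList();
        }
    }

The driver side then changes only from mapToPair to flatMapToPair; the
rest of the pipeline stays as you have it:

    filtered.flatMapToPair(new Parse(imprDetailMarshal))
            .reduceByKey(new TimeAndCount())
            .foreachRDD(new ImpressionDetailLogPub(indexNamePrefix, indexType, imprDetailMarshal));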

Thanks
Best Regards

On Mon, Apr 11, 2016 at 12:51 PM, saurabh guru <saurabh.g...@gmail.com>
wrote:

> Trying to run the following causes a NullPointerException. I thought
> Spark would be able to handle null, but apparently it is not able to.
> What could I return in place of null? What other ways could I approach
> this? There are times when I would want to just skip parsing and proceed
> to the next record; how do I handle that?
>
>
> filtered.mapToPair(new Parse(imprDetailMarshal))
>         .reduceByKey(new TimeAndCount())
>         .foreachRDD(new ImpressionDetailLogPub(indexNamePrefix, indexType, imprDetailMarshal));
>
>
>
> Where Parse is:
>
>     private static class Parse implements PairFunction<String, ImpressionDetailRecord, TimeNcount> {
>
>         private static final long serialVersionUID = -5060508551208900848L;
>         private static final DateTimeFormatter FORMATTER =
>                 DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss ZZ");
>         private final ImprDetailMarshal imprDetailMarshal;
>
>         public Parse(ImprDetailMarshal imprDetailMarshal) {
>             this.imprDetailMarshal = imprDetailMarshal;
>         }
>
>         @Override
>         public Tuple2<ImpressionDetailRecord, TimeNcount> call(String arg0) throws Exception {
>             ImpressionDetailRecordHolder recordHolder = imprDetailMarshal.parse(arg0);
>             if (recordHolder != null) {
>                 return new Tuple2<ImpressionDetailRecord, TimeNcount>(
>                         recordHolder.getImpressionDetailRecord(), recordHolder.getTimeCount());
>             }
>             return null;
>         }
>     }
>
> java.lang.NullPointerException
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
> --
> Thanks,
> Saurabh
>
> :)
>
