Spark not handling Null

2016-04-11 Thread saurabh guru
Trying to run the following causes a NullPointerException. I thought
Spark would be able to handle null, but apparently it is not. What could I
return in place of null? What other ways could I approach this? There are
times when I would want to just skip parsing and proceed to the next
record; how do I handle that?


filtered.mapToPair(new Parse(imprDetailMarshal))
        .reduceByKey(new TimeAndCount())
        .foreachRDD(new ImpressionDetailLogPub(indexNamePrefix, indexType, imprDetailMarshal));
}



Where Parse is:

private static class Parse implements PairFunction<String, ImpressionDetailRecord, TimeNcount> {

    private static final long serialVersionUID = -5060508551208900848L;
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormat.forPattern("dd/MMM/:HH:mm:ss ZZ");
    private final ImprDetailMarshal imprDetailMarshal;

    public Parse(ImprDetailMarshal imprDetailMarshal) {
        this.imprDetailMarshal = imprDetailMarshal;
    }

    @Override
    public Tuple2<ImpressionDetailRecord, TimeNcount> call(String arg0)
            throws Exception {
        ImpressionDetailRecordHolder recordHolder = imprDetailMarshal.parse(arg0);
        if (recordHolder != null) {
            return new Tuple2<ImpressionDetailRecord, TimeNcount>(
                    recordHolder.getImpressionDetailRecord(), recordHolder.getTimeCount());
        }
        return null;
    }
}

java.lang.NullPointerException
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)



-- 
Thanks,
Saurabh

:)


Re: Spark not handling Null

2016-04-11 Thread Akhil Das
Surround the part it is complaining about with a try..catch to avoid the
job failing. What is happening here is that you are returning null from
Parse, and the following operation (the shuffle for reduceByKey) then works
on that null, which causes the job to fail.
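If you want to skip bad records entirely rather than catch the exception
downstream, another option is to avoid emitting a pair at all: use
flatMapToPair instead of mapToPair and return an empty collection (or
iterator, depending on your Spark version's PairFlatMapFunction signature)
when parsing fails, since the shuffle cannot handle null pairs. A minimal
plain-Java sketch of the skip-on-parse-failure idea (the parse logic and
names here are illustrative, not from your original code):

```java
import java.util.*;
import java.util.stream.*;

public class SkipNullDemo {
    // Illustrative stand-in for imprDetailMarshal.parse():
    // returns Optional.empty() for records that cannot be parsed,
    // instead of returning null.
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Optional.empty(); // malformed record: skip it
        }
        try {
            return Optional.of(Map.entry(parts[0], Integer.parseInt(parts[1])));
        } catch (NumberFormatException e) {
            return Optional.empty(); // unparsable value: skip it
        }
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a,1", "garbage", "b,2");
        // flatMap silently drops the empty Optionals, which is exactly
        // what flatMapToPair does when you return an empty iterator:
        // the bad record simply produces no output pair.
        List<Map.Entry<String, Integer>> pairs = lines.stream()
                .flatMap(l -> parse(l).stream())
                .collect(Collectors.toList());
        System.out.println(pairs.size()); // 2: "garbage" was skipped
    }
}
```

In the Spark job the same shape applies: Parse would implement
PairFlatMapFunction and return an empty collection when
imprDetailMarshal.parse() yields null, so no null ever reaches the shuffle.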

Thanks
Best Regards
