Is it possible to write a short test which exhibits this problem? For Spark 2.0, this part of the code has changed:

[SPARK-4819] Remove Guava's "Optional" from public API

FYI
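As for a short test: a minimal local-mode sketch against the 1.6 Java API could look like the following. The class name and sample data are invented, and this is a starting point rather than a confirmed reproduction; it simply exercises the leftOuterJoin/Optional path where the reported NPE occurs, with one key ("b") present only on the left so its joined value should come back as Optional.absent().

    import java.util.Arrays;

    import com.google.common.base.Optional;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class LeftOuterJoinNpeRepro {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                .setMaster("local[2]").setAppName("leftOuterJoin-npe");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // "b" exists only on the left, so its right-side value should be
            // Optional.absent() after the join.
            JavaPairRDD<String, Integer> left = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
            JavaPairRDD<String, Integer> right = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", 10)));

            JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> joined =
                left.leftOuterJoin(right);

            // Materializing the result is where the reported NPE would surface:
            // the Option-to-Optional conversion runs as each joined pair is
            // produced.
            for (Tuple2<String, Tuple2<Integer, Optional<Integer>>> t : joined.collect()) {
                Optional<Integer> maybe = t._2()._2();
                System.out.println(t._1() + " -> "
                    + (maybe.isPresent() ? maybe.get() : "absent"));
            }
            sc.stop();
        }
    }

On 2.0 the same test would import org.apache.spark.api.java.Optional instead of the Guava class, per SPARK-4819.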
On Fri, May 6, 2016 at 6:57 AM, Adam Westerman <aswes...@gmail.com> wrote:

> Hi,
>
> I’m attempting to do a left outer join in Spark, and I’m getting an NPE
> that appears to be due to a Spark Java API bug. (I’m running Spark 1.6.0
> in local mode on a Mac.)
>
> For a little background: the left outer join returns all keys from the
> left side of the join regardless of whether or not the key is present
> on the right side. To handle this uncertainty, the value from the right
> side is wrapped in Guava’s Optional class. The Optional class has a
> method to check whether the value is present or not (presence would
> indicate the key appeared in both RDDs being joined). If the key was
> indeed present in both RDDs, you can then retrieve the value and move
> forward.
>
> After doing a little digging, I found that Spark is using Scala’s
> Option functionality internally. This is the same concept as the Guava
> Optional, only native to Scala. It appears that during the conversion
> from a Scala Option back to a Guava Optional (the method can be found
> here:
> https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28)
> the conversion method is erroneously passed a Scala Option with the
> String value “None” instead of Scala’s None object. This is matched to
> the first *case*, which causes Guava’s Optional.of method to attempt to
> pull the value out. An NPE is thrown since the value wasn’t ever
> actually there.
>
> The code basically looks like this, where the classes used are just
> plain Java objects with some class attributes inside:
>
> // First RDD
> JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1;
>
> // Second RDD
> JavaPairRDD<GroupItemNode, Inventory> rdd2;
>
> // Resultant RDD
> JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Optional<Inventory>>> result =
>     rdd1.leftOuterJoin(rdd2);
>
> Has anyone ever encountered this problem before, or does anyone know
> why the optionToOptional method might be getting passed this “None”
> value? I’ve added some more relevant information below; let me know if
> I can provide any more details.
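For anyone following the conversion logic: it behaves roughly like the Java sketch below. This is an illustrative rendering rather than the actual Spark source (the real method is Scala, in JavaUtils.scala). One thing the stack trace does pin down: Optional.of rejects null via Preconditions.checkNotNull, so a null value reaching the defined branch would produce exactly this NPE.

    import com.google.common.base.Optional;

    public class OptionConversionSketch {

        // Illustrative Java rendering of the Scala Option -> Guava Optional
        // conversion discussed above; the real method is Scala code in
        // JavaUtils.scala. A defined Option goes through Optional.of, which
        // calls Preconditions.checkNotNull and throws an NPE when the wrapped
        // value is null; an undefined Option becomes Optional.absent().
        static <T> Optional<T> optionToOptional(scala.Option<T> option) {
            return option.isDefined()
                ? Optional.of(option.get())
                : Optional.absent();
        }

        public static void main(String[] args) {
            System.out.println(optionToOptional(new scala.Some<>("x")));  // Optional.of(x)
            System.out.println(optionToOptional(scala.Option.empty()));   // Optional.absent()
            // A Some wrapping null reproduces the NPE from the stack trace below:
            optionToOptional(new scala.Some<String>(null));               // NullPointerException
        }
    }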
> Here’s a screenshot showing the string value of “None” being passed into
> the optionToOptional method using the debugger:
>
> [screenshot not included in the plain-text archive]
>
> Here’s the stack trace (the method shown above is highlighted):
>
> ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication
> Exception while running need engine:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 8
> in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage
> 31.0 (TID 50, localhost): java.lang.NullPointerException
>     at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
>     at com.google.common.base.Optional.of(Optional.java:86)
>     at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>     at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
>     at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
>     at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
>     at com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
>     at com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
>     at com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
>     at com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.NullPointerException
>     at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
>     at com.google.common.base.Optional.of(Optional.java:86)
>     at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage
> 31.0 (TID 51, localhost): TaskKilled (killed intentionally)
>
> Thank you for any help you may be able to provide,
>
> Adam Westerman