Hi Ted, I am working on replicating the problem on a smaller scale.
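As a first smaller-scale check (no Spark involved): the NPE in the trace comes from Guava's Optional.of rejecting null via Preconditions.checkNotNull. The sketch below uses java.util.Optional as a stand-in, since its of/ofNullable have the same null semantics as Guava's of/fromNullable and it needs no extra dependency. Note that an actual String "None" would be a perfectly valid non-null value and would NOT trigger the NPE, so whatever reaches Optional.of must be genuinely null:

```java
import java.util.Optional;

public class OptionalNpeDemo {
    public static void main(String[] args) {
        // Optional.of rejects null -- the analogue of Guava's Optional.of,
        // whose Preconditions.checkNotNull call appears in the stack trace.
        try {
            Optional.of(null);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("Optional.of(null) -> NPE, as in the trace");
        }

        // A String "None" is a valid (non-null) value, so it does not NPE.
        Optional<String> bogus = Optional.of("None");
        System.out.println(bogus.isPresent()); // true

        // The null-tolerant counterpart (Guava: Optional.fromNullable).
        Optional<String> absent = Optional.ofNullable(null);
        System.out.println(absent.isPresent()); // false
    }
}
```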
I saw that Spark 2.0 is moving to Java 8 Optional instead of Guava Optional, but in the meantime I'm stuck with 1.6.1.

-Adam

On Fri, May 6, 2016 at 9:40 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Is it possible to write a short test which exhibits this problem?
>
> For Spark 2.0, this part of code has changed:
>
> [SPARK-4819] Remove Guava's "Optional" from public API
>
> FYI
>
> On Fri, May 6, 2016 at 6:57 AM, Adam Westerman <aswes...@gmail.com> wrote:
>
>> Hi,
>>
>> I’m attempting to do a left outer join in Spark, and I’m getting an NPE
>> that appears to be caused by a bug in the Spark Java API. (I’m running
>> Spark 1.6.0 in local mode on a Mac.)
>>
>> For a little background: a left outer join returns all keys from the
>> left side of the join, regardless of whether the key is present on the
>> right side. To handle this uncertainty, the value from the right side is
>> wrapped in Guava’s Optional class. The Optional class has a method to
>> check whether the value is present (which would indicate the key
>> appeared in both RDDs being joined). If the key was indeed present in
>> both RDDs, you can then retrieve the value and move forward.
>>
>> After doing a little digging, I found that Spark uses Scala’s Option
>> internally. This is the same concept as the Guava Optional, only native
>> to Scala. It appears that during the conversion from a Scala Option back
>> to a Guava Optional (the method can be found here:
>> https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28)
>> the conversion method is erroneously passed a Scala Option holding the
>> String value “None” instead of the Scala None object. This matches the
>> first *case*, which causes Guava’s Optional.of to attempt to pull the
>> value out. An NPE is thrown since the value wasn’t ever actually there.
>>
>> The code basically looks like this, where the classes used are just
>> plain Java objects with some class attributes inside:
>>
>> // First RDD
>> JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1
>> // Second RDD
>> JavaPairRDD<GroupItemNode, Inventory> rdd2
>>
>> // Resultant RDD
>> JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Optional<Inventory>>>
>>     result = rdd1.leftOuterJoin(rdd2)
>>
>> Has anyone ever encountered this problem before, or does anyone know why
>> the optionToOptional method might be getting passed this “None” value?
>> I’ve added some more relevant information below; let me know if I can
>> provide any more details.
>>
>> Here's a screenshot showing the string value of “None” being passed into
>> the optionToOptional method using the debugger:
>>
>> Here’s the stack trace (the method shown above is highlighted):
>>
>> ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication
>> Exception while running need engine:
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 8
>> in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage
>> 31.0 (TID 50, localhost): java.lang.NullPointerException
>> at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
>> at com.google.common.base.Optional.of(Optional.java:86)
>> at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>> at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
>> at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
>> at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
>> at com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
>> at com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
>> at com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
>> at com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: java.lang.NullPointerException
>> at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
>> at com.google.common.base.Optional.of(Optional.java:86)
>> at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage
>> 31.0 (TID 51, localhost): TaskKilled (killed intentionally)
>>
>> Thank you for any help you may be able to provide,
>>
>> Adam Westerman
>>
>
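The left-outer-join contract the thread describes can be sketched without Spark: every left-side key is kept, and the right-side value is wrapped in an Optional that is empty when the key has no match. Below is a minimal plain-Java sketch of that contract, with java.util.Optional standing in for Guava's Optional; the maps, keys, and values are made-up stand-ins for the RDDs above, not the poster's actual types:

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class LeftOuterJoinSketch {

    // Minimal left outer join over maps: every left-side key is kept, and
    // the right-side value is wrapped in an Optional that is empty when
    // the key has no match on the right.
    static <K, V, W> Map<K, Map.Entry<V, Optional<W>>> leftOuterJoin(
            Map<K, V> left, Map<K, W> right) {
        Map<K, Map.Entry<V, Optional<W>>> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : left.entrySet()) {
            // Null-tolerant wrap (the analogue of Guava's fromNullable);
            // Optional.of here would NPE on unmatched keys.
            Optional<W> rightValue = Optional.ofNullable(right.get(e.getKey()));
            out.put(e.getKey(),
                    new AbstractMap.SimpleImmutableEntry<>(e.getValue(), rightValue));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> composites = new LinkedHashMap<>();
        composites.put("item-1", 10);
        composites.put("item-2", 20);

        Map<String, String> inventory = new LinkedHashMap<>();
        inventory.put("item-1", "in-stock");

        Map<String, Map.Entry<Integer, Optional<String>>> joined =
                leftOuterJoin(composites, inventory);

        // item-1 matched on the right side; item-2 did not.
        System.out.println(joined.get("item-1").getValue().orElse("absent")); // in-stock
        System.out.println(joined.get("item-2").getValue().orElse("absent")); // absent
    }
}
```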