Hi Ted,

I am working on replicating the problem on a smaller scale.

I saw that Spark 2.0 is moving to Java 8 Optional instead of Guava
Optional, but in the meantime I'm stuck with 1.6.1.
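
In case it helps, here is a rough, Spark-free sketch of the failure mode I am trying to isolate. It uses java.util.Optional as a stand-in for Guava's Optional (both reject null in of()), and the class and method names are made up for illustration. It only shows that wrapping a present-but-null value with of() throws an NPE. That is one possible explanation for the trace in my original message, not a confirmed diagnosis.

```java
import java.util.Optional;

public class OptionalNullSketch {
    // Hypothetical stand-in for the Option -> Optional conversion:
    // a "present" value is wrapped with of(), which rejects null.
    static <T> Optional<T> wrapStrict(T value) {
        return Optional.of(value);
    }

    // Null-tolerant variant: a null value becomes an empty Optional.
    static <T> Optional<T> wrapLenient(T value) {
        return Optional.ofNullable(value);
    }

    // Returns true if wrapStrict throws a NullPointerException for the value.
    static boolean strictThrowsNpe(Object value) {
        try {
            wrapStrict(value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A real value wraps without error; a null value does not.
        System.out.println(strictThrowsNpe("inventory"));
        System.out.println(strictThrowsNpe(null));
        // The lenient variant maps null to an absent Optional instead.
        System.out.println(wrapLenient(null).isPresent());
    }
}
```

If the small-scale Spark test shows a null sitting on the right side of the join, a null-tolerant wrap like wrapLenient would be the behavior I would expect instead.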

-Adam

On Fri, May 6, 2016 at 9:40 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Is it possible to write a short test which exhibits this problem?
>
> For Spark 2.0, this part of code has changed:
>
> [SPARK-4819] Remove Guava's "Optional" from public API
>
> FYI
>
> On Fri, May 6, 2016 at 6:57 AM, Adam Westerman <aswes...@gmail.com> wrote:
>
>> Hi,
>>
>> I’m attempting to do a left outer join in Spark, and I’m getting an NPE
>> that appears to be due to some Spark Java API bug. (I’m running Spark 1.6.0
>> in local mode on a Mac).
>>
>> For a little background, the left outer join returns all keys from the
>> left side of the join regardless of whether or not the key is present on
>> the right side.  To handle this uncertainty, the value from the right side
>> is wrapped in Guava’s Optional class.  The Optional class has a method to
>> check whether the value is present or not (which would indicate the key
>> appeared in both RDDs being joined).  If the key was indeed present in both
>> RDDs you can then retrieve the value and move forward.
>>
>> After doing a little digging, I found that Spark is using Scala’s Option
>> functionality internally.  This is the same concept as the Guava Optional,
>> only native to Scala.  It appears that during the conversion from a Scala
>> Option back to a Guava Optional (this method can be found here:
>> https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28)
>> the conversion method is erroneously passed a Scala Option with the String
>> value “None” instead of Scala’s empty Option, None.  This is matched to the
>> first *case*, which causes Guava’s Optional.of method to attempt to wrap
>> the value.  An NPE is thrown since the value was never actually there.
>>
>> The code basically looks like this, where the classes used are just plain
>> Java objects with some class attributes inside:
>> // First RDD
>> JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1
>> // Second RDD
>> JavaPairRDD<GroupItemNode, Inventory> rdd2
>>
>> // Resultant RDD
>> JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite,
>> Optional<Inventory>>> result = rdd1.leftOuterJoin(rdd2)
>>
>> Has anyone encountered this problem before, or does anyone know why the
>> optionToOptional method might be getting passed this “None” value?  I’ve
>> added some more relevant information below.  Let me know if I can provide
>> any more details.
>>
>> Here's a screenshot showing the string value of “None” being passed into
>> the optionToOptional method using the debugger:
>>
>> Here’s the stack trace (the method shown above is highlighted):
>>
>> ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication
>> Exception while running need engine:
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 8
>> in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage
>> 31.0 (TID 50, localhost): java.lang.NullPointerException
>> at
>> org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
>> at com.google.common.base.Optional.of(Optional.java:86)
>> at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
>> at
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>> at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
>> at
>> org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
>> at
>> com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
>> at
>> com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
>> at
>> com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
>> at
>> com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: java.lang.NullPointerException
>> at
>> org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
>> at com.google.common.base.Optional.of(Optional.java:86)
>> at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
>> at
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>  WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage
>> 31.0 (TID 51, localhost): TaskKilled (killed intentionally)
>>
>> Thank you for any help you may be able to provide,
>>
>> Adam Westerman
>>
>
>
