[ https://issues.apache.org/jira/browse/SPARK-12591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136208#comment-15136208 ]
Yuval Itzchakov edited comment on SPARK-12591 at 2/7/16 10:04 AM:
------------------------------------------------------------------

I'm seeing this error as well, running Spark 1.6.0 with KryoSerializer. The error started happening after I reset a spark-worker node while a job was running. It hadn't happened previously and doesn't occur unless I restart a worker node. [~zsxwing] - I'm assuming this patch needs to be applied manually until 1.6.1 is released, am I right?


was (Author: yuval.itzchakov):
I'm seeing this error as well, running Spark 1.6.0 with KryoSerializer. The error started happening after I reset a spark-worker node while a job was running. It hadn't happened previously and doesn't occur unless I restart a worker node. [~zsxwing] - Was this supposed to be fixed in 1.6.0? Or is there still a need to manually patch this until 1.6.1?

> NullPointerException using checkpointed mapWithState with KryoSerializer
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12591
>                 URL: https://issues.apache.org/jira/browse/SPARK-12591
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: MacOSX
> Java(TM) SE Runtime Environment (build 1.8.0_20-ea-b17)
>            Reporter: Jan Uyttenhove
>            Assignee: Shixiong Zhu
>             Fix For: 1.6.1, 2.0.0
>
>         Attachments: Screen Shot 2016-01-27 at 10.09.18 AM.png
>
>
> The issue occurred after upgrading to RC4 of Spark (streaming) 1.6.0 to (re)test the new mapWithState API, after previously reporting issue SPARK-11932 (https://issues.apache.org/jira/browse/SPARK-11932).
> For the initial report, see http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-streaming-1-6-0-RC4-NullPointerException-using-mapWithState-tt15830.html
> I narrowed it down to an issue unrelated to the Kafka direct stream, but, after observing very unpredictable behavior as a result of changes to the Kafka message format, it seems to be related to Kryo serialization specifically.
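The setup described above is, in essence, a checkpointed StreamingContext whose mapWithState state is serialized with Kryo (spark.serializer set to org.apache.spark.serializer.KryoSerializer). Below is a minimal Scala sketch of that shape; it only approximates the test case in the gist linked in the next paragraph, and the object name, checkpoint directory, batch interval, host and port are illustrative assumptions rather than values taken from the report.

{code}
// Minimal sketch (Spark 1.6.x Streaming API) of a checkpointed mapWithState job
// with Kryo enabled. It approximates, but is not, the linked test case; the app
// name, checkpoint directory, batch interval and port are illustrative assumptions.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateKryoSketch {

  def createContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("MapWithStateKryoSketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir) // state (and metadata) checkpointing enabled here

    // Words arriving on the socket, counted cumulatively in Spark-managed state.
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    words.mapWithState(StateSpec.function(mappingFunc)).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/mapwithstate-checkpoint"
    // On restart the context and the serialized state map are recovered from the
    // checkpoint; per this report, the NPE shows up after such a restart.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

With a job of this shape, the reproduction steps below come down to killing the job and starting it again against the same checkpoint directory, so that the Kryo-serialized state map is read back from the checkpoint.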
> For a test case, see my modified version of the StatefulNetworkWordCount
> example: https://gist.github.com/juyttenh/9b4a4103699a7d5f698f
> To reproduce, use RC4 of Spark 1.6.0 and:
> - start nc:
> {code}nc -lk 9999{code}
> - execute the supplied test case:
> {code}bin/spark-submit --class org.apache.spark.examples.streaming.StatefulNetworkWordCount --master local[2] file:///some-assembly-jar localhost 9999{code}
> Error scenario:
> - put some text in the nc console with the job running, and observe the word count functioning correctly
> - kill the spark job
> - add some more text in the nc console (with the job not running)
> - restart the spark job and observe the NPE
> (you might need to repeat this a couple of times to trigger the exception)
> Here's the stacktrace:
> {code}
> 15/12/31 11:43:47 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 5)
> java.lang.NullPointerException
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/12/31 11:43:47 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 6, localhost, partition 1,NODE_LOCAL, 2239 bytes)
> 15/12/31 11:43:47 INFO Executor: Running task 1.0 in stage 4.0 (TID 6)
> 15/12/31 11:43:47 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 5, localhost): java.lang.NullPointerException
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>         at
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/12/31 11:43:47 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
> 15/12/31 11:43:47 INFO TaskSchedulerImpl: Cancelling stage 4
> 15/12/31 11:43:47 INFO Executor: Executor is trying to kill task 1.0 in stage 4.0 (TID 6)
> 15/12/31 11:43:47 INFO TaskSchedulerImpl: Stage 4 was cancelled
> 15/12/31 11:43:47 INFO DAGScheduler: ResultStage 4 (foreachPartition at StatefulNetworkCountKryo.scala:72) failed in 0.100 s
> 15/12/31 11:43:47 INFO DAGScheduler: Job 1 failed: foreachPartition at StatefulNetworkCountKryo.scala:72, took 0.823996 s
> 15/12/31 11:43:47 INFO JobScheduler: Finished job streaming job 1451558610000 ms.0 from job set of time 1451558610000 ms
> 15/12/31 11:43:47 INFO JobScheduler: Total delay: 17.994 s for time 1451558610000 ms (execution: 0.857 s)
> 15/12/31 11:43:47 INFO JobScheduler: Starting job streaming job 1451558620000 ms.0 from job set of time 1451558620000 ms
> 15/12/31 11:43:47 ERROR JobScheduler: Error running job streaming job 1451558610000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 5, localhost): java.lang.NullPointerException
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>         at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>         at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>         at org.apache.spark.examples.streaming.StatefulNetworkWordCountKryo$$anonfun$createContext$2.apply(StatefulNetworkCountKryo.scala:72)
>         at org.apache.spark.examples.streaming.StatefulNetworkWordCountKryo$$anonfun$createContext$2.apply(StatefulNetworkCountKryo.scala:71)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>         at
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         ... 3 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org