Re: Spark streaming 1.6.0-RC4 NullPointerException using mapWithState
Thanks for your answers, and I'm sorry for the slight delay. I was trying to narrow it down first, since I noticed very unpredictable behaviour when reproducing it.

The unpredictability turned out to be related to the format of the messages on Kafka, so I also came to suspect it had something to do with serialization. I was using the KryoSerializer, and I can confirm that the problem is in fact related to it (no exception occurs when using the JavaSerializer). It is unrelated to Kafka itself.

Apparently the exception occurs when restoring state from previous checkpoints while using Kryo serialization. When the job has checkpoints from a previous run (containing state) and is started with new messages already available on e.g. a Kafka topic (or via nc), the NPE occurs. And this is - of course - a typical use case when combining Kafka, Spark streaming, and checkpointing.

As requested, I created issue SPARK-12591 (https://issues.apache.org/jira/browse/SPARK-12591). It contains the procedure to reproduce the error with the test case, which is again a modified version of the StatefulNetworkWordCount Spark streaming example (https://gist.github.com/juyttenh/9b4a4103699a7d5f698f).

Best regards,

Jan

On Thu, Dec 31, 2015 at 2:32 AM, Ted Yu wrote:
> Jan:
> Which serializer did you use ?
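For anyone trying to reproduce the two behaviours Jan describes, switching serializers is a one-line configuration change. A minimal sketch (the master and app name are illustrative, not taken from the actual job):

```scala
import org.apache.spark.SparkConf

// Sketch: toggling the serializer for the streaming job. With Kryo the NPE
// reportedly reproduces on checkpoint restore; with the default
// JavaSerializer it does not.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("mapWithState-serializer-test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // vs. "org.apache.spark.serializer.JavaSerializer" (Spark's default)
```

Note that `spark.serializer` must be set before the StreamingContext (and its checkpoint) is first created, since the setting is captured in the checkpointed configuration.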
Re: Spark streaming 1.6.0-RC4 NullPointerException using mapWithState
I went through StateMap.scala a few times but didn't find any logic error yet.

According to the call stack, the following was executed in get(key):

    } else {
      parentStateMap.get(key)
    }

This implies that parentStateMap was null. But it seems parentStateMap is properly assigned in readObject().

Jan:
Which serializer did you use ?

Thanks

On Tue, Dec 29, 2015 at 3:42 AM, Jan Uyttenhove wrote:
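The failure mode Ted describes can be modelled outside Spark. The class below is a toy stand-in for a delta-chained state map (it is NOT Spark's actual StateMap): lookups that miss the local deltas fall through to the parent map, mirroring the `parentStateMap.get(key)` branch in the stack trace. One plausible explanation for why only Kryo triggers the NPE is that Kryo's default field serializer does not invoke a class's custom writeObject()/readObject() methods, so a field that is only restored inside readObject() would come back null after deserialization:

```scala
// Toy model of a delta-chained state map (not Spark's StateMap). Each map
// holds a small delta on top of a parent; get() falls through to the parent
// when the key is absent locally.
class ChainedMap[K, V](var parent: ChainedMap[K, V], deltas: Map[K, V]) {
  def get(key: K): Option[V] =
    deltas.get(key).orElse(parent.get(key)) // NPE here if parent was never restored
}
```

If a serializer skips the restore logic and leaves `parent` null, any lookup that misses the local deltas throws exactly this kind of NullPointerException.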
Re: Spark streaming 1.6.0-RC4 NullPointerException using mapWithState
Hi Jan, could you post your code? I could not reproduce this issue in my environment.

Best Regards,
Shixiong Zhu

2015-12-29 10:22 GMT-08:00 Shixiong Zhu :
> Could you create a JIRA? We can continue the discussion there. Thanks!
>
> Best Regards,
> Shixiong Zhu
>
> 2015-12-29 3:42 GMT-08:00 Jan Uyttenhove :
Re: Spark streaming 1.6.0-RC4 NullPointerException using mapWithState
Could you create a JIRA? We can continue the discussion there. Thanks!

Best Regards,
Shixiong Zhu

2015-12-29 3:42 GMT-08:00 Jan Uyttenhove :
Spark streaming 1.6.0-RC4 NullPointerException using mapWithState
Hi guys,

I upgraded to the RC4 of Spark (streaming) 1.6.0 to (re)test the new mapWithState API, after previously reporting issue SPARK-11932 (https://issues.apache.org/jira/browse/SPARK-11932).

My Spark streaming job involves reading data from a Kafka topic (using KafkaUtils.createDirectStream), stateful processing (using checkpointing & mapWithState) & publishing the results back to Kafka.

I'm now facing the NullPointerException below when restoring from a checkpoint in the following scenario:
1/ run the job (with local[2]), process data from Kafka while creating & keeping state
2/ stop the job
3/ generate some extra messages on the input Kafka topic
4/ start the job again (and restore offsets & state from the checkpoints)

The problem is caused by (or at least related to) step 3, i.e. publishing data to the input topic while the job is stopped. The above scenario completes successfully when:
- step 3 is excluded, i.e. restoring state from a checkpoint succeeds when no messages were added while the job was stopped
- after step 2, the checkpoints are deleted

Any clues? Am I doing something wrong here, or is there still a problem with the mapWithState implementation?
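For readers following along, the restart in step 4 goes through Spark's standard checkpoint-recovery pattern. A minimal sketch of that setup, based on the StatefulNetworkWordCount example this thread references (the checkpoint path, socket source, and word-count state function are illustrative, not Jan's actual job):

```scala
// Sketch of checkpointing + mapWithState recovery (Spark Streaming 1.6 API).
// Names and the socket source are illustrative; the real job used
// KafkaUtils.createDirectStream.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateSketch {
  val checkpointDir = "/tmp/spark-checkpoints" // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mapWithState-repro")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(w => (w, 1))

    // Running count per word, kept in mapWithState's checkpointed state.
    val spec = StateSpec.function(
      (word: String, one: Option[Int], state: State[Int]) => {
        val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
        state.update(sum)
        (word, sum)
      })
    words.mapWithState(spec).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a restart this restores the context (offsets + state) from the
    // checkpoint; step 4 of the scenario above goes through this path.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The NPE below is thrown while the recovered MapWithStateRDD recomputes state from the checkpointed parent, i.e. inside the path that `StreamingContext.getOrCreate` triggers.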
Thanx,

Jan


15/12/29 11:56:12 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 24)
java.lang.NullPointerException
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/12/29 11:56:12 INFO storage.BlockManagerInfo: Added rdd_25_1 in memory on localhost:10003 (size: 1024.0 B, free: 511.1 MB)
15/12/29 11:56:12 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 8 blocks
15/12/29 11:56:12 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/12/29 11:56:12 INFO storage.MemoryStore: Block rdd_29_1 stored as values in memory (estimated size 1824.0 B, free 488.0 KB)
15/12/29 11:56:12 INFO storage.BlockManagerInfo: Added rdd_29_1 in memory on localhost:10003 (size: 1824.0 B, free: 511.1 MB)
15/12/29 11:56:12 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 8 blocks
15/12/29 11:56:12 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/12/29 11:56:12 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 24, localhost): java.lang.NullPointerException
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.I