I have a simple Spark Streaming application that reads data from RabbitMQ and performs aggregations over window intervals of 1 minute and 1 hour, with a batch interval of 30 seconds.
I have a three-node setup. To enable checkpointing, I mounted the same directory on all worker nodes via sshfs and use it as the checkpoint directory. When I run the Spark Streaming app for the first time, it works fine: I can see the results printed on the console and checkpoints being written to the network directory. But when I run the job a second time, it fails with the following exception (which keeps repeating):

    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I am not pumping much data into RabbitMQ: on the first run I send fewer than 100 events, and before the second run I stop the producer process from sending any messages to RabbitMQ at all. I have also tried setting "spark.streaming.unpersist" to "true".
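For reference, my driver is set up roughly like the sketch below (simplified: the checkpoint path, app name, and the RabbitMQ receiver class are placeholders standing in for my actual code; the windowed aggregations are shown as simple counts):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    object StreamingApp {
      // The sshfs-mounted directory shared by all worker nodes (placeholder path)
      val checkpointDir = "/mnt/shared/checkpoint"

      def createContext(): StreamingContext = {
        val conf = new SparkConf()
          .setAppName("RabbitMQAggregator") // placeholder name
          .set("spark.streaming.unpersist", "true")
        val ssc = new StreamingContext(conf, Seconds(30)) // 30s batch interval
        ssc.checkpoint(checkpointDir)

        // Custom receiver pulling events from RabbitMQ (details elided)
        val events = ssc.receiverStream(new RabbitMQReceiver(/* ... */))

        // 1-minute and 1-hour windowed aggregations, sliding every batch
        events.countByWindow(Minutes(1), Seconds(30)).print()
        events.countByWindow(Minutes(60), Seconds(30)).print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        // On the second run this recovers the context from the checkpoint
        // instead of creating a fresh one -- which is when the failure appears
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }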
My setup has three nodes, each with one core allocated to Spark and 512 MB of executor memory per node. Please help me solve this issue.