I have a simple Spark Streaming application that reads data from RabbitMQ
and does some aggregation over window intervals of 1 minute and 1 hour,
with a batch interval of 30s.
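
For reference, the job is wired up roughly like the sketch below (Scala;
RabbitMQReceiver is only a placeholder for my custom receiver, and all names
are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Placeholder receiver standing in for the real RabbitMQ consumer;
    // the real onStart() connects to RabbitMQ and calls store() per message.
    class RabbitMQReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
      def onStart(): Unit = { /* connect and call store(message) */ }
      def onStop(): Unit  = { /* close the connection */ }
    }

    object WindowedAgg {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RabbitMQWindowedAgg")
        val ssc  = new StreamingContext(conf, Seconds(30))   // 30s batch interval

        val events = ssc.receiverStream(new RabbitMQReceiver)

        // Aggregations over 1-minute and 1-hour windows, sliding every batch (30s).
        events.window(Minutes(1), Seconds(30)).count().print()
        events.window(Minutes(60), Seconds(30)).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }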

 I have a three-node setup. To enable checkpointing, I have mounted the same
directory on all worker nodes using sshfs for writing the checkpoints.
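
Here is a minimal sketch of how the checkpoint directory is wired in, using
the standard StreamingContext.getOrCreate recovery pattern (the sshfs mount
path shown is illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointedApp {
      // Illustrative path: the directory mounted on every worker node via sshfs.
      val checkpointDir = "/mnt/shared/spark-checkpoint"

      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("RabbitMQWindowedAgg")
        val ssc  = new StreamingContext(conf, Seconds(30))
        ssc.checkpoint(checkpointDir)
        // ... define the RabbitMQ input stream and window aggregations here ...
        ssc
      }

      def main(args: Array[String]): Unit = {
        // First run: creates a fresh context. Second run: rebuilds the context
        // and its DStream lineage from the data found in checkpointDir.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }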

 When I run the Spark Streaming app for the first time, it works fine.
 I can see the results being printed on the console and checkpoints being
written to the network directory.

 But when I run the job a second time, it fails with the following
exception:

        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


And this exception is repeated.


I am not pumping a huge amount of data into RabbitMQ. For the first run I
send fewer than 100 events, and before the second run I stop the producer
process from sending messages to RabbitMQ.

I have tried setting "spark.streaming.unpersist" to "true".
My setup has 3 nodes, each with one core allocated to Spark, and the
executor memory per node is 512MB.
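
Concretely, the relevant part of the SparkConf looks like this (property
names are the standard Spark ones; values mirror the setup above):

    import org.apache.spark.SparkConf

    // Settings matching the setup described above.
    val conf = new SparkConf()
      .setAppName("RabbitMQWindowedAgg")
      .set("spark.executor.memory", "512m")       // 512MB executor memory per node
      .set("spark.streaming.unpersist", "true")   // eagerly unpersist RDDs generated by the DStreams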

Please help me solve this issue.
