You are creating a new streaming context each time the job starts:

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))

If you want fault tolerance, that is, to resume reading from where the job stopped across Spark job restarts, the correct way is to restore the streaming context from the checkpoint directory:

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

Please refer to the checkpointing section of the streaming programming guide for how to set up checkpointing and achieve fault tolerance in case of driver failures:
<https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing>

Hope this helps.
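
For reference, here is a minimal sketch of what functionToCreateContext and the surrounding driver code might look like. The checkpoint path, application name, batch interval, and socket source are placeholders I made up for illustration, not values from your job, and the master URL is assumed to come from spark-submit. The key point is that the streams and the ssc.checkpoint call are set up inside the factory function, which Spark only invokes when no checkpoint data is found.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedStreamingApp {
  // Hypothetical values; replace with your own configuration.
  val checkpointDirectory = "hdfs:///tmp/streaming-checkpoint"
  val batchIntervalSeconds = 10

  // Invoked by getOrCreate only when no checkpoint exists yet.
  // On a restart the context, including its DStream graph, is rebuilt
  // from the checkpoint data and this function is not called.
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))

    // Placeholder source and output operation; define your real
    // input streams and transformations here, inside the factory.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    // Enable checkpointing to the fault-tolerant directory.
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restore from checkpoint data if present, otherwise create a new context.
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    context.start()
    context.awaitTermination()
  }
}
```

Building the context directly with new StreamingContext(...) in main, as in your current code, bypasses this recovery path, which is why the job starts from scratch after every restart.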