You are creating a new StreamingContext every time the job starts, so nothing is recovered after a restart:

val streamingContext = new StreamingContext(
  sparkSession.sparkContext,
  Seconds(config.getInt(Constants.Properties.BatchInterval)))

If you want fault tolerance, i.e. to resume reading from where the job
stopped across Spark job restarts, the correct way is to restore the
StreamingContext from the checkpoint directory and only create a new one
when no checkpoint exists:

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(
  checkpointDirectory,
  functionToCreateContext _)
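
In case it is useful, here is a minimal sketch of what functionToCreateContext could look like. It is only an illustration, not your actual job: the checkpoint path, batch interval, app name and the socket source on localhost:9999 are all placeholder assumptions, and it assumes the job is launched with spark-submit (so no master is set in code). The important part is that the DStream setup and the ssc.checkpoint(...) call live inside the factory function, because that function only runs when no checkpoint is found.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  // Placeholder values (assumptions); replace with your own configuration.
  val checkpointDirectory = "hdfs:///tmp/streaming-checkpoint"
  val batchIntervalSeconds = 10

  // Runs only when no checkpoint data exists in checkpointDirectory.
  // On a restart with a valid checkpoint, the context and its DStream
  // graph are rebuilt from the checkpoint and this function is skipped.
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))

    // All DStream definitions must be set up here, inside the factory.
    // The socket source is a stand-in for your real input stream.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    // Enable checkpointing to the same directory passed to getOrCreate.
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restore from checkpoint data if present, otherwise create a new context.
    val context = StreamingContext.getOrCreate(
      checkpointDirectory,
      functionToCreateContext _)

    context.start()
    context.awaitTermination()
  }
}
```

The checkpoint directory should be on a fault-tolerant file system (HDFS, S3 or similar) so that it survives the loss of the driver machine.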

Please see the checkpointing section of the Spark Streaming programming
guide for details on checkpointing and on achieving fault tolerance in
case of driver failures:
<https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing>
  

Hope this helps.




