The source is DirectKafkaInputDStream, which can ensure exactly-once semantics on the consumer side. But I have a question based on the following code. As we know, `graph.generateJobs(time)` creates the RDDs and generates the jobs, and the source RDD is a KafkaRDD, which contains the offsetRanges. The jobs are submitted successfully by `jobScheduler.submitJobSet`, and the cluster starts running them. If the driver then crashes suddenly, the offsetRanges are lost, because the driver has not yet run `eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))`.
```scala
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
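To make the window I am asking about concrete, here is a minimal plain-Scala sketch (not Spark code; all names are hypothetical stand-ins) of the ordering in `generateJobs`: the job set is submitted first, and the checkpoint event is only posted afterwards, so a crash in between leaves submitted work whose offsets were never checkpointed.

```scala
// Hypothetical stand-in for Spark's OffsetRange, for illustration only.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object CrashWindowSketch {
  // Stand-ins for the checkpoint store and the job scheduler's submitted work.
  var checkpointedOffsets: Option[Seq[OffsetRange]] = None
  var submittedJobs: List[Seq[OffsetRange]] = Nil

  // Mirrors the order in generateJobs: submit first, checkpoint after.
  def generateJobs(batch: Seq[OffsetRange], crashBeforeCheckpoint: Boolean): Unit = {
    submittedJobs = batch :: submittedJobs       // jobScheduler.submitJobSet(...)
    if (crashBeforeCheckpoint) return            // driver dies here
    checkpointedOffsets = Some(batch)            // eventLoop.post(DoCheckpoint(...))
  }

  def main(args: Array[String]): Unit = {
    generateJobs(Seq(OffsetRange("t", 0, 0L, 100L)), crashBeforeCheckpoint = true)
    // The cluster may have started running the batch for offsets 0..100,
    // but no checkpoint ever recorded those offsetRanges.
    println(submittedJobs.nonEmpty && checkpointedOffsets.isEmpty) // true
  }
}
```

This is only a model of the control flow in the quoted method, not of Spark's actual recovery behavior; the question is precisely what happens to the lost offsetRanges in this gap.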