The source is DirectKafkaInputDStream which can ensure the exectly-once of the 
consumer side. But I have a question based the following code。As we known, the 
"graph.generateJobs(time)" will create rdds and generate jobs。And the source 
RDD is KafkaRDD which contain the offsetRange。 The jobs are submitted 
successfully by " jobScheduler.submitJobSet", and the cluster start running the 
jobs. After that, the driver crash suddenly and will lost the offsetRange. 
Because the driver has not run the "eventLoop.post(DoCheckpoint(time, 
clearCheckpointDataLater = false))" yet. 

```
  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows 
(SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, 
"true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate 
received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
  ```

Reply via email to