createOnError = true

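Are you sure the checkpoint is actually loading, as opposed to failing and
starting the job anyway? With createOnError = true, getOrCreate builds a fresh
context instead of throwing when it can't read the checkpoint. There should be
INFO lines that look like: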

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)

You should be able to tell from those whether the offset ranges being
loaded from the checkpoint look reasonable.
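To make that check less eyeball-driven, here is a minimal pure-Scala sketch that pulls the batch time and the offset ranges out of such a line. It assumes each logged tuple is (topic, partition, fromOffset, untilOffset), which appears to be the layout in the example above. RestoredRanges, Range, parse and total are hypothetical helpers, not Spark API.

```scala
object RestoredRanges {
  // One restored offset range, assuming the logged tuple layout is
  // (topic, partition, fromOffset, untilOffset).
  final case class Range(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset // messages covered by this range
  }

  private val TupleRe = """\(([^,()]+),(\d+),(\d+),(\d+)\)""".r
  private val TimeRe = """for time (\d+) ms""".r

  /** Batch time (epoch ms) from a "Restoring KafkaRDD for time ... ms" line. */
  def batchTimeMs(logLine: String): Option[Long] =
    TimeRe.findFirstMatchIn(logLine).map(_.group(1).toLong)

  /** Every offset-range tuple in the line; also works on a truncated line. */
  def parse(logLine: String): List[Range] =
    TupleRe.findAllMatchIn(logLine).map { m =>
      Range(m.group(1).trim, m.group(2).toInt, m.group(3).toLong, m.group(4).toLong)
    }.toList

  /** Total messages the restored batch would read, summed over partitions. */
  def total(ranges: Seq[Range]): Long = ranges.map(_.count).sum
}
```

With the figures given later in the thread (5 second batches, roughly 400 messages/second), a normal batch should total somewhere around 2000 across all partitions. Restored ranges that are empty, or that start near the current end of the log rather than where the first run stopped, would suggest the checkpoint isn't carrying the offsets you expect.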

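Also, is there a reason you're calling

directKStream.checkpoint(checkpointDuration)

in createContext? Just calling checkpoint on the streaming context, as
ssc.checkpoint(checkpointDirectory) already does, should be sufficient to
save the metadata.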

> Sure thing!
> The main looks like:
> val kafkaBrokers = conf.getString(s"$")
> val kafkaConf = Map(
>       "zookeeper.connect" -> zookeeper,
>       "" ->,
>       "" -> "10000",
>       "" -> "1000",
>       "rebalance.max.retries" -> "25",
>       "bootstrap.servers" -> kafkaBrokers
>     )
> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>       () => {
>         createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
>       }, createOnError = true)
> ssc.start()
> ssc.awaitTermination()
> And createContext is defined as:
> val batchDuration = Seconds(5)
> val checkpointDuration = Seconds(20)
> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
> def createContext(kafkaConf: Map[String, String],
>                     checkpointDirectory: String,
>                     topic: String,
>                     numThreads: Int,
>                     isProd: Boolean)
>   : StreamingContext = {
>     val sparkConf = new SparkConf().setAppName("***")
>     val ssc = new StreamingContext(sparkConf, batchDuration)
>     ssc.checkpoint(checkpointDirectory)
>     val topicSet = topic.split(",").toSet
>     val groupId = kafkaConf.getOrElse("", "")
>     val directKStream = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>     directKStream.checkpoint(checkpointDuration)
>     val table = ***
>     directKStream.foreachRDD { rdd =>
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       rdd.flatMap(rec => someFunc(rec))
>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>         .foreachPartition { partitionRec =>
>           val dbWrite = DynamoDBWriter()
>           partitionRec.foreach {
>             /* Update Dynamo Here */
>           }
>         }
>       /** Set up ZK Connection **/
>       val props = new Properties()
>       kafkaConf.foreach(param => props.put(param._1, param._2))
>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>       val consumerConfig = new ConsumerConfig(props)
>       assert(!consumerConfig.autoCommitEnable)
>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>         consumerConfig.zkSessionTimeoutMs,
>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>       offsetRanges.foreach { osr =>
>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>         ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
>       }
>     }
>     ssc
>   }
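The foreachRDD block above writes each batch's untilOffset into ZooKeeper. A minimal pure-Scala sketch of the paths involved, assuming Kafka 0.8's ZKGroupTopicDirs layout of /consumers/<group>/offsets/<topic>/<partition>. ZkOffsetPaths, offsetPath, writes and Range are hypothetical stand-ins, not Kafka API. Worth keeping in mind: the direct stream does not read these ZooKeeper offsets back when it starts. On restart it resumes from the checkpoint. Without one it starts from auto.offset.reset (latest by default) unless explicit starting offsets are passed.

```scala
object ZkOffsetPaths {
  // Assumed to mirror Kafka 0.8's ZKGroupTopicDirs consumer offset layout.
  def offsetPath(groupId: String, topic: String, partition: Int): String =
    s"/consumers/$groupId/offsets/$topic/$partition"

  // Hypothetical, trimmed-down stand-in for a Spark OffsetRange.
  final case class Range(topic: String, partition: Int, untilOffset: Long)

  /** The (path -> value) pairs the per-batch commit loop would write. */
  def writes(groupId: String, ranges: Seq[Range]): Map[String, String] = {
    // If the group id lookup falls back to "", paths collapse to /consumers//offsets/...
    require(groupId.nonEmpty, "group id must be set")
    ranges.map(r => offsetPath(groupId, r.topic, r.partition) -> r.untilOffset.toString).toMap
  }
}
```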
>> Sounds like something's not set up right... can you post a minimal code
>> example that reproduces the issue?
>>> Yeah. All messages are lost while the streaming job was down.
>>>> Are you actually losing messages then?
>>>>> No; first batch only contains messages received after the second job
>>>>> starts (messages come in at a steady rate of about 400/second).
>>>>>> Does the first batch after restart contain all the messages received
>>>>>> while the job was down?
>>>>>>> Hello,
>>>>>>> I'm using direct Spark Streaming (from Kafka) with checkpointing, and
>>>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a series
>>>>>>> of 0-event batches that get queued (corresponding to the 1 minute when
>>>>>>> the job was down). Eventually, the batches would resume processing, and
>>>>>>> I would see that each batch has roughly 2000 events.
>>>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>>>> are found and "loaded", according to console output.
>>>>>>> Is this expected behavior? It seems like I might've configured something
>>>>>>> incorrectly, since I would expect with checkpointing that the streaming
>>>>>>> job would resume from checkpoint and continue processing from there
>>>>>>> (without seeing 0-event batches corresponding to when the job was down).
>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>>>>> would be so many 0-event batches that the job would hang. Is this merely
>>>>>>> something to be "waited out", or should I set up some restart
>>>>>>> behavior/make a config change to discard checkpointing if the elapsed
>>>>>>> time has been too long?
>>>>>>> Thanks!
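To put numbers on the symptom: with the 5 second batchDuration from createContext, one minute of downtime is 60 / 5 = 12 batch intervals, and ten minutes is 120. That lines up with the queue of batches described above, since on restart Spark generates a batch for every interval between the last checkpoint and the restart time. If you do decide to discard a stale checkpoint before calling getOrCreate, here is a minimal sketch. It assumes a local checkpoint directory (HDFS or S3 would need listing through Hadoop's FileSystem API instead). It also assumes Spark's checkpoint-<batchTimeMs> file naming, with .bk backups. CheckpointAge, shouldDiscard and the maxMissed threshold are a hypothetical policy, not a Spark setting. Discarding the checkpoint also discards the saved offsets, so the new context starts from auto.offset.reset (latest by default) and whatever arrived during the downtime is skipped.

```scala
import java.io.File

object CheckpointAge {
  // Assumed naming: "checkpoint-<batchTimeMs>" plus an optional suffix such as ".bk".
  private val CheckpointFile = """checkpoint-(\d+)(.*)""".r

  /** Latest batch time (epoch ms) among checkpoint files in `dir`, if any. */
  def latestCheckpointTimeMs(dir: File): Option[Long] = {
    // listFiles() returns null for a missing or unreadable directory.
    val files = Option(dir.listFiles()).getOrElse(Array.empty[File])
    val times = files.map(_.getName).collect { case CheckpointFile(ms, _) => ms.toLong }
    if (times.isEmpty) None else Some(times.max)
  }

  /** Whole batch intervals between the last checkpoint and `nowMs`. */
  def missedBatches(latestMs: Long, nowMs: Long, batchMs: Long): Long =
    math.max(0L, (nowMs - latestMs) / batchMs)

  /** Hypothetical policy: discard if more than `maxMissed` catch-up batches would be generated. */
  def shouldDiscard(dir: File, nowMs: Long, batchMs: Long, maxMissed: Long): Boolean =
    latestCheckpointTimeMs(dir).exists(t => missedBatches(t, nowMs, batchMs) > maxMissed)
}
```

With batchMs = 5000 and maxMissed = 100, a one-minute outage (12 intervals) keeps the checkpoint, while a ten-minute outage (120 intervals) would discard it.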
