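The first thing that stands out to me is

createOnError = true

With that set, if the checkpoint can't be read, getOrCreate quietly builds a
brand-new context from your creating function instead of throwing.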
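To make the concern concrete, here is a toy model in plain Scala (no Spark dependency) of how, as far as I can tell from the Spark 1.x source, getOrCreate treats that flag. If reading the checkpoint throws and createOnError is true, the failure is swallowed and the creating function runs as if there were no checkpoint at all. GetOrCreateModel, Ctx and Origin are made-up names for illustration; this is a sketch of the behaviour, not Spark's implementation.

```scala
import scala.util.{Failure, Success, Try}

// Toy model (NOT Spark's implementation) of how the createOnError flag of
// StreamingContext.getOrCreate changes what happens when a checkpoint is unreadable.
object GetOrCreateModel {

  // Hypothetical stand-in for a StreamingContext: it only records where it came from.
  sealed trait Origin
  case object Restored extends Origin
  case object Fresh extends Origin
  final case class Ctx(origin: Origin)

  def getOrCreate(
      readCheckpoint: () => Option[Ctx], // None = no checkpoint; throwing = unreadable
      creatingFunc: () => Ctx,
      createOnError: Boolean = false): Ctx = {
    val restored: Option[Ctx] = Try(readCheckpoint()) match {
      case Success(found) => found
      case Failure(e) if createOnError =>
        // The error is only reported; the caller gets a fresh context, not an exception.
        Console.err.println(s"WARN failed to read checkpoint, creating a new context: ${e.getMessage}")
        None
      case Failure(e) => throw e
    }
    restored.getOrElse(creatingFunc())
  }
}
```

With the flag on, an unreadable checkpoint and a missing one look the same to the caller: you get a fresh context. Dropping createOnError = true (the default is false) should make a bad checkpoint fail loudly at startup instead.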

Are you sure the checkpoint is actually loading, as opposed to failing and
the job starting fresh anyway? There should be info lines that look like this:

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)


Each tuple is (topic, partition, fromOffset, untilOffset), so you should be
able to tell from those whether the offset ranges being loaded from the
checkpoint look reasonable.
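If the driver log is long, something like the following can pull those ranges out and show how many messages each one covers. It's only a sketch: it assumes each range prints as (topic,partition,fromOffset,untilOffset), as in the line above, and RestoreLogCheck / RestoredRange are made-up names. If it finds no restore lines at all, that would suggest the context was created fresh rather than recovered from the checkpoint.

```scala
// Sketch: extract offset ranges from driver-log lines such as
//   ... Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)]
// Assumes each range is printed as (topic,partition,fromOffset,untilOffset).
object RestoreLogCheck {

  final case class RestoredRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    // Number of messages covered by this range.
    def count: Long = untilOffset - fromOffset
  }

  // Leading .* because the log prefix (level, class name) varies; the closing ']' is optional.
  private val RestoreLine = """.*Restoring KafkaRDD for time (\d+) ms \[(.*)""".r
  private val RangeTuple = """\(([^,()]+),(\d+),(\d+),(\d+)\)""".r

  /** (batch time in ms, restored ranges) for every restore line; empty if none were logged. */
  def restoredRanges(logLines: Seq[String]): Seq[(Long, List[RestoredRange])] =
    logLines.collect { case RestoreLine(time, rest) =>
      val ranges = RangeTuple.findAllMatchIn(rest).map { m =>
        RestoredRange(m.group(1), m.group(2).toInt, m.group(3).toLong, m.group(4).toLong)
      }.toList
      (time.toLong, ranges)
    }
}
```

For the example line above, it reports one batch at 1440597180000 with partition 1 of test covering offsets 162 until 220, i.e. 58 messages.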

Also, is there a reason you're calling

directKStream.checkpoint(checkpointDuration)

on the stream itself? Just calling checkpoint on the streaming context
(ssc.checkpoint(checkpointDirectory), which you already do) should be
sufficient to save the metadata.



On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <suchenz...@gmail.com> wrote:

> Sure thing!
>
> The main looks like:
>
>
> --------------------------------------------------------------------------------------------------
>
>
> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>
> val kafkaConf = Map(
>       "zookeeper.connect" -> zookeeper,
>       "group.id" -> options.group,
>       "zookeeper.connection.timeout.ms" -> "10000",
>       "auto.commit.interval.ms" -> "1000",
>       "rebalance.max.retries" -> "25",
>       "bootstrap.servers" -> kafkaBrokers
>     )
>
> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>       () => {
>         createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
>       }, createOnError = true)
>
> ssc.start()
> ssc.awaitTermination()
>
>
>
> --------------------------------------------------------------------------------------------------
>
>
> And createContext is defined as:
>
>
>
> --------------------------------------------------------------------------------------------------
>
>
> val batchDuration = Seconds(5)
> val checkpointDuration = Seconds(20)
>
> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>
> def createContext(kafkaConf: Map[String, String],
>                     checkpointDirectory: String,
>                     topic: String,
>                     numThreads: Int,
>                     isProd: Boolean)
>   : StreamingContext = {
>
>     val sparkConf = new SparkConf().setAppName("***")
>     val ssc = new StreamingContext(sparkConf, batchDuration)
>     ssc.checkpoint(checkpointDirectory)
>
>     val topicSet = topic.split(",").toSet
>     val groupId = kafkaConf.getOrElse("group.id", "")
>
>     val directKStream = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>     directKStream.checkpoint(checkpointDuration)
>
>     val table = ***
>
>     directKStream.foreachRDD { rdd =>
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       rdd.flatMap(rec => someFunc(rec))
>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>         .foreachPartition { partitionRec =>
>           val dbWrite = DynamoDBWriter()
>           partitionRec.foreach {
>             /* Update Dynamo Here */
>           }
>         }
>
>       /** Set up ZK Connection **/
>       val props = new Properties()
>       kafkaConf.foreach(param => props.put(param._1, param._2))
>
>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>
>       val consumerConfig = new ConsumerConfig(props)
>       assert(!consumerConfig.autoCommitEnable)
>
>       val zkClient = new ZkClient(consumerConfig.zkConnect,
>         consumerConfig.zkSessionTimeoutMs,
>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>
>       offsetRanges.foreach { osr =>
>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>         ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
>       }
>     }
>     ssc
>   }
>
>
>
> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Sounds like something's not set up right... can you post a minimal code
>> example that reproduces the issue?
>>
>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <suchenz...@gmail.com>
>> wrote:
>>
>>> Yeah. All messages received while the streaming job was down are lost.
>>>
>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Are you actually losing messages then?
>>>>
>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenz...@gmail.com>
>>>> wrote:
>>>>
>>>>> No; the first batch only contains messages received after the second job
>>>>> starts (messages come in at a steady rate of about 400/second).
>>>>>
>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Does the first batch after restart contain all the messages received
>>>>>> while the job was down?
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <suchenz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm using direct Spark Streaming (from Kafka) with checkpointing, and
>>>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>>>> streaming job, wait 1 minute, then re-submit, a series of 0-event
>>>>>>> batches somehow gets queued (corresponding to the 1 minute when the
>>>>>>> job was down). Eventually, the batches resume processing, and I see
>>>>>>> that each batch has roughly 2000 events.
>>>>>>>
>>>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>>>> are found and "loaded", according to the console output.
>>>>>>>
>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>> something incorrectly, since I would expect that with checkpointing
>>>>>>> the streaming job would resume from the checkpoint and continue
>>>>>>> processing from there (without seeing 0-event batches corresponding
>>>>>>> to the time the job was down).
>>>>>>>
>>>>>>> Also, if I were to wait more than 10 minutes or so before
>>>>>>> re-launching, there would be so many 0-event batches that the job
>>>>>>> would hang. Is this merely something to be "waited out", or should I
>>>>>>> set up some restart behavior or make a config change to discard the
>>>>>>> checkpoint if too much time has elapsed?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
