The first thing that stands out to me is createOnError = true.

Are you sure the checkpoint is actually loading, as opposed to failing and starting the job anyway? There should be INFO log lines that look like:

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)

You should be able to tell from those whether the offset ranges being loaded from the checkpoint look reasonable.

Also, is there a reason you're calling directKStream.checkpoint(checkpointDuration)? Just calling checkpoint on the streaming context should be sufficient to save the metadata.

On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <suchenz...@gmail.com> wrote:
> Sure thing!
>
> The main looks like:
>
> --------------------------------------------------------------------------------------------------
>
> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>
> val kafkaConf = Map(
>   "zookeeper.connect" -> zookeeper,
>   "group.id" -> options.group,
>   "zookeeper.connection.timeout.ms" -> "10000",
>   "auto.commit.interval.ms" -> "1000",
>   "rebalance.max.retries" -> "25",
>   "bootstrap.servers" -> kafkaBrokers
> )
>
> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>   () => {
>     createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
>   }, createOnError = true)
>
> ssc.start()
> ssc.awaitTermination()
>
> --------------------------------------------------------------------------------------------------
>
> And createContext is defined as:
>
> --------------------------------------------------------------------------------------------------
>
> val batchDuration = Seconds(5)
> val checkpointDuration = Seconds(20)
>
> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>
> def createContext(kafkaConf: Map[String, String],
>                   checkpointDirectory: String,
>                   topic: String,
>                   numThreads: Int,
>                   isProd: Boolean)
>     : StreamingContext = {
>
>   val sparkConf = new SparkConf().setAppName("***")
>   val ssc = new StreamingContext(sparkConf, batchDuration)
>   ssc.checkpoint(checkpointDirectory)
>
>   val topicSet = topic.split(",").toSet
>   val groupId = kafkaConf.getOrElse("group.id", "")
>
>   val directKStream = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>   directKStream.checkpoint(checkpointDuration)
>
>   val table = ***
>
>   directKStream.foreachRDD { rdd =>
>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>     rdd.flatMap(rec => someFunc(rec))
>       .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>       .foreachPartition { partitionRec =>
>         val dbWrite = DynamoDBWriter()
>         partitionRec.foreach {
>           /* Update Dynamo Here */
>         }
>       }
>
>     /** Set up ZK Connection **/
>     val props = new Properties()
>     kafkaConf.foreach(param => props.put(param._1, param._2))
>
>     props.setProperty(AUTO_OFFSET_COMMIT, "false")
>
>     val consumerConfig = new ConsumerConfig(props)
>     assert(!consumerConfig.autoCommitEnable)
>
>     val zkClient = new ZkClient(consumerConfig.zkConnect,
>       consumerConfig.zkSessionTimeoutMs,
>       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>
>     offsetRanges.foreach { osr =>
>       val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>       val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>       ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
>     }
>   }
>   ssc
> }
>
> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Sounds like something's not set up right... can you post a minimal code
>> example that reproduces the issue?
>>
>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>>
>>> Yeah. All messages are lost while the streaming job was down.
>>>
>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Are you actually losing messages then?
>>>>
>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>>>>
>>>>> No; first batch only contains messages received after the second job
>>>>> starts (messages come in at a steady rate of about 400/second).
>>>>>
>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> Does the first batch after restart contain all the messages received
>>>>>> while the job was down?
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <suchenz...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a series
>>>>>>> of 0 event batches that get queued (corresponding to the 1 minute when
>>>>>>> the job was down). Eventually, the batches would resume processing, and
>>>>>>> I would see that each batch has roughly 2000 events.
>>>>>>>
>>>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>>>> are found and "loaded", according to console output.
>>>>>>>
>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>> something incorrectly, since I would expect with checkpointing that the
>>>>>>> streaming job would resume from checkpoint and continue processing from
>>>>>>> there (without seeing 0 event batches corresponding to when the job was
>>>>>>> down).
>>>>>>>
>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>>>>> would be so many 0 event batches that the job would hang. Is this
>>>>>>> merely something to be "waited out", or should I set up some restart
>>>>>>> behavior/make a config change to discard checkpointing if the elapsed
>>>>>>> time has been too long?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
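
Cody's first point is that createOnError = true can mask a failed restore. The scaladoc for StreamingContext.getOrCreate describes createOnError as whether to create a new StreamingContext if there is an error reading checkpoint data, and says an exception is thrown by default. Below is a simplified plain-Scala model of that fallback. It is not Spark's implementation. GetOrCreateModel, Checkpoint and restoreOrCreate are hypothetical names, and a "context" is just a String.

```scala
// Simplified, HYPOTHETICAL model of the fallback described for
// StreamingContext.getOrCreate; not Spark's actual implementation.
// A "context" is represented by a plain String.
object GetOrCreateModel {

  // What getOrCreate might find in the checkpoint directory.
  sealed trait Checkpoint
  case object NoCheckpoint extends Checkpoint
  final case class Readable(savedOffsets: Map[Int, Long]) extends Checkpoint
  case object Unreadable extends Checkpoint // corrupt or incompatible data

  def restoreOrCreate(
      checkpoint: Checkpoint,
      creatingFunc: () => String,
      createOnError: Boolean = false): String =
    checkpoint match {
      // Successful restore: creatingFunc is never called.
      case Readable(offsets) => s"restored, resuming from $offsets"
      // Nothing on disk yet: build a fresh context.
      case NoCheckpoint => creatingFunc()
      // Default (createOnError = false): a failed read surfaces as an error.
      case Unreadable if !createOnError =>
        throw new IllegalStateException("failed to read checkpoint data")
      // createOnError = true: the failure is swallowed and a fresh context is built.
      case Unreadable => creatingFunc()
    }
}
```

With createOnError = true, an unreadable checkpoint and a missing checkpoint look the same from the outside, because both run creatingFunc. The Spark 1.x KafkaUtils.createDirectStream docs say that a direct stream not started from a checkpoint begins at auto.offset.reset, which defaults to "largest". So a job that silently started fresh would skip whatever arrived while it was down. The thread doesn't establish whether that is what happened here, and the INFO lines Cody mentions are the way to tell. Leaving createOnError at its default of false would make a failed restore fail loudly instead.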
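
To act on the advice about the "Restoring KafkaRDD" INFO lines, the offset ranges can be extracted and checked mechanically. This is a minimal sketch that makes two assumptions. First, each tuple in the log line is (topic, partition, fromOffset, untilOffset), which matches the tuple form of OffsetRange in the Spark 1.x Kafka integration. Second, untilOffset is exclusive. RestoreLogCheck and its members are hypothetical names.

```scala
// Sketch: extract the offset ranges from "Restoring KafkaRDD" INFO lines.
// ASSUMPTION: each tuple is (topic, partition, fromOffset, untilOffset),
// with untilOffset exclusive.
object RestoreLogCheck {

  final case class Range(topic: String, partition: Int, from: Long, until: Long) {
    // Number of messages the restored batch will read for this partition.
    def count: Long = until - from
  }

  private val TimePattern  = """Restoring KafkaRDD for time (\d+) ms""".r
  private val TuplePattern = """\(([^,()]+),(\d+),(\d+),(\d+)\)""".r

  /** Batch time plus every offset range found on one log line, if it is a restore line. */
  def parse(line: String): Option[(Long, List[Range])] =
    TimePattern.findFirstMatchIn(line).map { m =>
      val ranges = TuplePattern.findAllMatchIn(line).map { t =>
        Range(t.group(1), t.group(2).toInt, t.group(3).toLong, t.group(4).toLong)
      }.toList
      (m.group(1).toLong, ranges)
    }
}
```

For the sample line Cody quotes, parse returns batch time 1440597180000 and a single range on partition 1 of topic test, covering offsets 162 until 220, which is 58 messages. createContext writes each batch's untilOffset to ZooKeeper under the consumer group's offset path. One rough sanity check is whether the restored fromOffset values sit near the last offsets committed there before shutdown.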
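
The figures in the original report can be cross-checked with simple arithmetic. This small sketch uses only numbers from the thread: about 400 messages/second, batchDuration = Seconds(5), a 1-minute outage and a 10-minute one. BatchArithmetic and its methods are illustrative names only.

```scala
// Back-of-the-envelope checks using figures from the thread:
// ~400 messages/s arriving, batchDuration = Seconds(5) in createContext.
object BatchArithmetic {
  // Messages expected in one steady-state batch.
  def eventsPerBatch(msgsPerSec: Long, batchSec: Long): Long = msgsPerSec * batchSec

  // Whole batch intervals that elapse while the job is down.
  def batchesDuring(downtimeSec: Long, batchSec: Long): Long = downtimeSec / batchSec

  // Messages produced while the job is down (what a correct restore must still read).
  def backlog(msgsPerSec: Long, downtimeSec: Long): Long = msgsPerSec * downtimeSec
}
```

At 400 messages/second and 5-second batches, 2000 events per batch matches the steady state Susan reports. A 1-minute outage spans 12 batch intervals (120 for 10 minutes), which is consistent with the run of empty batches covering the minute the job was down. The outage itself accounts for roughly 24,000 messages. Cody's question about whether the first batch after restart contains everything received while the job was down is asking whether those messages show up.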