Ah, I was using the UI coupled with the job logs, which indicated that offsets were being "processed" even though they corresponded to 0 events. Looks like I wasn't matching up timestamps correctly: the 0-event batches were queued/processed when offsets were getting skipped:

15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0
15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the same as ending offset skipping install-json 1
15/08/26 11:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 6 blocks
15/08/26 11:26:08 INFO storage.BlockManager: Removing RDD 1

But eventually processing of offset 831729964 would resume:

15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831729964 -> 831729976

Lesson learned: I'll be more careful about reading the job logs properly in the future. Thanks for all the help on this!
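For anyone hitting the same confusion, here is a minimal sketch (mine, not code from the thread) that prints each batch's offset ranges from the driver, so a genuinely empty range (the "Beginning offset ... is the same as ending offset" case above) is easy to tell apart from dropped data. It assumes the directKStream built in the code quoted further down.

import org.apache.spark.streaming.kafka.HasOffsetRanges

directKStream.foreachRDD { rdd =>
  // The cast only works on the stream's own RDD, before any transformation.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    val count = r.untilOffset - r.fromOffset
    // count == 0 means "no new data for this partition in this batch",
    // which is what the "skipping" log line reports; it is not lost data.
    println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset} ($count events)")
  }
}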
On Wed, Aug 26, 2015 at 12:16 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I'd be less concerned about what the streaming UI shows than what's
> actually going on with the job. When you say you were losing messages,
> how were you observing that? The UI, or actual job output?
>
> The log lines you posted indicate that the checkpoint was restored and
> those offsets were processed; what are the log lines for the following
> KafkaRDD?
>
> On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>
>> Compared offsets, and it continues from checkpoint loading:
>>
>> 15/08/26 11:24:54 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446), (install-json,4,825772921,825773536), (install-json,1,831654775,831655076), (install-json,0,1296018451,1296018810), (install-json,2,824785282,824785696), (install-json,3,811428882,811429181)]
>>
>> 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json, partition 0 offsets 1296018451 -> 1296018810
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 4 offsets 825773536 -> 825907428
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 2 offsets 824785696 -> 824889957
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 3 offsets 811429181 -> 811529084
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831655076 -> 831729964
>> ...
>>
>> But for some reason the streaming UI shows it as computing 0 events.
>>
>> Removing the call to checkpoint does remove the queueing of 0-event
>> batches, since offsets just skip to the latest (I checked that the first
>> part.fromOffset in the restarted job is larger than the last
>> part.untilOffset before restart).
>>
>> On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> When the kafka rdd is actually being iterated on the worker, there
>>> should be an info line of the form
>>>
>>> log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
>>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>> You should be able to compare that to the log of offsets during
>>> checkpoint loading, to see if they line up.
>>>
>>> Just out of curiosity, does removing the call to checkpoint on the
>>> stream affect anything?
>>>
>>> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>>>
>>>> Thanks for the suggestions! I tried the following:
>>>>
>>>> I removed
>>>>
>>>> createOnError = true
>>>>
>>>> And reran the same process to reproduce.
>>>> Double checked that the checkpoint is loading:
>>>>
>>>> 15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528), (install-json,4,825400856,825401058), (install-json,1,831453228,831453396), (install-json,0,1295759888,1295760378), (install-json,2,824443526,824444409), (install-json,3,811222580,811222874)]
>>>> 15/08/26 10:10:40 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791), (install-json,4,825401058,825401249), (install-json,1,831453396,831453603), (install-json,0,1295760378,1295760809), (install-json,2,824444409,824445510), (install-json,3,811222874,811223285)]
>>>> ...
>>>>
>>>> And the same issue appears as before (with 0-event batches getting
>>>> queued, corresponding to the dropped messages). Our Kafka brokers are on
>>>> version 0.8.2.0, if that makes a difference.
>>>>
>>>> Also, as a sanity check, I took out the ZK updates and reran (just in
>>>> case those were somehow causing problems); as expected, that didn't
>>>> change anything.
>>>>
>>>> Furthermore, the 0-event batches seem to take longer to process than
>>>> batches with the regular load of events: processing time for 0-event
>>>> batches can be upwards of 1-2 minutes, whereas processing time for
>>>> ~2000-event batches is consistently < 1s. Why would that happen?
>>>>
>>>> As for the checkpoint call:
>>>>
>>>> directKStream.checkpoint(checkpointDuration)
>>>>
>>>> was an attempt to set the checkpointing interval (at some multiple of
>>>> the batch interval), whereas StreamingContext.checkpoint seems like it
>>>> will only set the checkpoint directory.
>>>>
>>>> Thanks for all the help,
>>>>
>>>> Susan
>>>>
>>>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> The first thing that stands out to me is
>>>>>
>>>>> createOnError = true
>>>>>
>>>>> Are you sure the checkpoint is actually loading, as opposed to failing
>>>>> and starting the job anyway? There should be info lines that look like
>>>>>
>>>>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>>>>>
>>>>> You should be able to tell from those whether the offset ranges being
>>>>> loaded from the checkpoint look reasonable.
>>>>>
>>>>> Also, is there a reason you're calling
>>>>>
>>>>> directKStream.checkpoint(checkpointDuration)
>>>>>
>>>>> Just calling checkpoint on the streaming context should be sufficient
>>>>> to save the metadata.
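A minimal sketch of what Cody suggests here, assuming the same createContext/checkpointDirectory shape as in the code quoted below: the checkpoint directory is set once on the StreamingContext (which is what enables metadata checkpointing), and there is no per-stream checkpoint call.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(checkpointDirectory: String): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("***"), Seconds(5))
  // Setting the directory on the context is sufficient for metadata checkpointing.
  ssc.checkpoint(checkpointDirectory)
  // ... build the direct stream and foreachRDD as in the code below,
  // minus the directKStream.checkpoint(...) call ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDirectory,
  () => createContext(checkpointDirectory))
ssc.start()
ssc.awaitTermination()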
>>>>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>>>>>
>>>>>> Sure thing!
>>>>>>
>>>>>> The main looks like:
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> // imports inferred from the snippets below
>>>>>> import java.util.Properties
>>>>>>
>>>>>> import kafka.consumer.ConsumerConfig
>>>>>> import kafka.serializer.StringDecoder
>>>>>> import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
>>>>>> import org.I0Itec.zkclient.ZkClient
>>>>>> import org.apache.spark.SparkConf
>>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>>> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
>>>>>>
>>>>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>>>>
>>>>>> val kafkaConf = Map(
>>>>>>   "zookeeper.connect" -> zookeeper,
>>>>>>   "group.id" -> options.group,
>>>>>>   "zookeeper.connection.timeout.ms" -> "10000",
>>>>>>   "auto.commit.interval.ms" -> "1000",
>>>>>>   "rebalance.max.retries" -> "25",
>>>>>>   "bootstrap.servers" -> kafkaBrokers
>>>>>> )
>>>>>>
>>>>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>>>>   () => {
>>>>>>     createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
>>>>>>   }, createOnError = true)
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> And createContext is defined as:
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>> val batchDuration = Seconds(5)
>>>>>> val checkpointDuration = Seconds(20)
>>>>>>
>>>>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>>>>
>>>>>> def createContext(kafkaConf: Map[String, String],
>>>>>>                   checkpointDirectory: String,
>>>>>>                   topic: String,
>>>>>>                   numThreads: Int,
>>>>>>                   isProd: Boolean): StreamingContext = {
>>>>>>
>>>>>>   val sparkConf = new SparkConf().setAppName("***")
>>>>>>   val ssc = new StreamingContext(sparkConf, batchDuration)
>>>>>>   ssc.checkpoint(checkpointDirectory)
>>>>>>
>>>>>>   val topicSet = topic.split(",").toSet
>>>>>>   val groupId = kafkaConf.getOrElse("group.id", "")
>>>>>>
>>>>>>   val directKStream = KafkaUtils.createDirectStream[String, String,
>>>>>>     StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>>>>>>   directKStream.checkpoint(checkpointDuration)
>>>>>>
>>>>>>   val table = ***
>>>>>>
>>>>>>   directKStream.foreachRDD { rdd =>
>>>>>>     // grab offset ranges before any transformation on the RDD
>>>>>>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>     rdd.flatMap(rec => someFunc(rec))
>>>>>>       .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>>>>>       .foreachPartition { partitionRec =>
>>>>>>         val dbWrite = DynamoDBWriter()
>>>>>>         partitionRec.foreach {
>>>>>>           /* Update Dynamo Here */
>>>>>>         }
>>>>>>       }
>>>>>>
>>>>>>     /** Set up ZK Connection **/
>>>>>>     val props = new Properties()
>>>>>>     kafkaConf.foreach(param => props.put(param._1, param._2))
>>>>>>     props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>>>>>
>>>>>>     val consumerConfig = new ConsumerConfig(props)
>>>>>>     assert(!consumerConfig.autoCommitEnable)
>>>>>>
>>>>>>     val zkClient = new ZkClient(consumerConfig.zkConnect,
>>>>>>       consumerConfig.zkSessionTimeoutMs,
>>>>>>       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>>>>>
>>>>>>     // write each range's untilOffset under the consumer group's path
>>>>>>     offsetRanges.foreach { osr =>
>>>>>>       val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>>>>>       val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>>>>>       ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
>>>>>>     }
>>>>>>   }
>>>>>>   ssc
>>>>>> }
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
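To sanity-check the ZK writes in the snippet above, here is a hypothetical helper (my addition, not from the thread) that reads back the offset last committed for a partition, using the same Kafka 0.8 ZkUtils API; zkClient and groupId are the ones constructed in createContext:

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

// Returns the offset stored for (groupId, topic, partition), if the node exists.
def committedOffset(topic: String, partition: Int): Option[Long] = {
  val dirs = new ZKGroupTopicDirs(groupId, topic)
  val (dataOpt, _) = ZkUtils.readDataMaybeNull(zkClient, s"${dirs.consumerOffsetDir}/$partition")
  dataOpt.map(_.toLong)
}

// e.g. compare this against the restored fromOffset after a restart:
// println(committedOffset("install-json", 1))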
>>>>>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>>> Sounds like something's not set up right... can you post a minimal
>>>>>>> code example that reproduces the issue?
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yeah. All messages that came in while the streaming job was down are lost.
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Are you actually losing messages then?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> No; the first batch only contains messages received after the
>>>>>>>>>> second job starts (messages come in at a steady rate of about
>>>>>>>>>> 400/second).
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does the first batch after restart contain all the messages
>>>>>>>>>>> received while the job was down?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <suchenz...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm using direct Spark Streaming (from Kafka) with
>>>>>>>>>>>> checkpointing, and everything works well until a restart. When
>>>>>>>>>>>> I shut down (^C) the first streaming job, wait 1 minute, then
>>>>>>>>>>>> re-submit, there is somehow a series of 0-event batches that
>>>>>>>>>>>> get queued (corresponding to the 1 minute when the job was
>>>>>>>>>>>> down). Eventually, the batches resume processing, and I see
>>>>>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>>>>>
>>>>>>>>>>>> I see that at the beginning of the second launch, the
>>>>>>>>>>>> checkpoint dirs are found and "loaded", according to console
>>>>>>>>>>>> output.
>>>>>>>>>>>>
>>>>>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>>>>>> something incorrectly, since I would expect that with
>>>>>>>>>>>> checkpointing the streaming job would resume from the
>>>>>>>>>>>> checkpoint and continue processing from there (without seeing
>>>>>>>>>>>> 0-event batches corresponding to when the job was down).
>>>>>>>>>>>>
>>>>>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>>>>>> there would be so many 0-event batches that the job would hang.
>>>>>>>>>>>> Is this merely something to be "waited out", or should I set up
>>>>>>>>>>>> some restart behavior/make a config change to discard
>>>>>>>>>>>> checkpointing if the elapsed time has been too long?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png>
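On the "discard checkpointing if the elapsed time has been too long" idea from the original post: as far as I know Spark has no built-in config for this, but a pre-start step along these lines (the helper name and 10-minute threshold are made up for illustration) could delete a stale checkpoint directory so that getOrCreate falls back to createContext and starts from fresh offsets:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Delete the checkpoint directory if it hasn't been written to recently,
// so the job starts clean instead of replaying a long backlog of batches.
def clearStaleCheckpoint(checkpointDirectory: String, maxAgeMs: Long): Unit = {
  val path = new Path(checkpointDirectory)
  val fs = FileSystem.get(path.toUri, new Configuration())
  if (fs.exists(path) &&
      System.currentTimeMillis() - fs.getFileStatus(path).getModificationTime > maxAgeMs) {
    fs.delete(path, true) // recursive delete; getOrCreate will then call createContext
  }
}

// e.g. before StreamingContext.getOrCreate:
// clearStaleCheckpoint(checkpointDirectory, maxAgeMs = 10 * 60 * 1000)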