Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
    // ...(beginning of snippet truncated in the archive)... kafkaBrokers)

    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
      createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
    }, createOnError = true)

    ssc.start()
    ssc.awaitTermination()

--
And createContext is defined as:
--

    import java.util.Properties

    import kafka.consumer.ConsumerConfig
    import kafka.serializer.StringDecoder
    import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val batchDuration = Seconds(5)
    val checkpointDuration = Seconds(20)
    private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

    def createContext(kafkaConf: Map[String, String], checkpointDirectory: String,
        topic: String, numThreads: Int, isProd: Boolean): StreamingContext = {

      val sparkConf = new SparkConf().setAppName("***")
      val ssc = new StreamingContext(sparkConf, batchDuration)
      ssc.checkpoint(checkpointDirectory)

      val topicSet = topic.split(",").toSet
      val groupId = kafkaConf.getOrElse("group.id", "")

      val directKStream = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
      directKStream.checkpoint(checkpointDuration)

      val table = "***"

      directKStream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.flatMap(rec => someFunc(rec))
          .reduceByKey((i1: Long, i2: Long) => if (i1 < i2) i1 else i2) // operator lost in archive; < assumed
          .foreachPartition { partitionRec =>
            val dbWrite = DynamoDBWriter()
            partitionRec.foreach { /* Update Dynamo Here */ }
          }

        /** Set up ZK Connection **/
        val props = new Properties()
        kafkaConf.foreach(param => props.put(param._1, param._2))
        props.setProperty(AUTO_OFFSET_COMMIT, "false")
        val consumerConfig = new ConsumerConfig(props)
        assert(!consumerConfig.autoCommitEnable)

        val zkClient = new ZkClient(consumerConfig.zkConnect,
          consumerConfig.zkSessionTimeoutMs,
          consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

        offsetRanges.foreach { osr =>
          val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
          val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
          ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
        }
      }

      ssc
    }

On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger c...@koeninger.org wrote:
> Sounds like something's not set up right... can you post a minimal code
> example that reproduces the issue? [...]
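Given that this job already writes its final offsets to ZooKeeper at the end of each batch, one way to avoid replaying the downtime through checkpoint recovery after a long outage is to start a fresh direct stream seeded from those stored offsets, using the `fromOffsets` overload of `KafkaUtils.createDirectStream` in the Spark 1.x Kafka integration. A hedged sketch only — `readOffsetsFromZk` is a hypothetical helper (it would read back the paths written by the `ZkUtils.updatePersistentPath` calls above), not something from this thread:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: seed the direct stream from offsets previously stored in ZK,
// instead of recovering the stream (and its backlog of batch times)
// from a checkpoint. `readOffsetsFromZk` is assumed, not shown.
def streamFromStoredOffsets(
    ssc: StreamingContext,
    kafkaConf: Map[String, String],
    readOffsetsFromZk: () => Map[TopicAndPartition, Long]
): InputDStream[(String, String)] = {
  val fromOffsets = readOffsetsFromZk()
  val messageHandler =
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
    (String, String)](ssc, kafkaConf, fromOffsets, messageHandler)
}
```

Whether an external-offset restart is preferable to checkpoint recovery depends on deployment habits: checkpoints also pin the serialized DAG, so upgrading the application code typically invalidates them and forces this kind of restart anyway.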
Spark Streaming Checkpointing Restarts with 0 Event Batches
Hello,

I'm using direct Spark Streaming (from Kafka) with checkpointing, and everything works well until a restart. When I shut down (^C) the first streaming job, wait 1 minute, then re-submit, there is somehow a series of 0-event batches that get queued (corresponding to the 1 minute when the job was down). Eventually the batches resume processing, and each batch has roughly 2000 events.

I see at the beginning of the second launch that the checkpoint dirs are found and loaded, according to console output. Is this expected behavior?

It seems like I might've configured something incorrectly, since I would expect that with checkpointing the streaming job would resume from the checkpoint and continue processing from there (without seeing 0-event batches corresponding to when the job was down).

Also, if I were to wait 10 minutes or so before re-launching, there would be so many 0-event batches that the job would hang. Is this merely something to be waited out, or should I set up some restart behavior / make a config change to discard the checkpoint if the elapsed time has been too long?

Thanks!

http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
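For intuition on where those 0-event batches come from: when a Spark Streaming driver restarts from a checkpoint, it re-schedules one batch for every batch interval that elapsed while it was down, so downtime translates directly into a queue of catch-up batches. A rough back-of-the-envelope sketch (plain Scala, no Spark required; the 5-second interval matches the code posted in this thread):

```scala
// Rough model: after restarting from a checkpoint, Spark Streaming
// re-schedules one batch per batch interval that elapsed while the
// driver was down.
val batchIntervalSeconds = 5 // Seconds(5), as in this job

def queuedBatchesAfterDowntime(downtimeSeconds: Int): Int =
  math.ceil(downtimeSeconds.toDouble / batchIntervalSeconds).toInt

println(queuedBatchesAfterDowntime(60))  // 1 minute down   -> 12 batches
println(queuedBatchesAfterDowntime(600)) // 10 minutes down -> 120 batches
```

Why those catch-up batches carry 0 events rather than the missed Kafka offsets is exactly the open question in this thread; with the direct approach one would normally expect the backlog to land in the first recovered batches.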
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Does the first batch after restart contain all the messages received while the job was down?

On Tue, Aug 25, 2015 at 12:53 PM, suchenzang suchenz...@gmail.com wrote:
> Hello, I'm using direct spark streaming (from kafka) with checkpointing, and
> everything works well until a restart. [...]
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Are you actually losing messages then?

On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote:
> No; first batch only contains messages received after the second job starts
> (messages come in at a steady rate of about 400/second). [...]
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Yeah. All messages received while the streaming job was down are lost.

On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org wrote:
> Are you actually losing messages then? [...]
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
Sounds like something's not set up right... can you post a minimal code example that reproduces the issue?

On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote:
> Yeah. All messages are lost while the streaming job was down. [...]
Re: Spark Streaming Checkpointing Restarts with 0 Event Batches
No; the first batch only contains messages received after the second job starts (messages come in at a steady rate of about 400/second).

On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote:
> Does the first batch after restart contain all the messages received while
> the job was down? [...]
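The numbers in this exchange are at least self-consistent: at roughly 400 messages/second with the job's 5-second batch interval, a steady-state batch should carry about 2000 events, which matches the batch sizes reported once processing resumes. A trivial check in plain Scala:

```scala
// Steady-state batch size = ingest rate * batch interval.
val messagesPerSecond = 400
val batchIntervalSeconds = 5
val eventsPerBatch = messagesPerSecond * batchIntervalSeconds
println(eventsPerBatch) // 2000 -- the "roughly 2000 events" per batch observed
```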