Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
 -> kafkaBrokers
 )

 val ssc = StreamingContext.getOrCreate(checkpointDirectory,
   () => {
 createContext(kafkaConf, checkpointDirectory, topic,
 numThreads, isProd)
   }, createOnError = true)

 ssc.start()
 ssc.awaitTermination()



 --


 And createContext is defined as:



 --


 val batchDuration = Seconds(5)
 val checkpointDuration = Seconds(20)

 private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

 def createContext(kafkaConf: Map[String, String],
 checkpointDirectory: String,
 topic: String,
 numThreads: Int,
 isProd: Boolean)
   : StreamingContext = {

 val sparkConf = new SparkConf().setAppName("***")
 val ssc = new StreamingContext(sparkConf, batchDuration)
 ssc.checkpoint(checkpointDirectory)

 val topicSet = topic.split(",").toSet
 val groupId = kafkaConf.getOrElse("group.id", "")

 val directKStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
 directKStream.checkpoint(checkpointDuration)

 val table = "***"

 directKStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd.flatMap(rec => someFunc(rec))
 // keep the largest value per key
 .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
 .foreachPartition { partitionRec =>
   val dbWrite = DynamoDBWriter()
   partitionRec.foreach {
 /* Update Dynamo Here */
   }
 }

   /** Set up ZK Connection **/
   val props = new Properties()
   kafkaConf.foreach(param => props.put(param._1, param._2))

   props.setProperty(AUTO_OFFSET_COMMIT, "false")

   val consumerConfig = new ConsumerConfig(props)
   assert(!consumerConfig.autoCommitEnable)

   val zkClient = new ZkClient(consumerConfig.zkConnect,
 consumerConfig.zkSessionTimeoutMs,
 consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

   // record each batch's ending offsets under the consumer group's ZK path
   offsetRanges.foreach { osr =>
 val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
 val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
 ZkUtils.updatePersistentPath(zkClient, zkPath,
 osr.untilOffset.toString)
   }
 }
 ssc
   }
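
 One knob that may help with the backlog described in the original post
 (the long run of queued batches after a restart): capping per-partition
 intake so the recovered batches stay bounded. A minimal sketch, assuming
 a Spark version recent enough to support the direct-stream rate cap; the
 value is illustrative:

   // cap each partition at 1000 messages/sec so a replayed backlog
   // drains in bounded batches instead of a few enormous ones
   sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")

 Keep in mind that a recovered context restores its SparkConf from the
 checkpoint, so the setting has to be in place before the checkpoint is
 first written.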



 On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Sounds like something's not set up right... can you post a minimal
 code example that reproduces the issue?

Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread suchenzang
Hello,

I'm using direct spark streaming (from kafka) with checkpointing, and
everything works well until a restart. When I shut down (^C) the first
streaming job, wait 1 minute, then re-submit, there is somehow a series of 0
event batches that get queued (corresponding to the 1 minute when the job
was down). Eventually, the batches would resume processing, and I would see
that each batch has roughly 2000 events.

I see that at the beginning of the second launch, the checkpoint dirs are
found and loaded, according to console output.

Is this expected behavior? It seems like I might've configured something
incorrectly, since I would expect with checkpointing that the streaming job
would resume from checkpoint and continue processing from there (without
seeing 0 event batches corresponding to when the job was down).

Also, if I were to wait > 10 minutes or so before re-launching, there would
be so many 0 event batches that the job would hang. Is this merely something
to be waited out, or should I set up some restart behavior/make a config
change to discard checkpointing if the elapsed time has been too long?

Thanks!

http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
 
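
One possible shape for the "discard checkpointing if the elapsed time has
been too long" fallback asked about above: a sketch only, assuming an
HDFS-compatible checkpoint path; the helper name and threshold are
illustrative, and the directory's modification time only approximates the
last checkpoint write:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  // Hypothetical helper: delete the checkpoint dir when it is older than
  // maxAgeMs, so StreamingContext.getOrCreate falls back to its creating
  // function and starts fresh instead of replaying a huge backlog.
  def discardStaleCheckpoint(checkpointDirectory: String, maxAgeMs: Long): Unit = {
    val path = new Path(checkpointDirectory)
    val fs = path.getFileSystem(new Configuration())
    if (fs.exists(path) &&
        System.currentTimeMillis() - fs.getFileStatus(path).getModificationTime > maxAgeMs) {
      fs.delete(path, true) // recursive delete
    }
  }

  // e.g. tolerate up to 10 minutes of downtime before starting over:
  discardStaleCheckpoint(checkpointDirectory, 10 * 60 * 1000L)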






Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread Cody Koeninger
Does the first batch after restart contain all the messages received while
the job was down?




Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread Cody Koeninger
Are you actually losing messages then?

On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang suchenz...@gmail.com wrote:

 No; first batch only contains messages received after the second job
 starts (messages come in at a steady rate of about 400/second).


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread Susan Zhang
Yeah. All messages were lost while the streaming job was down.

On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger c...@koeninger.org wrote:

 Are you actually losing messages then?


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread Cody Koeninger
Sounds like something's not set up right... can you post a minimal code
example that reproduces the issue?

On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang suchenz...@gmail.com wrote:

 Yeah. All messages were lost while the streaming job was down.


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-25 Thread Susan Zhang
No; first batch only contains messages received after the second job starts
(messages come in at a steady rate of about 400/second).
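
An aside, in case it helps others hitting the same symptom: with the
direct stream, "auto.offset.reset" in the Kafka params decides where
consumption starts whenever no stored or checkpointed offsets apply, and
the Kafka 0.8 default of "largest" skips straight to new messages. Note
too that getOrCreate with createOnError = true will silently build a
fresh context if the checkpoint fails to load. A sketch, reusing the
kafkaConf map shown earlier in the thread:

  val kafkaConf = Map(
    "metadata.broker.list" -> kafkaBrokers, // kafkaBrokers as defined in the job
    // start from the earliest retained offsets instead of only new messages
    "auto.offset.reset" -> "smallest"
  )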

On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger c...@koeninger.org wrote:

 Does the first batch after restart contain all the messages received while
 the job was down?

