Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
That looks like it's during recovery from a checkpoint, so it'd be driver
memory not executor memory.

How big is the checkpoint directory that you're trying to restore from?
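
Because the restore runs in the driver JVM, the setting to raise would be spark.driver.memory (or the --driver-memory option of spark-submit) rather than spark.executor.memory. In client mode the driver JVM has already started by the time application code runs, so the value generally has to be passed on the command line or in the properties file, not set through SparkConf. One way to confirm that a new value took effect is to print the heap ceiling from inside the driver. The following is a minimal sketch using only the JDK; the class name DriverHeapCheck is made up for illustration, and the reported figure may come out slightly below the configured -Xmx.

```java
public class DriverHeapCheck {

    /** Converts a byte count to whole mebibytes (rounded down). */
    static long toMebibytes(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // maxMemory() is the upper bound the JVM will attempt to use for the heap,
        // so it reflects -Xmx and therefore the driver memory setting.
        long maxHeap = Runtime.getRuntime().maxMemory();
        System.out.println("Max heap available to this JVM: "
                + toMebibytes(maxHeap) + " MiB");
    }
}
```

Calling the same Runtime method early in the driver's main method would show whether the job was really launched with the intended heap size.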

On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg 
dgoldenberg...@gmail.com wrote:

 We're getting the error below.  We tried increasing spark.executor.memory,
 e.g. from 1g to 2g, but the error still happens.

 Any recommendations? Is it something to do with specifying -Xmx in the
 job submission scripts?

 Thanks.

 Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
     at java.util.Arrays.copyOf(Arrays.java:3332)
     at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
     at java.lang.StringBuilder.append(StringBuilder.java:136)
     at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
     at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
     at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
     at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
     at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
     at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
     at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
     at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
     at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
     at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)






Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Ted Yu
I wonder whether, during recovery from a checkpoint, we could estimate the
size of the checkpoint and compare it with Runtime.getRuntime().freeMemory().

If the checkpoint is much bigger than the free memory, we could log a
warning, etc.

Cheers
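
The check suggested above could look roughly like the following sketch. It is not part of Spark; the class and method names are hypothetical. It assumes the checkpoint directory is on a local filesystem (a directory on HDFS would need the Hadoop FileSystem API instead), uses an arbitrary 50% threshold, and treats on-disk size as only a rough proxy for the heap needed once the data is deserialized. Note that freeMemory() alone reports free space within the currently allocated heap, so the sketch uses maxMemory() minus the memory in use as the estimate of what is still available.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CheckpointSizeCheck {

    /** Sums the sizes of all regular files under dir, recursively. */
    static long directorySize(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .sum();
        }
    }

    /** Heap the JVM could still claim: max heap minus what is currently in use. */
    static long availableHeap() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return rt.maxMemory() - used;
    }

    /** True when the checkpoint size exceeds the given fraction of available heap. */
    static boolean shouldWarn(long checkpointBytes, long availableBytes, double fraction) {
        return checkpointBytes > availableBytes * fraction;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: pass the checkpoint directory as the first argument.
        Path checkpointDir = Paths.get(args.length > 0 ? args[0] : ".");
        long size = directorySize(checkpointDir);
        long available = availableHeap();
        if (shouldWarn(size, available, 0.5)) {
            System.err.println("WARN: checkpoint is " + size
                    + " bytes, available heap is " + available
                    + " bytes; recovery may run out of memory");
        }
    }
}
```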

On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have
 the original checkpointing directory :(  Thanks for the clarification on
 spark.driver.memory, I'll keep testing (at 2g things seem OK for now).


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Would there be a way to chunk up/batch up the contents of the checkpointing
directories as they're being processed by Spark Streaming?  Is it mandatory
to load the whole thing in one go?


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
You need to keep a certain number of rdds around for checkpointing, based
on e.g. the window size.  Those would all need to be loaded at once.
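
As a rough illustration of why the window size matters: the number of batch RDDs that must be recreated on recovery grows with the ratio of window length to batch interval. The sketch below only does that arithmetic; the 10-minute window and 2-second batch interval are made-up figures, the class name is hypothetical, and Spark's own bookkeeping may retain somewhat more than this estimate.

```java
import java.time.Duration;

public class RetainedBatches {

    /** Rough count of batch RDDs covering one window: ceil(window / batchInterval). */
    static long batchesPerWindow(Duration window, Duration batchInterval) {
        long w = window.toMillis();
        long b = batchInterval.toMillis();
        if (b <= 0) {
            throw new IllegalArgumentException("batch interval must be positive");
        }
        // Integer ceiling division.
        return (w + b - 1) / b;
    }

    public static void main(String[] args) {
        // Hypothetical figures: a 10-minute window over 2-second batches.
        long n = batchesPerWindow(Duration.ofMinutes(10), Duration.ofSeconds(2));
        System.out.println(n + " batch RDDs per window"); // prints "300 batch RDDs per window"
    }
}
```

Each of those RDDs is reconstructed in the driver during restore, which is consistent with the stack trace showing the failure inside the KafkaRDD constructor.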


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Ted Yu
Looks like the workaround is to reduce the window length.

Cheers


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
"You need to keep a certain number of rdds around for checkpointing" --
that seems like a hefty price to pay in order to achieve fault
tolerance.  Why does Spark persist whole RDDs of data?  Shouldn't it be
sufficient to persist just the offsets, to know where to resume from?

Thanks.


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Well, RDDs also contain data, don't they?

The question is, what can be so hefty in the checkpointing directory as to
cause the Spark driver to run out of memory?  That seems to make
checkpointing expensive in terms of both I/O and memory consumption: two
network hops (to the driver, then to the workers), hefty file system usage,
and hefty memory consumption.  What can we do to offset some of these costs?



On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger c...@koeninger.org wrote:

 The rdd is indeed defined by mostly just the offsets / topic partitions.


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
The rdd is indeed defined by mostly just the offsets / topic partitions.
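
To make that concrete: for a direct Kafka stream, the per-batch description amounts to a handful of small fields per topic partition rather than the message payloads. The class below is a hypothetical plain-Java stand-in, modeled only loosely on the offset-range idea (topic, partition, starting offset, ending offset); it is not Spark's own class, and the topic name and offsets are invented.

```java
public final class OffsetRangeSketch {
    final String topic;
    final int partition;
    final long fromOffset;  // inclusive
    final long untilOffset; // exclusive

    OffsetRangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
        if (untilOffset < fromOffset) {
            throw new IllegalArgumentException("untilOffset must be >= fromOffset");
        }
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    /** Number of messages the range covers. */
    long count() {
        return untilOffset - fromOffset;
    }

    @Override
    public String toString() {
        return topic + "-" + partition + " [" + fromOffset + ", " + untilOffset + ")";
    }

    public static void main(String[] args) {
        // Hypothetical topic and offsets: a few dozen bytes of metadata
        // describe 150 messages without holding any of them.
        OffsetRangeSketch range = new OffsetRangeSketch("events", 0, 100L, 250L);
        System.out.println(range + " covers " + range.count() + " messages");
    }
}
```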

On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 You need to keep a certain number of rdds around for checkpointing --
 that seems like a hefty expense to pay in order to achieve fault
 tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
 sufficient to just persist the offsets, to know where to resume from?

 Thanks.


 On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You need to keep a certain number of rdds around for checkpointing, based
 on e.g. the window size.  Those would all need to be loaded at once.

 On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Would there be a way to chunk up/batch up the contents of the
 checkpointing directories as they're being processed by Spark Streaming?
 Is it mandatory to load the whole thing in one go?

 On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote:

 I wonder whether, during recovery from a checkpoint, we can estimate the
 size of the checkpoint and compare it with
 Runtime.getRuntime().freeMemory().

 If the size of the checkpoint is much bigger than free memory, log a
 warning, etc.

 Cheers

 On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
 have the original checkpointing directory :(  Thanks for the clarification
 on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).

 On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org
 wrote:

 That looks like it's during recovery from a checkpoint, so it'd be
 driver memory not executor memory.

 How big is the checkpoint directory that you're trying to restore
 from?

 On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 We're getting the below error.  Tried increasing spark.executor.memory
 e.g. from 1g to 2g but the below error still happens.

 Any recommendations? Something to do with specifying -Xmx in the
 submit job scripts?

 Thanks.

 Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.Arrays.copyOf(Arrays.java:3332)
 at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
 at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
 at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 at java.lang.StringBuilder.append(StringBuilder.java:136)
 at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
 at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
 at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
 at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
 at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
 at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
 at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Cody Koeninger
No, it's not like a given KafkaRDD object contains an array of messages
that gets serialized with the object.  Its compute method generates an
iterator of messages as needed, by connecting to kafka.
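
To illustrate the point, here is a minimal Java sketch of an object that is
defined only by its offsets and produces messages lazily. This is not Spark's
KafkaRDD code: the class name OffsetRangeSketch and the fetch function are
made up for illustration, and fetch stands in for a real Kafka read.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.LongFunction;

// Hypothetical stand-in for the small metadata that defines one partition's
// worth of data: a topic, a partition and an offset interval.
final class OffsetRangeSketch {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    OffsetRangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
        if (fromOffset > untilOffset) {
            throw new IllegalArgumentException("fromOffset must be <= untilOffset");
        }
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    // Number of messages covered by the range; no message is held in memory.
    long count() {
        return untilOffset - fromOffset;
    }

    // Returns an iterator that asks 'fetch' for one message per call to next().
    // 'fetch' is an assumed placeholder for whatever reads a message at an offset.
    Iterator<String> compute(LongFunction<String> fetch) {
        return new Iterator<String>() {
            private long cursor = fromOffset;

            @Override
            public boolean hasNext() {
                return cursor < untilOffset;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return fetch.apply(cursor++);
            }
        };
    }
}
```

Serializing an object like this costs a few fields per topic partition no
matter how many messages the range covers, which is the sense in which the
messages are not stored with the object.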

I don't know what was so hefty in your checkpoint directory, because you
deleted it.  My checkpoint directories are usually pretty reasonable in
size.

How many topicpartitions did you have, and how long was your window?
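
Those two numbers matter because, as noted earlier in the thread, the number
of rdds kept around depends on e.g. the window size. A rough back-of-envelope
sketch in Java, assuming one retained rdd per batch interval inside the window
and one offset range per topic partition per rdd. Spark's real retention logic
has more inputs than this, and the class name and figures are hypothetical.

```java
// Back-of-envelope estimate only; not derived from Spark's source.
final class CheckpointEstimate {

    // Batches that fall inside one window, rounded up.
    static long retainedBatches(long windowMs, long batchMs) {
        if (windowMs <= 0 || batchMs <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        return (windowMs + batchMs - 1) / batchMs;
    }

    // Assumed: each retained batch carries one offset range per topic partition.
    static long offsetRanges(long windowMs, long batchMs, int topicPartitions) {
        if (topicPartitions < 0) {
            throw new IllegalArgumentException("topicPartitions must be >= 0");
        }
        return retainedBatches(windowMs, batchMs) * topicPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical job: 10 minute window, 2 second batches, 50 topic partitions.
        // 300 batches * 50 partitions = 15000 offset ranges.
        System.out.println(offsetRanges(600_000L, 2_000L, 50));
    }
}
```

Each offset range is small, so a count like this would not by itself explain
a driver running out of memory; it only gives a sense of how many objects
recovery has to rebuild at once.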

On Mon, Aug 10, 2015 at 3:33 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 Well, RDDs also contain data, don't they?

 The question is, what can be so hefty in the checkpointing directory to
 cause Spark driver to run out of memory?  It seems that it makes
 checkpointing expensive, in terms of I/O and memory consumption.  Two
 network hops -- to driver, then to workers.  Hefty file system usage, hefty
 memory consumption...   What can we do to offset some of these costs?



 On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The rdd is indeed defined by mostly just the offsets / topic partitions.

 On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 You need to keep a certain number of rdds around for checkpointing --
 that seems like a hefty expense to pay in order to achieve fault
 tolerance.  Why does Spark persist whole RDDs of data?  Shouldn't it be
 sufficient to just persist the offsets, to know where to resume from?

 Thanks.


 On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You need to keep a certain number of rdds around for checkpointing,
 based on e.g. the window size.  Those would all need to be loaded at once.

 On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Would there be a way to chunk up/batch up the contents of the
 checkpointing directories as they're being processed by Spark Streaming?
 Is it mandatory to load the whole thing in one go?

 On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu yuzhih...@gmail.com wrote:

 I wonder whether, during recovery from a checkpoint, we can estimate the
 size of the checkpoint and compare it with
 Runtime.getRuntime().freeMemory().

 If the size of the checkpoint is much bigger than free memory, log a
 warning, etc.

 Cheers

 On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
 have the original checkpointing directory :(  Thanks for the clarification
 on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).

 On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger c...@koeninger.org
  wrote:

 That looks like it's during recovery from a checkpoint, so it'd be
 driver memory not executor memory.

 How big is the checkpoint directory that you're trying to restore
 from?

 On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 We're getting the below error.  Tried increasing spark.executor.memory
 e.g. from 1g to 2g but the below error still happens.

 Any recommendations? Something to do with specifying -Xmx in the
 submit job scripts?

 Thanks.

 Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.Arrays.copyOf(Arrays.java:3332)
 at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
 at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
 at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 at java.lang.StringBuilder.append(StringBuilder.java:136)
 at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
 at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
 at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
 at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
 at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
 at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
 at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at