Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
From: Cody Koeninger

> Well, RDDs also contain data, don't they?

No, it's not as though a given KafkaRDD object contains an array of messages that gets serialized with the object. Its compute method generates an iterator of messages on demand, by connecting to Kafka. I don't know what was so hefty in your checkpoint directory, because you deleted it; my checkpoint directories are usually pretty reasonable in size. How many topic-partitions did you have, and how long was your window?
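For context on that point, a minimal sketch (assuming the spark-streaming-kafka 0.8 direct API that appears in the stack trace later in this thread; the topic, broker, and offsets are made up) of how a KafkaRDD is described entirely by offset ranges, with the messages fetched from Kafka only when the RDD is computed:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    // Hypothetical topic, broker, and offsets, purely for illustration.
    // The RDD is defined entirely by these offset ranges; its compute()
    // method connects to Kafka and iterates over the messages lazily, so
    // the messages themselves never end up in the checkpoint.
    def buildRdd(sc: SparkContext, brokers: String): RDD[(String, String)] = {
      val kafkaParams = Map("metadata.broker.list" -> brokers)
      val offsetRanges = Array(
        OffsetRange("events", 0, 1000L, 2000L),
        OffsetRange("events", 1, 1500L, 2500L))
      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, offsetRanges)
    }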
Well, RDD"s also contain data, don't they? The question is, what can be so hefty in the checkpointing directory to cause Spark driver to run out of memory? It seems that it makes checkpointing expensive, in terms of I/O and memory consumption. Two network hops -- to driver, then to workers. Hefty file system usage, hefty memory consumption... What can we do to offset some of these costs? On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger wrote: > The rdd is indeed defined by mostly just the offsets / topic partitions. > > On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> "You need to keep a certain number of rdds around for checkpointing" -- >> that seems like a hefty expense to pay in order to achieve fault >> tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be >> sufficient to just persist the offsets, to know where to resume from? >> >> Thanks. >> >> >> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger >> wrote: >> >>> You need to keep a certain number of rdds around for checkpointing, >>> based on e.g. the window size. Those would all need to be loaded at once. >>> >>> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg < >>> dgoldenberg...@gmail.com> wrote: >>> Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go? On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu wrote: > I wonder during recovery from a checkpoint whether we can estimate > the size of the checkpoint and compare with Runtime.getRuntime(). > freeMemory(). > > If the size of checkpoint is much bigger than free memory, log > warning, etc > > Cheers > > On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't >> have the original checkpointing directory :( Thanks for the >> clarification >> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). >> >> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger >> wrote: >> >>> That looks like it's during recovery from a checkpoint, so it'd be >>> driver memory not executor memory. >>> >>> How big is the checkpoint directory that you're trying to restore >>> from? >>> >>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg < >>> dgoldenberg...@gmail.com> wrote: >>> We're getting the below error. Tried increasing spark.executor.memory e.g. from 1g to 2g but the below error still happens. Any recommendations? Something to do with specifying -Xmx in the submit job scripts? Thanks. 
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StackTraceElement.toString(StackTraceElement.java:173) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212) at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441) at org.apache.spark.rdd.RDD.(RDD.scala:1365) at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.strea
Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing
From: Cody Koeninger

> Shouldn't it be sufficient to just persist the offsets, to know where to resume from?

The RDD is indeed defined mostly by just the offsets / topic partitions.
"You need to keep a certain number of rdds around for checkpointing" -- that seems like a hefty expense to pay in order to achieve fault tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be sufficient to just persist the offsets, to know where to resume from? Thanks. On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger wrote: > You need to keep a certain number of rdds around for checkpointing, based > on e.g. the window size. Those would all need to be loaded at once. > > On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> Would there be a way to chunk up/batch up the contents of the >> checkpointing directories as they're being processed by Spark Streaming? >> Is it mandatory to load the whole thing in one go? >> >> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu wrote: >> >>> I wonder during recovery from a checkpoint whether we can estimate the >>> size of the checkpoint and compare with Runtime.getRuntime().freeMemory >>> (). >>> >>> If the size of checkpoint is much bigger than free memory, log warning, >>> etc >>> >>> Cheers >>> >>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg < >>> dgoldenberg...@gmail.com> wrote: >>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have the original checkpointing directory :( Thanks for the clarification on spark.driver.memory, I'll keep testing (at 2g things seem OK for now). On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger wrote: > That looks like it's during recovery from a checkpoint, so it'd be > driver memory not executor memory. > > How big is the checkpoint directory that you're trying to restore from? > > On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> We're getting the below error. Tried increasing >> spark.executor.memory e.g. from 1g to 2g but the below error still >> happens. >> >> Any recommendations? Something to do with specifying -Xmx in the >> submit job scripts? >> >> Thanks. 
From: Ted Yu

Looks like the workaround is to reduce the window length.

Cheers
From: Cody Koeninger

> Is it mandatory to load the whole thing in one go?

You need to keep a certain number of RDDs around for checkpointing, based on e.g. the window size. Those would all need to be loaded at once.
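To make that concrete, a hedged sketch (batch interval, window, checkpoint path, broker, and topic are all made up) of how the window length drives the number of per-batch RDDs that have to be tracked in the checkpoint and re-created on the driver during recovery:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("window-checkpoint-demo")
    val ssc = new StreamingContext(conf, Seconds(2))          // 2-second batches (example value)
    ssc.checkpoint("hdfs:///checkpoints/demo")                // hypothetical path

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // hypothetical broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))                        // hypothetical topic

    // A 10-minute window over 2-second batches means on the order of 300 batch
    // RDDs (their offset ranges) need to be remembered and re-instantiated on
    // recovery; shrinking the window shrinks that number.
    stream.map(_._2).window(Minutes(10), Seconds(30)).count().print()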
From: Dmitry Goldenberg

Would there be a way to chunk up/batch up the contents of the checkpointing directories as they're being processed by Spark Streaming? Is it mandatory to load the whole thing in one go?
From: Ted Yu

I wonder whether, during recovery from a checkpoint, we can estimate the size of the checkpoint and compare it with Runtime.getRuntime().freeMemory().

If the size of the checkpoint is much bigger than the free memory, log a warning, etc.

Cheers
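As an illustration of that idea only (nothing like this exists in Spark itself; the helper name, threshold, and message are made up), a sketch that sizes the checkpoint directory through the Hadoop FileSystem API and compares it with the driver's free heap:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical pre-flight check, meant to run on the driver before recovery.
    def warnIfCheckpointLooksTooBig(checkpointDir: String): Unit = {
      val path = new Path(checkpointDir)
      val fs = FileSystem.get(path.toUri, new Configuration())
      val checkpointBytes = fs.getContentSummary(path).getLength  // total bytes under the directory
      val freeHeapBytes = Runtime.getRuntime.freeMemory()
      if (checkpointBytes > freeHeapBytes) {
        System.err.println(s"WARNING: checkpoint is ~$checkpointBytes bytes but only " +
          s"$freeHeapBytes bytes of driver heap are free; recovery may need more driver memory")
      }
    }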
From: Dmitry Goldenberg

Thanks, Cody, will try that. Unfortunately, due to a reinstall I don't have the original checkpointing directory. :( Thanks for the clarification on spark.driver.memory; I'll keep testing (at 2g things seem OK for now).
From: Cody Koeninger

That looks like it's during recovery from a checkpoint, so it'd be driver memory, not executor memory.

How big is the checkpoint directory that you're trying to restore from?

On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> We're getting the below error. Tried increasing spark.executor.memory, e.g. from 1g to 2g, but the below error still happens.
>
> Any recommendations? Something to do with specifying -Xmx in the submit job scripts?
>
> Thanks.
>
> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.util.Arrays.copyOf(Arrays.java:3332)
>   at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>   at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>   at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>   at java.lang.StringBuilder.append(StringBuilder.java:136)
>   at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>   at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>   at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>   at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>   at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>   at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
>   at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>   at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
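For readers hitting the same error: the restoreCheckpointData calls in the trace above typically run while the StreamingContext is being rebuilt from the checkpoint (e.g. inside StreamingContext.getOrCreate) on the driver, so the relevant knob is the driver heap (--driver-memory on spark-submit, or spark.driver.memory), not spark.executor.memory. A hedged sketch of that recovery path, with hypothetical paths and settings:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///checkpoints/demo"   // hypothetical path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("recovery-demo")
      val ssc = new StreamingContext(conf, Seconds(2))
      ssc.checkpoint(checkpointDir)
      // ... define the Kafka direct stream and the rest of the job here ...
      ssc
    }

    // On restart, the DStream graph (including every checkpointed KafkaRDD's
    // offset ranges) is rebuilt here, in the driver JVM, which is why the OOM
    // above is addressed by a larger --driver-memory rather than spark.executor.memory.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()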