The rdd is indeed defined by mostly just the offsets / topic partitions.

On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> "You need to keep a certain number of rdds around for checkpointing" --
> that seems like a hefty expense to pay in order to achieve fault
> tolerance. Why does Spark persist whole RDD's of data? Shouldn't it be
> sufficient to just persist the offsets, to know where to resume from?
>
> Thanks.
>
> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> You need to keep a certain number of rdds around for checkpointing, based
>> on e.g. the window size. Those would all need to be loaded at once.
>>
>> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>
>>> Would there be a way to chunk up/batch up the contents of the
>>> checkpointing directories as they're being processed by Spark Streaming?
>>> Is it mandatory to load the whole thing in one go?
>>>
>>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> I wonder during recovery from a checkpoint whether we can estimate the
>>>> size of the checkpoint and compare with Runtime.getRuntime().freeMemory().
>>>>
>>>> If the size of checkpoint is much bigger than free memory, log warning,
>>>> etc
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>>>>> have the original checkpointing directory :( Thanks for the clarification
>>>>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>>>>
>>>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> That looks like it's during recovery from a checkpoint, so it'd be
>>>>>> driver memory not executor memory.
>>>>>>
>>>>>> How big is the checkpoint directory that you're trying to restore
>>>>>> from?
>>>>>>
>>>>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> We're getting the below error. Tried increasing
>>>>>>> spark.executor.memory e.g. from 1g to 2g but the below error still
>>>>>>> happens.
>>>>>>>
>>>>>>> Any recommendations? Something to do with specifying -Xmx in the
>>>>>>> submit job scripts?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>>> at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>>>>>> at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>>>>>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>>>>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>>>>>> at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>>>>>> at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>>>>>> at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>>> at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>>>>>> at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
>>>>>>> at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
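
For what it's worth, Ted's idea earlier in the thread (estimate the checkpoint size and compare it with free memory before recovering) could be sketched roughly as below. This is only an illustration and not something taken from Spark itself: the class and method names (CheckpointSizeCheck, directorySize, availableHeap, likelyTooBig) and the threshold factor of 1.0 are made up for the example. It also assumes the checkpoint directory sits on the local filesystem (a directory on HDFS would need the Hadoop FileSystem API instead), and the on-disk size is at best a rough proxy for what the restored checkpoint occupies on the driver heap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper, not part of Spark: warn before recovery if the
// checkpoint directory looks large relative to the heap still available.
public class CheckpointSizeCheck {

    // Sum the sizes of all regular files under dir.
    // Assumption: dir is on the local filesystem, not HDFS.
    public static long directorySize(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .sum();
        }
    }

    // Heap the JVM could still hand out: the unallocated part of the
    // maximum heap plus the free part of what is already allocated.
    public static long availableHeap() {
        Runtime rt = Runtime.getRuntime();
        return rt.maxMemory() - rt.totalMemory() + rt.freeMemory();
    }

    // True when the checkpoint is bigger than factor * available heap.
    // The factor is an arbitrary knob chosen for this sketch.
    public static boolean likelyTooBig(long checkpointBytes, long availableBytes, double factor) {
        return checkpointBytes > availableBytes * factor;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the current directory if no path is given.
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        long size = directorySize(dir);
        long available = availableHeap();
        if (likelyTooBig(size, available, 1.0)) {
            System.err.println("WARN: checkpoint dir " + dir + " is " + size
                    + " bytes but only about " + available + " bytes of heap are available");
        }
    }
}
```

The sketch uses maxMemory() - totalMemory() + freeMemory() rather than freeMemory() alone, since freeMemory() only reports the free space inside the heap the JVM has allocated so far, which can be well below what -Xmx allows. Since the recovery happens on the driver, a check like this would have to run in the driver JVM to mean anything.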