The RDD is indeed defined mostly by just the offsets / topic partitions.
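
For illustration, here's a minimal sketch of that idea against the Spark 1.x
spark-streaming-kafka API: a Kafka RDD can be recreated from nothing but
offset ranges plus connection params. The topic name, offsets, and broker
address below are made up, and sc is assumed to be an existing SparkContext.

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

  // Each range is (topic, partition, fromOffset, untilOffset) -- no message
  // data is stored; the messages are re-fetched from Kafka on computation.
  val offsetRanges = Array(
    OffsetRange("my-topic", 0, 100L, 200L),  // hypothetical topic/offsets
    OffsetRange("my-topic", 1, 150L, 250L)
  )
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)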

On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com
> wrote:

> "You need to keep a certain number of rdds around for checkpointing" --
> that seems like a hefty expense to pay in order to achieve fault
> tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
> sufficient to just persist the offsets, to know where to resume from?
>
> Thanks.
>
>
> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You need to keep a certain number of RDDs around for checkpointing, based
>> on e.g. the window size.  Those would all need to be loaded at once.
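>>
>> As a concrete sketch (the batch/window durations are made up, and conf /
>> stream are assumed to be an existing SparkConf and DStream): a 10-minute
>> window over 30-second batches means on the order of 20 batch RDDs stay
>> referenced, and all of their checkpoint data gets reloaded together on
>> recovery.
>>
>>   import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
>>
>>   val ssc = new StreamingContext(conf, Seconds(30))  // 30s batches
>>   ssc.checkpoint("hdfs:///checkpoints/my-app")       // hypothetical path
>>   // 10 min window / 30 s batches => ~20 batch RDDs kept for recovery
>>   val windowed = stream.window(Minutes(10), Seconds(30))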
>>
>> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Would there be a way to chunk up/batch up the contents of the
>>> checkpointing directories as they're being processed by Spark Streaming?
>>> Is it mandatory to load the whole thing in one go?
>>>
>>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> I wonder whether, during recovery from a checkpoint, we can estimate
>>>> the size of the checkpoint and compare it with
>>>> Runtime.getRuntime().freeMemory().
>>>>
>>>> If the size of the checkpoint is much bigger than free memory, we could
>>>> log a warning, etc.
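>>>>
>>>> Something along these lines (just a sketch; the checkpoint path is
>>>> hypothetical and the directory-size helper is ad hoc):
>>>>
>>>>   import java.io.File
>>>>
>>>>   // Rough estimate: total on-disk checkpoint size vs. currently free heap.
>>>>   def dirSize(f: File): Long =
>>>>     if (f.isDirectory)
>>>>       Option(f.listFiles).getOrElse(Array.empty[File]).map(dirSize).sum
>>>>     else f.length
>>>>
>>>>   val checkpointBytes = dirSize(new File("/path/to/checkpoint"))
>>>>   val freeBytes = Runtime.getRuntime.freeMemory
>>>>   if (checkpointBytes > freeBytes)
>>>>     println(s"WARN: checkpoint ($checkpointBytes B) > free heap ($freeBytes B)")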
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Thanks, Cody, will try that. Unfortunately, due to a reinstall, I don't
>>>>> have the original checkpointing directory :(  Thanks for the
>>>>> clarification on spark.driver.memory; I'll keep testing (at 2g things
>>>>> seem OK for now).
>>>>>
>>>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> That looks like it's happening during recovery from a checkpoint, so
>>>>>> it'd be driver memory, not executor memory.
>>>>>>
>>>>>> How big is the checkpoint directory that you're trying to restore
>>>>>> from?
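>>>>>>
>>>>>> Since the OOM is in the driver, raising spark.executor.memory won't
>>>>>> help; bump the driver heap instead, e.g. on the command line (the
>>>>>> class and jar names here are made up):
>>>>>>
>>>>>>   spark-submit --driver-memory 4g \
>>>>>>     --class com.example.StreamingJob streaming-job.jar
>>>>>>
>>>>>> Note that setting spark.driver.memory in SparkConf doesn't take effect
>>>>>> in client mode, because the driver JVM has already started by then.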
>>>>>>
>>>>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> We're getting the error below.  We tried increasing
>>>>>>> spark.executor.memory, e.g. from 1g to 2g, but the error still
>>>>>>> happens.
>>>>>>>
>>>>>>> Any recommendations?  Does this have to do with specifying -Xmx in
>>>>>>> the job submit scripts?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>>> at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>>>>>> at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>>>>>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>>>>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>>>>>> at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>>>>>> at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>>>>>> at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>>> at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>>>>>> at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
>>>>>>> at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>>>>> at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
