Thanks, Cody. It sounds like Spark Streaming keeps enough state to know
how many batches have been processed, and if not all of them have been, the
RDD is 'unfinished'. I wonder whether it would also know if the last
micro-batch was fully processed successfully. Hypothetically, the driver
program could terminate while the last batch is being processed...
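
One mitigation we're looking at on our end is stopping the context gracefully, so an
in-flight batch gets a chance to complete before the driver exits. A minimal Scala
sketch (the app name and the shutdown-hook approach are just illustrative; a hard kill
would of course bypass the hook, which is why the re-processing question still matters):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("graceful-stop-sketch"), Seconds(1))

    // ... set up the DStream pipeline here ...

    ssc.start()

    // On normal JVM shutdown, stop gracefully so the batch currently being
    // processed is allowed to finish before the driver goes away.
    sys.addShutdownHook {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

    ssc.awaitTermination()
  }
}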

On Fri, Aug 14, 2015 at 6:17 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You'll resume and re-process the RDD that didn't finish.
>
> On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Our additional question on checkpointing is basically the logistics of it
>> --
>>
>> At which point does the data get written to the checkpoint?  Is it
>> written as soon as the driver program retrieves an RDD from Kafka (or
>> another source)?  Or is it written after that RDD has been processed and
>> we're basically moving on to the next RDD?
>>
>> What I'm driving at is: what happens if the driver program is killed?
>> The next time it's started, will it know, from Spark Streaming's
>> checkpointing, to resume from the same RDD that was being processed at the
>> time the program was killed?  In other words, upon restarting the
>> consumer, will we resume from the RDD that was unfinished, or will we be
>> looking at the next RDD?
>>
>> Will we pick up from the last known *successfully processed* topic
>> offset?
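>>
>> What we'd like to end up with, conceptually, is something like the sketch
>> below against the direct Kafka stream: process the batch first, and only
>> then record the offset range that was covered.  (Just a Scala sketch --
>> processRecord and saveOffsets stand in for our own code, not Spark APIs,
>> and the topic and broker list are placeholders.)
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
>>
>> val ssc = new StreamingContext(new SparkConf().setAppName("offset-sketch"), Seconds(1))
>>
>> // Stubs for our own processing and bookkeeping -- not Spark APIs.
>> def processRecord(record: (String, String)): Unit = {}
>> def saveOffsets(topic: String, partition: Int, untilOffset: Long): Unit = {}
>>
>> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>   ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("myTopic"))
>>
>> stream.foreachRDD { rdd =>
>>   // The direct stream exposes the exact Kafka offset range backing each RDD.
>>   val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   rdd.foreach(processRecord)   // process the whole batch first...
>>
>>   // ...and only then record how far we got, so a restart can resume from
>>   // the last offsets that were actually processed successfully.
>>   ranges.foreach(r => saveOffsets(r.topic, r.partition, r.untilOffset))
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()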
>>
>> Thanks.
>>
>>
>>
>>
>> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> If you've set the checkpoint dir, it seems like indeed the intent is
>>> to use a default checkpoint interval in DStream:
>>>
>>> private[streaming] def initialize(time: Time) {
>>> ...
>>>   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
>>>   if (mustCheckpoint && checkpointDuration == null) {
>>>     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>>>     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>>>   }
>>>
>>> Do you see that log message?  What's the interval?  That could at least
>>> explain why it's not doing anything, if it's quite long.
>>>
>>> It sort of seems wrong though since
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>> suggests it was intended to be a multiple of the batch interval. The
>>> slide duration wouldn't always be relevant anyway.
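>>>
>>> If you want to take the default out of the equation, you can set the
>>> interval explicitly on the stream as a multiple of the batch interval.
>>> A sketch, with "ssc" and "stream" being whatever context and DStream you
>>> have and a 1-second batch:
>>>
>>> import org.apache.spark.streaming.Seconds
>>>
>>> ssc.checkpoint("file:///mnt/dir1/dir2")  // checkpoint directory still required
>>> stream.checkpoint(Seconds(10))           // 10x the 1-second batch interval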
>>>
>>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>>> <dgoldenberg...@gmail.com> wrote:
>>> > I've instrumented checkpointing per the programming guide, and I can tell
>>> > that Spark Streaming is creating the checkpoint directories, but I'm not
>>> > seeing any content being created in those directories, nor am I seeing the
>>> > effects I'd expect from checkpointing.  I'd expect any data that comes into
>>> > Kafka while the consumers are down to get picked up when the consumers are
>>> > restarted; I'm not seeing that.
>>> >
>>> > For now my checkpoint directory is set to the local file system, with the
>>> > directory URI in this form: file:///mnt/dir1/dir2.  I see a subdirectory
>>> > named with a UUID being created under there, but no files.
>>> >
>>> > I'm using a custom JavaStreamingContextFactory which creates a
>>> > JavaStreamingContext with the directory set into it via the
>>> > checkpoint(String) method.
>>> >
>>> > I'm currently not invoking the checkpoint(Duration) method on the DStream,
>>> > since I want to first rely on Spark's default checkpointing interval.  My
>>> > streaming batch duration is set to 1 second.
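>>> >
>>> > For reference, the factory is doing essentially the equivalent of this
>>> > (a simplified Scala sketch of our Java setup; the app name, topic, and
>>> > broker list are placeholders, and I'm assuming the direct Kafka stream
>>> > here):
>>> >
>>> > import kafka.serializer.StringDecoder
>>> > import org.apache.spark.SparkConf
>>> > import org.apache.spark.streaming.{Seconds, StreamingContext}
>>> > import org.apache.spark.streaming.kafka.KafkaUtils
>>> >
>>> > val checkpointDir = "file:///mnt/dir1/dir2"
>>> >
>>> > def createContext(): StreamingContext = {
>>> >   val ssc = new StreamingContext(
>>> >     new SparkConf().setAppName("checkpointed-consumer"), Seconds(1))
>>> >   ssc.checkpoint(checkpointDir)
>>> >   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>> >     ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("myTopic"))
>>> >   stream.foreachRDD(rdd => rdd.foreach(println))  // placeholder for our processing
>>> >   ssc
>>> > }
>>> >
>>> > // Rebuild from the checkpoint if one exists; otherwise create a fresh context.
>>> > val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>>> > ssc.start()
>>> > ssc.awaitTermination()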
>>> >
>>> > Anyone have any idea what might be going wrong?
>>> >
>>> > Also, at which point does Spark delete files from checkpointing?
>>> >
>>> > Thanks.
>>>
>>
>>
>
