Thanks, Cody. It sounds like Spark Streaming has enough state info to know how many batches have been processed and, if not all of them have, that the RDD is 'unfinished'. I wonder whether it would also know if the last micro-batch has been fully and successfully processed. Hypothetically, the driver program could terminate just as the last batch is being processed...
On Fri, Aug 14, 2015 at 6:17 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You'll resume and re-process the rdd that didn't finish
>
> On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>
>> Our additional question on checkpointing is basically the logistics of it --
>>
>> At which point does the data get written into checkpointing? Is it written as soon as the driver program retrieves an RDD from Kafka (or another source)? Or is it written after that RDD has been processed and we're basically moving on to the next RDD?
>>
>> What I'm driving at is: what happens if the driver program is killed? The next time it's started, will it know, from Spark Streaming's checkpointing, to resume from the same RDD that was being processed at the time the program was killed? In other words, upon restarting the consumer, will we resume from the RDD that was unfinished, or will we be looking at the next RDD?
>>
>> Will we pick up from the last known *successfully processed* topic offset?
>>
>> Thanks.
>>
>> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> If you've set the checkpoint dir, it seems like indeed the intent is to use a default checkpoint interval in DStream:
>>>
>>>     private[streaming] def initialize(time: Time) {
>>>       ...
>>>       // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
>>>       if (mustCheckpoint && checkpointDuration == null) {
>>>         checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>>>         logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>>>       }
>>>
>>> Do you see that log message? What's the interval? That could at least explain why it's not doing anything, if it's quite long.
>>>
>>> It sort of seems wrong, though, since
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>> suggests it was intended to be a multiple of the batch interval. The slide duration wouldn't always be relevant anyway.
>>>
>>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>
>>> > I've instrumented checkpointing per the programming guide, and I can tell that Spark Streaming is creating the checkpoint directories, but I'm not seeing any content being created in those directories, nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down to get picked up when the consumers are restarted; I'm not seeing that.
>>> >
>>> > For now my checkpoint directory is set to the local file system, with the directory URI in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there, but no files.
>>> >
>>> > I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method.
>>> >
>>> > I'm currently not invoking the checkpoint(Duration) method on the DStream, since I want to first rely on Spark's default checkpointing interval. My streaming batch duration is set to 1 second.
>>> >
>>> > Anyone have any idea what might be going wrong?
>>> >
>>> > Also, at which point does Spark delete files from checkpointing?
>>> >
>>> > Thanks.
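For the 1-second batch interval mentioned in the thread, the default-interval arithmetic quoted above from DStream.initialize works out to 10 seconds, which suggests the interval itself isn't unusually long here. A minimal, self-contained sketch of that arithmetic (the object and helper names are mine, not Spark API; Durations are modeled as plain milliseconds):

```scala
// Sketch of the default checkpoint interval computed in DStream.initialize:
// checkpointDuration = slideDuration * ceil(10 seconds / slideDuration)
object CheckpointIntervalSketch {
  def defaultCheckpointIntervalMs(slideMs: Long): Long = {
    val tenSecondsMs = 10000L
    // Round up so the result is the smallest multiple of the slide
    // duration that is at least 10 seconds.
    slideMs * math.ceil(tenSecondsMs.toDouble / slideMs).toLong
  }

  def main(args: Array[String]): Unit = {
    // With the thread's 1-second batch interval, the default is 10 seconds.
    println(defaultCheckpointIntervalMs(1000L))  // 10000
    // With a 15-second slide, the slide itself already exceeds 10 seconds.
    println(defaultCheckpointIntervalMs(15000L)) // 15000
  }
}
```

So the formula only lengthens the interval when the slide duration exceeds 10 seconds; with a 1-second batch, a too-long interval wouldn't explain the empty checkpoint directories.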