>> The whole point of checkpointing is to recover the *exact* computation
where it left off.
That makes sense. We were looking at the metadata checkpointing and the
data checkpointing, and with data checkpointing, you can specify a
checkpoint duration value. With the metadata checkpointing, there
The metadata checkpointing interval does not really affect performance,
so I didn't expose any way to control that interval. The data checkpointing
interval does affect performance, hence that interval is configurable.
On Thu, Sep 10, 2015 at 5:45 AM, Dmitry Goldenberg
The whole point of checkpointing is to recover the *exact* computation
where it left off.
If you want any change in the specification of the computation (which
includes any intervals), then you cannot recover from checkpoint as it can
be an arbitrarily complex issue to deal with changes in the
>> when you use getOrCreate, and there exists a valid checkpoint, it will
always return the context from the checkpoint and not call the factory.
A simple way to see what's going on is to print something in the factory to
verify whether it is ever called.
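TD's point can be demonstrated with a toy stand-in (plain Java, no Spark; `getOrCreate`, the `Optional` checkpoint, and the durations here are all invented for illustration, not the real Spark API): when a checkpoint exists, the factory is never invoked, so any new settings inside the factory are silently ignored.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class GetOrCreateDemo {
    // Mimics the getOrCreate contract: prefer the checkpoint,
    // call the factory only when no checkpoint exists.
    static long getOrCreate(Optional<Long> checkpointedDuration,
                            Supplier<Long> factory) {
        return checkpointedDuration.orElseGet(factory);
    }

    public static void main(String[] args) {
        boolean[] factoryCalled = {false};
        Supplier<Long> factory = () -> {
            factoryCalled[0] = true;     // print/log here in real code
            return 20_000L;              // new setting: 20s batches
        };

        // Checkpoint present: factory is skipped, the old 10s duration survives.
        long recovered = getOrCreate(Optional.of(10_000L), factory);
        System.out.println(recovered + " factoryCalled=" + factoryCalled[0]);

        // No checkpoint: the factory runs and the new duration takes effect.
        long fresh = getOrCreate(Optional.empty(), factory);
        System.out.println(fresh + " factoryCalled=" + factoryCalled[0]);
    }
}
```

A print statement inside the real factory, as suggested above, answers the same question against an actual checkpoint directory.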
This is probably OK. Seems to explain why we
See inline.
On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg
wrote:
> What's wrong with creating a checkpointed context?? We WANT
> checkpointing, first of all. We therefore WANT the checkpointed context.
>
> Second of all, it's not true that we're loading the
Have you tried deleting or moving the contents of the checkpoint directory
and restarting the job?
On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg
wrote:
> Sorry, more relevant code below:
>
> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>
Why are you checkpointing the direct Kafka stream? It serves no purpose.
TD
On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg
wrote:
> I just disabled checkpointing in our consumers and I can see that the
> batch duration millis set to 20 seconds is now being
>> Why are you checkpointing the direct Kafka stream? It serves no purpose.
Could you elaborate on what you mean?
Our goal is fault tolerance. If a consumer is killed or stopped midstream,
we want to resume where we left off next time the consumer is restarted.
How would that be "not serving
Well, I'm not sure why you're checkpointing messages.
I'd also put in some logging to see what values are actually being read out
of your params object for the various settings.
On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg wrote:
> I've stopped the jobs, the
Calling directKafkaStream.checkpoint() will make the system write the raw
kafka data into HDFS files (that is, RDD checkpointing). This is completely
unnecessary with Direct Kafka because it already tracks the offset of data
in each batch (when checkpointing is enabled using
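TD's distinction can be modeled in a few lines of plain Java (a toy sketch; the "broker log" list and offset map are made-up stand-ins for Kafka and the metadata checkpoint, not real APIs): because the broker retains the records, recovery only needs the checkpointed offset range to replay the exact batch, so writing the raw records to HDFS again is redundant.

```java
import java.util.List;
import java.util.Map;

public class OffsetRecoveryDemo {
    // Recovery: replay the exact batch by re-reading the broker's own log
    // for the checkpointed offset range. No raw-data copy is needed.
    static List<String> recoverBatch(List<String> brokerLog, int from, int until) {
        return brokerLog.subList(from, until);
    }

    public static void main(String[] args) {
        // Toy "Kafka log": the broker itself retains the records.
        List<String> brokerLog = List.of("msg0", "msg1", "msg2", "msg3", "msg4");

        // Metadata checkpoint for one batch: just an offset range, not the data.
        Map<String, Integer> checkpoint = Map.of("from", 1, "until", 4);

        System.out.println(
            recoverBatch(brokerLog, checkpoint.get("from"), checkpoint.get("until")));
    }
}
```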
What's wrong with creating a checkpointed context?? We WANT checkpointing,
first of all. We therefore WANT the checkpointed context.
Second of all, it's not true that we're loading the checkpointed context
independent of whether params.isCheckpointed() is true. I'm quoting the
code again:
//
That is good to know. However, that doesn't change the problem I'm seeing.
Which is that, even with that piece of code commented out
(stream.checkpoint()), the batch duration millis aren't getting changed
unless I take checkpointing completely out.
In other words, this commented out:
//if
Well, you are returning
JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
That is loading the checkpointed context, independent of whether
params.isCheckpointed() is true.
On Tue, Sep 8, 2015 at 8:28 PM, Dmitry Goldenberg
wrote:
> That is good
I just disabled checkpointing in our consumers and I can see that the batch
duration millis set to 20 seconds is now being honored.
Why would that be the case?
And how can we "untie" batch duration millis from checkpointing?
Thanks.
On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger
Just verified the logic for passing the batch duration millis in, looks OK.
I see the value of 20 seconds being reflected in the logs - but not in the
spark ui.
Also, just commented out this piece and the consumer is still stuck at
using 10 seconds for batch duration millis.
//if
I've stopped the jobs, the workers, and the master. Deleted the contents of
the checkpointing dir. Then restarted master, workers, and consumers.
I'm seeing the job in question still firing every 10 seconds. I'm seeing
the 10 seconds in the Spark Jobs GUI page as well as our logs. Seems quite
Tathagata,
Checkpointing is turned on but we were not recovering. I'm looking at the
logs now, feeding fresh content hours after the restart. Here's a snippet:
2015-09-04 06:11:20,013 ... Documents processed: 0.
2015-09-04 06:11:30,014 ... Documents processed: 0.
2015-09-04 06:11:40,011 ...
Could you see what the streaming tab in the Spark UI says? It should show
the underlying batch duration of the StreamingContext, the details of when
the batch starts, etc.
BTW, it seems that the 5.6 or 6.8 seconds delay is present only when data
is present (that is, *Documents processed: > 0*)
Tathagata,
In our logs I see the batch duration millis being set first to 10 then to
20 seconds. I don't see the 20 being reflected later during ingestion.
In the Spark UI under Streaming I see the output below; notice the *10
second* batch interval. Can you think of a reason why it's stuck at
Are you sure you are not accidentally recovering from checkpoint? How are
you using StreamingContext.getOrCreate() in your code?
TD
On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg
wrote:
> Tathagata,
>
> In our logs I see the batch duration millis being set first to
I'd think that we wouldn't be "accidentally recovering from checkpoint"
hours or even days after consumers have been restarted, plus the content is
the fresh content that I'm feeding, not some content that had been fed
before the last restart.
The code is basically as follows:
SparkConf
Sorry, more relevant code below:
SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
JavaStreamingContext jssc = params.isCheckpointed() ?
createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
params);
jssc.start();
jssc.awaitTermination();
jssc.close();
………..
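The behavior described in the thread (a new 20-second duration never taking effect while checkpointing is on, then working once the checkpoint directory is cleared) can be sketched with a small file-based toy (plain Java; `effectiveDuration` and the single `duration` file are hypothetical stand-ins for a checkpoint, not Spark internals): whatever spec was stored first keeps winning until the stored state is deleted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointDirDemo {
    // Stand-in for recovery: a stored duration wins over the requested one
    // whenever the checkpoint directory already holds one.
    static long effectiveDuration(Path checkpointDir, long requestedMillis) {
        try {
            Path stored = checkpointDir.resolve("duration");
            if (Files.exists(stored)) {
                return Long.parseLong(Files.readString(stored)); // recovered spec
            }
            Files.writeString(stored, Long.toString(requestedMillis)); // first run
            return requestedMillis;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Clearing the stored spec corresponds to deleting the checkpoint contents.
    static void clearCheckpoint(Path checkpointDir) {
        try {
            Files.deleteIfExists(checkpointDir.resolve("duration"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path newCheckpointDir() {
        try {
            return Files.createTempDirectory("ckpt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newCheckpointDir();
        System.out.println(effectiveDuration(dir, 10_000)); // first run: 10000
        System.out.println(effectiveDuration(dir, 20_000)); // still 10000
        clearCheckpoint(dir); // the remedy discussed later in the thread
        System.out.println(effectiveDuration(dir, 20_000)); // now 20000
    }
}
```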
I'm seeing an oddity where I initially set the batchDurationMillis to 1
second and it works fine:
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(batchDurationMillis));
Then I tried changing the value to 10 seconds. The change didn't seem to
take. I've
Are you accidentally recovering from checkpoint files which have 10 seconds
as the batch interval?
On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg
wrote:
> I'm seeing an oddity where I initially set the batchDurationMillis to 1
> second and it works fine:
>
>