Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-10 Thread Dmitry Goldenberg
>> The whole point of checkpointing is to recover the *exact* computation where it left off. That makes sense. We were looking at the metadata checkpointing and the data checkpointing, and with data checkpointing, you can specify a checkpoint duration value. With the metadata checkpointing, there

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-10 Thread Tathagata Das
The metadata checkpointing interval does not really affect any performance, so I didn't expose any way to control that interval. The data checkpointing interval actually affects performance, hence the interval is configurable. On Thu, Sep 10, 2015 at 5:45 AM, Dmitry Goldenberg

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Tathagata Das
The whole point of checkpointing is to recover the *exact* computation where it left off. If you want any change in the specification of the computation (which includes any intervals), then you cannot recover from checkpoint as it can be an arbitrarily complex issue to deal with changes in the

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Dmitry Goldenberg
>> when you use getOrCreate, and there exists a valid checkpoint, it will always return the context from the checkpoint and not call the factory. Simple way to see what's going on is to print something in the factory to verify whether it is ever called. This is probably OK. Seems to explain why we
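The getOrCreate behavior described above can be modeled in plain Java. This is a hypothetical sketch to illustrate the semantics, not Spark's actual implementation; the class, method, and directory names are invented. The point it demonstrates: once a checkpoint exists, the factory is never invoked, so whatever was baked into the checkpoint wins on restart.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Minimal model of JavaStreamingContext.getOrCreate() semantics:
// if a checkpoint exists for the directory, return it and never call the factory.
public class GetOrCreateSketch {
    // Stands in for checkpoint files on disk; key = checkpoint dir, value = batch millis.
    static Map<String, Long> checkpoints = new HashMap<>();

    static long getOrCreate(String checkpointDir, Supplier<Long> factory) {
        Long recovered = checkpoints.get(checkpointDir);
        if (recovered != null) {
            return recovered;           // factory is NOT called
        }
        long created = factory.get();   // first run: factory builds the "context"
        checkpoints.put(checkpointDir, created);
        return created;
    }

    public static void main(String[] args) {
        long first = getOrCreate("/tmp/cp", () -> {
            System.out.println("factory called");
            return 10_000L;             // initial batch duration: 10s
        });
        // Second run asks for 20s, but the checkpointed 10s wins
        // and "factory called" is not printed again:
        long second = getOrCreate("/tmp/cp", () -> 20_000L);
        System.out.println(first + " " + second); // prints "10000 10000"
    }
}
```

Printing from the factory, as TD suggests, is exactly the right probe: if the message never appears on restart, the context came from the checkpoint.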

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Tathagata Das
See inline. On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg wrote: > What's wrong with creating a checkpointed context?? We WANT > checkpointing, first of all. We therefore WANT the checkpointed context. > > Second of all, it's not true that we're loading the

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Cody Koeninger
Have you tried deleting or moving the contents of the checkpoint directory and restarting the job? On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg wrote: > Sorry, more relevant code below: > > SparkConf sparkConf = createSparkConf(appName, kahunaEnv); >
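Cody's remedy can be sketched in plain Java as well (again a hypothetical model with invented names, not Spark internals): a surviving checkpoint overrides any newly requested batch interval, so the only way to make a new interval take effect is to clear the checkpoint directory before restarting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the remedy: a recovered checkpoint overrides any
// newly requested batch interval, so changing the interval requires clearing
// the checkpoint directory before restarting the job.
public class CheckpointResetSketch {
    // Stands in for checkpoint files on disk; value = the batch millis saved there.
    static Map<String, Long> checkpointDir = new HashMap<>();

    static long startContext(String dir, long requestedBatchMillis) {
        // Recovery path wins whenever a checkpoint exists.
        return checkpointDir.computeIfAbsent(dir, d -> requestedBatchMillis);
    }

    static void clearCheckpoint(String dir) {
        checkpointDir.remove(dir); // the "delete the checkpoint dir contents" step
    }

    public static void main(String[] args) {
        startContext("/tmp/cp", 10_000L);                // first run: 10s is saved
        long sticky = startContext("/tmp/cp", 20_000L);  // 20s requested, 10s recovered
        clearCheckpoint("/tmp/cp");                      // delete checkpoint contents...
        long fresh = startContext("/tmp/cp", 20_000L);   // ...and 20s finally takes
        System.out.println(sticky + " " + fresh);        // prints "10000 20000"
    }
}
```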

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Tathagata Das
Why are you checkpointing the direct kafka stream? It serves no purpose. TD On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg wrote: > I just disabled checkpointing in our consumers and I can see that the > batch duration millis set to 20 seconds is now being

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
>> Why are you checkpointing the direct kafka stream? It serves no purpose. Could you elaborate on what you mean? Our goal is fault tolerance. If a consumer is killed or stopped midstream, we want to resume where we left off next time the consumer is restarted. How would that be "not serving

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Cody Koeninger
Well, I'm not sure why you're checkpointing messages. I'd also put in some logging to see what values are actually being read out of your params object for the various settings. On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg wrote: > I've stopped the jobs, the

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Tathagata Das
Calling directKafkaStream.checkpoint() will make the system write the raw kafka data into HDFS files (that is, RDD checkpointing). This is completely unnecessary with Direct Kafka because it already tracks the offset of data in each batch (which checkpoint is enabled using
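The distinction TD draws can be illustrated with a small plain-Java model (a hypothetical sketch with invented names; the real mechanics live inside Spark's direct Kafka integration): each batch needs only its offset range recorded in the metadata checkpoint, because the messages themselves stay in Kafka and can be re-read on recovery. Saving the raw RDD data to HDFS duplicates what Kafka already retains.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of why the direct approach needs only offsets, not raw data:
// each batch records the (fromOffset, untilOffset) range it read; on recovery,
// the same range is re-fetched from Kafka, so persisting message bytes is redundant.
public class OffsetRangeSketch {
    static class OffsetRange {
        final long from, until;
        OffsetRange(long from, long until) { this.from = from; this.until = until; }
    }

    // Stands in for the metadata checkpoint: offset ranges per completed batch.
    static List<OffsetRange> checkpointedRanges = new ArrayList<>();

    // Stands in for the Kafka log itself, which retains the messages.
    static List<String> kafkaLog = List.of("m0", "m1", "m2", "m3", "m4");

    static List<String> runBatch(long from, long until) {
        checkpointedRanges.add(new OffsetRange(from, until)); // tiny metadata write
        return kafkaLog.subList((int) from, (int) until);     // data stays in Kafka
    }

    static List<String> recoverBatch(OffsetRange r) {
        // Re-read the exact same range from Kafka — no saved RDD needed.
        return kafkaLog.subList((int) r.from, (int) r.until);
    }

    public static void main(String[] args) {
        List<String> original = runBatch(1, 4);
        List<String> recovered = recoverBatch(checkpointedRanges.get(0));
        System.out.println(original.equals(recovered)); // prints "true"
    }
}
```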

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
What's wrong with creating a checkpointed context?? We WANT checkpointing, first of all. We therefore WANT the checkpointed context. Second of all, it's not true that we're loading the checkpointed context independent of whether params.isCheckpointed() is true. I'm quoting the code again: //

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
That is good to know. However, that doesn't change the problem I'm seeing. Which is that, even with that piece of code commented out (stream.checkpoint()), the batch duration millis aren't getting changed unless I take checkpointing completely out. In other words, this commented out: //if

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Tathagata Das
Well, you are returning JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory); That is loading the checkpointed context, independent of whether params.isCheckpointed() is true. On Tue, Sep 8, 2015 at 8:28 PM, Dmitry Goldenberg wrote: > That is good

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
I just disabled checkpointing in our consumers and I can see that the batch duration millis set to 20 seconds is now being honored. Why would that be the case? And how can we "untie" batch duration millis from checkpointing? Thanks. On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
Just verified the logic for passing the batch duration millis in, looks OK. I see the value of 20 seconds being reflected in the logs - but not in the spark ui. Also, just commented out this piece and the consumer is still stuck at using 10 seconds for batch duration millis. //if

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
I've stopped the jobs, the workers, and the master. Deleted the contents of the checkpointing dir. Then restarted master, workers, and consumers. I'm seeing the job in question still firing every 10 seconds. I'm seeing the 10 seconds in the Spark Jobs GUI page as well as our logs. Seems quite

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
Tathagata, Checkpointing is turned on but we were not recovering. I'm looking at the logs now, feeding fresh content hours after the restart. Here's a snippet: 2015-09-04 06:11:20,013 ... Documents processed: 0. 2015-09-04 06:11:30,014 ... Documents processed: 0. 2015-09-04 06:11:40,011 ...

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Tathagata Das
Could you see what the streaming tab in the Spark UI says? It should show the underlying batch duration of the StreamingContext, the details of when the batch starts, etc. BTW, it seems that the 5.6 or 6.8 seconds delay is present only when data is present (that is, * Documents processed: > 0)*

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
Tathagata, In our logs I see the batch duration millis being set first to 10 then to 20 seconds. I don't see the 20 being reflected later during ingestion. In the Spark UI under Streaming I see the below output, notice the *10 second* Batch interval. Can you think of a reason why it's stuck at

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Tathagata Das
Are you sure you are not accidentally recovering from checkpoint? How are you using StreamingContext.getOrCreate() in your code? TD On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg wrote: > Tathagata, > > In our logs I see the batch duration millis being set first to

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
I'd think that we wouldn't be "accidentally recovering from checkpoint" hours or even days after consumers have been restarted, plus the content is the fresh content that I'm feeding, not some content that had been fed before the last restart. The code is basically as follows: SparkConf

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
Sorry, more relevant code below: SparkConf sparkConf = createSparkConf(appName, kahunaEnv); JavaStreamingContext jssc = params.isCheckpointed() ? createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params); jssc.start(); jssc.awaitTermination(); jssc.close(); ………..

Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-03 Thread Dmitry Goldenberg
I'm seeing an oddity where I initially set the batchdurationmillis to 1 second and it works fine: JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(batchDurationMillis)); Then I tried changing the value to 10 seconds. The change didn't seem to take. I've

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-03 Thread Tathagata Das
Are you accidentally recovering from checkpoint files which has 10 second as the batch interval? On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg wrote: > I'm seeing an oddity where I initially set the batchdurationmillis to 1 > second and it works fine: > >