Re: Spark Streaming data checkpoint performance

2015-11-07 Thread trung kien
Hmm,

It seems that's just a workaround.
Using that insert-or-update approach, it's very hard to recover from failure,
since we don't know which batches have already been processed.

I really want to maintain the whole running stats in memory to achieve full
fault tolerance.

I just wonder whether the performance of data checkpointing is really that bad,
or whether I'm missing something in my setup.

30 seconds for a data checkpoint of 1M keys is too much for me.



Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Thúy Hằng Lê
Hi all,

Could anyone help me with this? It's a bit urgent for me.
I'm quite confused and curious about Spark data checkpoint performance. Is
there any detailed description of the checkpoint implementation I can look into?
Spark Streaming takes only sub-second to process 20K messages/sec, but the
checkpoint takes 25 seconds. My application now has an average latency of 30
seconds, and it keeps increasing.




Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Aniket Bhatnagar
Can you try storing the state (word count) in an external key-value store?



Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Aniket Bhatnagar
It depends on the stats you are collecting. For example, if you are just
collecting counts, you can do away with updateStateByKey completely by doing
an insert-or-update operation on the data store after the reduce, i.e.

For each (key, batchCount):
  if key exists in dataStore:
    update count = count + batchCount for the key
  else:
    insert (key, batchCount)
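
For illustration, here is a rough sketch of that insert-or-update done once per
partition on the reduced (key, batchCount) pairs. It assumes a PostgreSQL table
word_counts(word, cnt) reachable over JDBC; the table, column, and connection
names are just placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;
import scala.Tuple2;

public class CountUpserter {
  // Call from rdd.foreachPartition(...) after the per-batch reduceByKey.
  public static void upsertPartition(Iterator<Tuple2<String, Long>> records) throws Exception {
    try (Connection conn =
             DriverManager.getConnection("jdbc:postgresql://localhost/stats");
         PreparedStatement stmt = conn.prepareStatement(
             "INSERT INTO word_counts (word, cnt) VALUES (?, ?) "
                 + "ON CONFLICT (word) DO UPDATE SET cnt = word_counts.cnt + EXCLUDED.cnt")) {
      // Existing keys get their count incremented, new keys are inserted.
      // (ON CONFLICT needs PostgreSQL 9.5+; use your store's equivalent upsert.)
      while (records.hasNext()) {
        Tuple2<String, Long> rec = records.next();
        stmt.setString(1, rec._1());
        stmt.setLong(2, rec._2());
        stmt.addBatch();
      }
      stmt.executeBatch();
    }
  }
}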

Thanks,
Aniket



Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Thúy Hằng Lê
Thanks Aniket,

I do want to store the state in external storage, but I think that should come
in a later step.
Basically, I have to use the updateStateByKey function to maintain the running
state (which requires checkpointing), and my bottleneck is now the data
checkpoint.

My pseudo-code is like below:

JavaStreamingContext jssc = new JavaStreamingContext(
    sparkConf, Durations.seconds(2));
jssc.checkpoint("spark-data/checkpoint");
JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(...);
JavaPairDStream<String, List> stats =
    messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);

// Keep only the keys whose stats changed in this batch.
JavaPairDStream<String, List> newData = stats.filter(NEW_STATS);

newData.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // Store to external storage.
    });
});

Without using updateStateByKey, I would only have the stats of the last
micro-batch.

Any advice on this?




Re: Spark Streaming data checkpoint performance

2015-11-05 Thread Thúy Hằng Lê
Thanks all, it would be great to have this feature soon.
Do you know what the release plan for 1.6 is?

In addition to this, I still have a checkpoint performance problem.

My code is as simple as this:

JavaStreamingContext jssc =
    new JavaStreamingContext(sparkConf, Durations.seconds(2));
jssc.checkpoint("spark-data/checkpoint");
JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(...);
JavaPairDStream<String, List> stats =
    messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);

stats.print();

Now I need to maintain about 800k keys; the stats here are only the number of
occurrences per key. While running, the cache dir is very small (about 50M).
My questions are:

1/ A regular micro-batch takes about 800ms to finish, but every 10 seconds,
when the data checkpoint is running, the same size micro-batch takes 5 seconds.
Why is it so high? What kind of job does the checkpoint run, and why does its
duration keep increasing?

2/ When I change the data checkpoint interval, e.g.:
    stats.checkpoint(Durations.seconds(100)); // changed to 100, default is 10

the checkpoint time keeps increasing significantly: the first checkpoint takes
10s, the second 30s, the third 70s ... and it keeps growing :)
Why is it so much higher when I increase the checkpoint interval?

It seems that the default interval is more stable.



Re: Spark Streaming data checkpoint performance

2015-11-04 Thread Adrian Tanase
Nice! Thanks for sharing, I wasn’t aware of the new API.

Left some comments on the JIRA and design doc.

-adrian



Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Adrian Tanase
You are correct, the default checkpointing interval is 10 seconds or your batch
interval, whichever is bigger. You can change it by calling .checkpoint(x) on
your resulting DStream.

For the rest, you are probably keeping an “all time” word count that grows
unbounded if you never remove words from the map. Keep in mind that
updateStateByKey is called for every key in the state RDD, regardless of
whether you have new occurrences or not.

You should consider at least one of these strategies (a sketch follows the list):

  *   Run your word count on a windowed DStream (e.g. unique counts over the
      last 15 minutes).
      *   Your best bet here is reduceByKeyAndWindow with an inverse function.
  *   Make your state object more complex and try to prune out words with very
      few occurrences or that haven’t been updated for a long time.
      *   You can do this by emitting None from updateStateByKey.
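
For the windowed approach, a rough sketch in Java of reduceByKeyAndWindow with
an inverse function (the inverse lets Spark subtract the batch that slides out
of the window instead of recomputing the whole window; "pairs" is assumed to be
a JavaPairDStream<String, Long> of (word, 1L) pairs, and checkpointing must be
enabled on the streaming context):

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;

Function2<Long, Long, Long> addCounts = (a, b) -> a + b;       // counts entering the window
Function2<Long, Long, Long> subtractCounts = (a, b) -> a - b;  // counts leaving the window

JavaPairDStream<String, Long> windowedCounts = pairs.reduceByKeyAndWindow(
    addCounts,
    subtractCounts,
    Durations.minutes(15),   // window length
    Durations.seconds(2));   // slide interval (the batch interval)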

Hope this helps,
-adrian



Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Thúy Hằng Lê
Hi Adrian,

Thanks for the information.

However, your two suggestions don't really work for me.

Accuracy is the most important aspect of my application, so keeping only
15-minute window stats or pruning out some of the keys is impossible for my
application.

I can change the checkpoint interval as you suggest; however, is there any
other Spark configuration for tuning the data checkpoint performance?

And just curious: technically, why does updateStateByKey need to be called for
every key (regardless of whether there are new occurrences or not)? Does Spark
have any plan to fix this?
I have 4M keys whose statistics need to be maintained, but only a few of them
change in each batch interval.



Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Shixiong Zhu
"trackStateByKey" is about to be added in 1.6 to resolve the performance
issue of "updateStateByKey". You can take a look at
https://issues.apache.org/jira/browse/SPARK-2629 and
https://github.com/apache/spark/pull/9256


Spark Streaming data checkpoint performance

2015-11-01 Thread Thúy Hằng Lê
Hi Spark gurus,

I am evaluating Spark Streaming.

In my application I need to maintain cumulative statistics (e.g. the total
running word count), so I need to call the updateStateByKey function on
every micro-batch.

After setting this up, I see the following behavior:
* The processing time is very high every 10 seconds - usually 5x
higher (which I guess is the data checkpoint job).
* The processing time becomes higher and higher over time; after 10
minutes it's much higher than the batch interval, leading to a huge
scheduling delay and a lot of active batches in the queue.

My questions are:

 * Is this expected behavior? Is there any way to improve the
performance of data checkpointing?
 * How does data checkpointing work in Spark Streaming? Does it need
to load all previous checkpoint data in order to build the new one?

My job is very simple:

JavaStreamingContext jssc =
    new JavaStreamingContext(sparkConf, Durations.seconds(2));
JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(...);

JavaPairDStream<String, List> stats =
    messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);

stats.print();
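
For reference, a possible shape of the two functions above, assuming the stats
are just running counts: REDUCE_STATS sums the counts of a key within one
batch, and RUNNING_STATS folds the batch total into the state kept by
updateStateByKey. The types here are illustrative only (Spark 1.x's Java API
uses Guava's Optional for the state):

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;

// Combine the values for a key within one batch.
Function2<Long, Long, Long> REDUCE_STATS = (a, b) -> a + b;

// Fold the batch's values into the running total for the key.
// Returning Optional.absent() instead would drop the key from the state.
Function2<List<Long>, Optional<Long>, Optional<Long>> RUNNING_STATS =
    (batchValues, state) -> {
      long total = state.or(0L);
      for (Long v : batchValues) {
        total += v;
      }
      return Optional.of(total);
    };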