It depends on the stats you are collecting. For example, if you are just
collecting counts, you can do away with updateStateByKey completely by
doing an insert-or-update operation on the data store after the reduce, i.e.:

For each (key, batchCount)
  if (key exists in dataStore)
    update count = count + batchCount for the key
  else
    insert (key, batchCount)
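
The pseudocode above could be sketched in Java roughly as follows. This is
only an illustration under stated assumptions: InMemoryStore is a
hypothetical stand-in for whatever external key-value store you use, and
applyBatch is a made-up helper that receives the already-reduced
(key, batchCount) pairs of one micro-batch. No Spark API is used here.

```java
import java.util.HashMap;
import java.util.Map;

public class CountUpsertSketch {

    // Hypothetical stand-in for an external key-value store; a real job
    // would call a database client here instead of using a HashMap.
    static final class InMemoryStore {
        private final Map<String, Long> counts = new HashMap<>();

        // Insert the key with batchCount if it is absent,
        // otherwise add batchCount to the stored count.
        void upsert(String key, long batchCount) {
            counts.merge(key, batchCount, Long::sum);
        }

        // Return the stored count, or 0 if the key was never seen.
        long get(String key) {
            return counts.getOrDefault(key, 0L);
        }
    }

    // Apply one micro-batch of already-reduced (key, batchCount) pairs.
    static void applyBatch(InMemoryStore store, Map<String, Long> batchCounts) {
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            store.upsert(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();

        Map<String, Long> batch1 = new HashMap<>();
        batch1.put("a", 3L);
        batch1.put("b", 1L);
        applyBatch(store, batch1);

        Map<String, Long> batch2 = new HashMap<>();
        batch2.put("a", 2L);
        applyBatch(store, batch2);

        // Prints: a=5 b=1
        System.out.println("a=" + store.get("a") + " b=" + store.get("b"));
    }
}
```

In a streaming job the applyBatch step would run on the output of the reduce
for each micro-batch. With a real store the update would need to be atomic,
and a batch replayed after a failure could be counted twice unless the
writes are made idempotent.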

Thanks,
Aniket

On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:

> Thanks Aniket,
>
> I do want to store the state in external storage, but I think that should
> be a later step.
> Basically, I have to use the updateStateByKey function to maintain the
> running state (which requires checkpointing), and my bottleneck is now the
> data checkpoint.
>
> My pseudocode is below:
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> sparkConf,Durations.seconds(2));
>     jssc.checkpoint("spark-data/checkpoint");
>     JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(...);
>     JavaPairDStream<String, List<Double>> stats =
> messages.mapToPair(parseJson)
>                             .reduceByKey(REDUCE_STATS)
>                             .updateStateByKey(RUNNING_STATS);
>
>    JavaPairDStream<String, List<Double>> newData =
> stats.filter(NEW_STATS);
>
>    newData.foreachRDD{
>          rdd.foreachPartition{
>                // Store to external storage.
>          }
>    }
>
>   Without updateStateByKey, I only have the stats of the last
> micro-batch.
>
> Any advice on this?
>
>
> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:
>
>> Can you try storing the state (word count) in an external key value store?
>>
>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Could anyone help me with this? It's a bit urgent for me.
>>> I'm confused about Spark data checkpoint performance.
>>> Is there a detailed description of the checkpoint implementation I can
>>> look into?
>>> Spark Streaming takes less than a second to process 20K messages/sec,
>>> but the checkpoint takes 25 seconds. My application now has an average
>>> latency of 30 seconds, and it keeps increasing.
>>>
>>>
>>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>>
>>>> Thanks all, it would be great to have this feature soon.
>>>> Do you know what the release plan for 1.6 is?
>>>>
>>>> In addition to this, I still have a checkpoint performance problem.
>>>>
>>>> My code is as simple as this:
>>>>     JavaStreamingContext jssc = new
>>>> JavaStreamingContext(sparkConf,Durations.seconds(2));
>>>>     jssc.checkpoint("spark-data/checkpoint");
>>>>     JavaPairInputDStream<String, String> messages =
>>>> KafkaUtils.createDirectStream(...);
>>>>     JavaPairDStream<String, List<Double>> stats =
>>>> messages.mapToPair(parseJson)
>>>>                             .reduceByKey(REDUCE_STATS)
>>>>                             .updateStateByKey(RUNNING_STATS);
>>>>
>>>>     stats.print();
>>>>
>>>>   I need to maintain about 800k keys, and the stats here are just a
>>>> count of the number of occurrences of each key.
>>>>   While running, the cache dir is very small (about 50M). My questions are:
>>>>
>>>>   1/ A regular micro-batch takes about 800ms to finish, but every
>>>> 10 seconds, when the data checkpoint runs, a micro-batch of the same
>>>> size takes 5 seconds. Why is that so high? What kind of job runs
>>>> during a checkpoint, and why does the time keep increasing?
>>>>
>>>>   2/ When I change the data checkpoint interval, for example:
>>>>       stats.checkpoint(Durations.seconds(100)); // changed to 100; the default is 10
>>>>
>>>>   the checkpoint time grows significantly: the first checkpoint takes
>>>> 10s, the second 30s, the third 70s ... and it keeps increasing :)
>>>>   Why does it take so long when the checkpoint interval is increased?
>>>>
>>>> It seems that the default interval is more stable.
>>>>
>>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>>
>>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>>
>>>>> Left some comments on the JIRA and design doc.
>>>>>
>>>>> -adrian
>>>>>
>>>>> From: Shixiong Zhu
>>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>>> To: Thúy Hằng Lê
>>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>>
>>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>>> performance issue of "updateStateByKey". You can take a look at
>>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>>> https://github.com/apache/spark/pull/9256
>>>>>
>>>>
>>>
>
