Thanks Aniket,

I do want to store the state in external storage, but I think that belongs in a
later step.
For now I have to use the updateStateByKey function to maintain the running
state (which requires checkpointing), and my bottleneck is the data
checkpoint itself.

My code, in outline, looks like this:

    JavaStreamingContext jssc =
            new JavaStreamingContext(sparkConf, Durations.seconds(2));
    jssc.checkpoint("spark-data/checkpoint");

    JavaPairInputDStream<String, String> messages =
            KafkaUtils.createDirectStream(...);

    // Parse each message, aggregate within the micro-batch, then fold
    // the result into the per-key running state.
    JavaPairDStream<String, List<Double>> stats = messages
            .mapToPair(parseJson)
            .reduceByKey(REDUCE_STATS)
            .updateStateByKey(RUNNING_STATS);

    // Keep only the keys whose stats changed in this batch.
    JavaPairDStream<String, List<Double>> newData = stats.filter(NEW_STATS);

    newData.foreachRDD(rdd -> {
        rdd.foreachPartition(records -> {
            // Store the updated stats for this partition to external storage.
        });
        return null; // foreachRDD takes a Function<..., Void> in Spark 1.5
    });
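
For context, REDUCE_STATS and RUNNING_STATS are, in simplified form, something
like the sketch below (here just an element-wise sum, with the occurrence count
as one of the elements; my real stats are more involved). Function2 is Spark's
org.apache.spark.api.java.function.Function2 and Optional is Guava's, as used
by the Spark 1.5 Java API:

    // Merge the stats of two records within a single micro-batch, element-wise.
    Function2<List<Double>, List<Double>, List<Double>> REDUCE_STATS = (a, b) -> {
        List<Double> merged = new ArrayList<>(a.size());
        for (int i = 0; i < a.size(); i++) {
            merged.add(a.get(i) + b.get(i));
        }
        return merged;
    };

    // Fold each batch's aggregate into the running per-key state. Note that
    // updateStateByKey invokes this for every key on every batch, even keys
    // with no new data, and the whole state is part of the data checkpoint.
    Function2<List<List<Double>>, Optional<List<Double>>, Optional<List<Double>>>
            RUNNING_STATS = (batchValues, state) -> {
        List<Double> running =
                state.isPresent() ? new ArrayList<>(state.get()) : null;
        for (List<Double> v : batchValues) {
            if (running == null) {
                running = new ArrayList<>(v);
            } else {
                for (int i = 0; i < running.size(); i++) {
                    running.set(i, running.get(i) + v.get(i));
                }
            }
        }
        return running == null ? state : Optional.of(running);
    };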

  Without using updateStateByKey I would only have the stats of the last
micro-batch.

Any advice on this?


2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:

> Can you try storing the state (word count) in an external key value store?
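> Something like the untested sketch below, assuming you already have a
> per-batch count DStream (counts here) and using Jedis as an example Redis
> client; any store with atomic increments would work:
>
>     // Increment per-key counts directly in the external store instead of
>     // keeping them in Spark state, so no state checkpoint is needed.
>     counts.foreachRDD(rdd -> {
>         rdd.foreachPartition(records -> {
>             // One connection per partition, not per record.
>             try (Jedis jedis = new Jedis("redis-host")) {
>                 while (records.hasNext()) {
>                     Tuple2<String, Long> record = records.next();
>                     jedis.incrBy(record._1(), record._2());
>                 }
>             }
>         });
>         return null;
>     });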
>
> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>
>> Hi all,
>>
>> Could anyone help me with this? It's a bit urgent for me.
>> I'm confused about Spark's data checkpoint performance. Is there any
>> detailed description of the checkpoint implementation I can look into?
>> Spark Streaming takes less than a second to process 20K messages/sec, but
>> the checkpoint takes 25 seconds. My application now has an average latency
>> of 30 seconds, and it keeps increasing.
>>
>>
>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>
>>> Thanks all, it would be great to have this feature soon.
>>> Do you know the release plan for 1.6?
>>>
>>> In addition to that, I still have a checkpoint performance problem.
>>>
>>> My code is as simple as this:
>>>     JavaStreamingContext jssc =
>>>             new JavaStreamingContext(sparkConf, Durations.seconds(2));
>>>     jssc.checkpoint("spark-data/checkpoint");
>>>     JavaPairInputDStream<String, String> messages =
>>>             KafkaUtils.createDirectStream(...);
>>>     JavaPairDStream<String, List<Double>> stats = messages
>>>             .mapToPair(parseJson)
>>>             .reduceByKey(REDUCE_STATS)
>>>             .updateStateByKey(RUNNING_STATS);
>>>
>>>     stats.print();
>>>
>>>   I need to maintain about 800k keys; the stats here only count the
>>> number of occurrences per key.
>>>   While running, the cache dir stays very small (about 50M). My questions
>>> are:
>>>
>>>   1/ A regular micro-batch takes about 800ms to finish, but every 10
>>> seconds, when the data checkpoint runs, the same-size micro-batch takes 5
>>> seconds. Why is it so high? What kind of job does the checkpoint run, and
>>> why does it keep increasing?
>>>
>>>   2/ When I change the data checkpoint interval, e.g.:
>>>       stats.checkpoint(Durations.seconds(100)); // change to 100,
>>> default is 10
>>>
>>>   the checkpoint time keeps increasing significantly: the first
>>> checkpoint takes 10s, the second 30s, the third 70s, and so on. Why does
>>> it get so high when the checkpoint interval is increased?
>>>
>>> The default interval seems to work more stably.
>>>
>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>
>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>
>>>> Left some comments on the JIRA and design doc.
>>>>
>>>> -adrian
>>>>
>>>> From: Shixiong Zhu
>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>> To: Thúy Hằng Lê
>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>
>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>> performance issue of "updateStateByKey". You can take a look at
>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>> https://github.com/apache/spark/pull/9256
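>>>>
>>>> Based on the design doc, usage should look roughly like the untested
>>>> sketch below (the PR is still under review, so names and signatures may
>>>> change before the release):
>>>>
>>>>     // Unlike updateStateByKey, only keys with new data in a batch are
>>>>     // processed; Optional is Guava's, as elsewhere in the Java API.
>>>>     Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>>
>>>>             trackFunc = (key, value, state) -> {
>>>>         long sum = value.or(0L) + (state.exists() ? state.get() : 0L);
>>>>         state.update(sum);
>>>>         return new Tuple2<>(key, sum);
>>>>     };
>>>>
>>>>     pairs.trackStateByKey(StateSpec.function(trackFunc)).print();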
>>>>
>>>
>>
