Hmm, that seems like a workaround rather than a fix. With this method it's
very hard to recover from a failure, since we don't know which batches have
already been applied.
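Concretely, the insert-or-update approach suggested below would look roughly
like this sketch (assuming Java 8 lambdas, the VoidFunction overload of
foreachRDD, and a JDBC store with MySQL-style upserts; the jdbcUrl, the
key_counts table, and the "counts" stream name are all hypothetical). It also
shows why recovery is hard: the upsert is not idempotent.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import scala.Tuple2;

// "counts" stands for the JavaPairDStream<String, Long> produced by
// the reduceByKey step. NOTE: replaying a batch after a failure
// re-applies "cnt = cnt + VALUES(cnt)", so its counts are added a
// second time -- exactly the recovery problem described above.
counts.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            PreparedStatement upsert = conn.prepareStatement(
                    "INSERT INTO key_counts (k, cnt) VALUES (?, ?) "
                  + "ON DUPLICATE KEY UPDATE cnt = cnt + VALUES(cnt)");
            while (partition.hasNext()) {
                Tuple2<String, Long> kv = partition.next();
                upsert.setString(1, kv._1());
                upsert.setLong(2, kv._2());
                upsert.executeUpdate();
            }
        }
    });
});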
I really want to maintain the whole running stats in memory to achieve full
fault tolerance. I just wonder: is data checkpoint performance really that
bad, or am I missing something in my setup? 30 seconds for a data checkpoint
of 1M keys is too much for me.

On Sat, Nov 7, 2015 at 1:25 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com>
wrote:

> It depends on the stats you are collecting. For example, if you are just
> collecting counts, you can do away with updateStateByKey completely by
> doing an insert-or-update operation on the data store after the reduce,
> i.e.:
>
> for each (key, batchCount):
>     if key exists in dataStore:
>         update count = count + batchCount for the key
>     else:
>         insert (key, batchCount)
>
> Thanks,
> Aniket
>
> On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê <thuyhang...@gmail.com>
> wrote:
>
>> Thanks Aniket,
>>
>> I do want to store the state to external storage, but that should come
>> in a later step, I think. Basically, I have to use the updateStateByKey
>> function to maintain the running state (which requires checkpointing),
>> and my bottleneck is now in the data checkpoint.
>>
>> My pseudo code is like below:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(
>>         sparkConf, Durations.seconds(2));
>> jssc.checkpoint("spark-data/checkpoint");
>> JavaPairInputDStream<String, String> messages =
>>         KafkaUtils.createDirectStream(...);
>> JavaPairDStream<String, List<Double>> stats =
>>         messages.mapToPair(parseJson)
>>                 .reduceByKey(REDUCE_STATS)
>>                 .updateStateByKey(RUNNING_STATS);
>>
>> JavaPairDStream<String, List<Double>> newData = stats.filter(NEW_STATS);
>>
>> newData.foreachRDD(rdd -> {
>>     rdd.foreachPartition(partition -> {
>>         // Store to external storage.
>>     });
>> });
>>
>> Without updateStateByKey, I would only have the stats of the last
>> micro-batch.
>>
>> Any advice on this?
>>
>> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:
>>
>>> Can you try storing the state (word count) in an external key-value
>>> store?
>>>
>>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Could anyone help me with this? It's a bit urgent.
>>>> I'm very confused and curious about Spark data checkpoint performance.
>>>> Is there any detailed description of the checkpoint implementation I
>>>> can look into?
>>>> Spark Streaming takes only sub-second to process 20K messages/sec, yet
>>>> it takes 25 seconds for a checkpoint. My application now has an average
>>>> latency of 30 seconds, and it keeps increasing.
>>>>
>>>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>>>
>>>>> Thanks all, it would be great to have this feature soon.
>>>>> Do you know the release plan for 1.6?
>>>>>
>>>>> In addition to this, I still have a checkpoint performance problem.
>>>>> My code is simply like this:
>>>>>
>>>>> JavaStreamingContext jssc = new JavaStreamingContext(
>>>>>         sparkConf, Durations.seconds(2));
>>>>> jssc.checkpoint("spark-data/checkpoint");
>>>>> JavaPairInputDStream<String, String> messages =
>>>>>         KafkaUtils.createDirectStream(...);
>>>>> JavaPairDStream<String, List<Double>> stats =
>>>>>         messages.mapToPair(parseJson)
>>>>>                 .reduceByKey(REDUCE_STATS)
>>>>>                 .updateStateByKey(RUNNING_STATS);
>>>>>
>>>>> stats.print();
>>>>>
>>>>> I need to maintain about 800K keys; the stats here only count the
>>>>> number of occurrences per key.
>>>>> While running, the cache dir stays very small (about 50 MB). My
>>>>> questions are:
>>>>>
>>>>> 1/ A regular micro-batch takes about 800 ms to finish, but every 10
>>>>> seconds, when the data checkpoint runs, a micro-batch of the same size
>>>>> takes 5 seconds. Why is the overhead so high? What kind of work does
>>>>> the checkpoint job do, and why does its cost keep increasing?
>>>>>
>>>>> 2/ When I change the data checkpoint interval, e.g. using:
>>>>>
>>>>>     stats.checkpoint(Durations.seconds(100)); // change to 100; the default is 10
>>>>>
>>>>> the checkpoint time grows significantly: the first checkpoint takes
>>>>> 10s, the second 30s, the third 70s ... and it keeps increasing. :)
>>>>> Why does it get so much worse with a larger checkpoint interval?
>>>>>
>>>>> The default interval seems to work more stably.
>>>>>
>>>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>>>
>>>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>>>
>>>>>> Left some comments on the JIRA and design doc.
>>>>>>
>>>>>> -adrian
>>>>>>
>>>>>> From: Shixiong Zhu
>>>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>>>> To: Thúy Hằng Lê
>>>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>>>
>>>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>>>> performance issue of "updateStateByKey". You can take a look at
>>>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>>>> https://github.com/apache/spark/pull/9256

--
Thanks
Kien
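The "trackStateByKey" API referenced above shipped in Spark 1.6 under the
name mapWithState. A minimal sketch of the running count rewritten with it,
assuming Java 8 lambdas; "batchCounts" stands for the JavaPairDStream<String,
Long> after reduceByKey, and in 1.6 the Optional below is the Guava class
used by Spark's Java API, not java.util.Optional.

import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import scala.Tuple2;

// Merge this batch's count into the per-key state. Unlike
// updateStateByKey, only keys present in the current batch are
// touched, instead of scanning the full state on every batch.
Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> mappingFunc =
        (key, batchCount, state) -> {
            long sum = (batchCount.isPresent() ? batchCount.get() : 0L)
                     + (state.exists() ? state.get() : 0L);
            state.update(sum);
            return new Tuple2<>(key, sum);
        };

JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> runningCounts =
        batchCounts.mapWithState(StateSpec.function(mappingFunc));

Checkpointing is still required with mapWithState, but state updates only
touch the keys seen in each batch rather than rewriting the state for all
800K keys.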