Thanks Aniket. I do want to store the state in external storage, but that should come in a later step, I think. Basically, I have to use the updateStateByKey function to maintain the running state (which requires checkpointing), and my bottleneck is now the data checkpoint.
My pseudocode looks like this:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    jssc.checkpoint("spark-data/checkpoint");
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
    JavaPairDStream<String, List<Double>> stats = messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);
    JavaPairDStream<String, List<Double>> newData = stats.filter(NEW_STATS);
    newData.foreachRDD(rdd -> {
        rdd.foreachPartition(partition -> {
            // Store to external storage.
        });
    });

Without updateStateByKey, I only have the stats of the last micro-batch. Any advice on this?

2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:

> Can you try storing the state (word count) in an external key-value store?
>
> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>
>> Hi all,
>>
>> Could anyone help me with this? It's a bit urgent for me.
>> I'm very confused about Spark data checkpoint performance. Is there any
>> detailed description of the checkpoint implementation I can look into?
>> Spark Streaming takes only sub-second to process 20K messages/sec, yet
>> it takes 25 seconds to checkpoint. My application now has an average
>> latency of 30 seconds, and it keeps increasing.
>>
>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>
>>> Thanks all, it would be great to have this feature soon.
>>> Do you know the release plan for 1.6?
>>>
>>> In addition to this, I still have a checkpoint performance problem.
>>>
>>> My code is as simple as this:
>>>
>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
>>>     jssc.checkpoint("spark-data/checkpoint");
>>>     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
>>>     JavaPairDStream<String, List<Double>> stats = messages.mapToPair(parseJson)
>>>         .reduceByKey(REDUCE_STATS)
>>>         .updateStateByKey(RUNNING_STATS);
>>>     stats.print();
>>>
>>> I need to maintain about 800K keys; the stats here just count the number
>>> of occurrences per key. While running, the cache dir stays very small
>>> (about 50 MB). My questions are:
>>>
>>> 1/ A regular micro-batch takes about 800 ms to finish, but every 10
>>> seconds, when the data checkpoint runs, the same-sized micro-batch takes
>>> 5 seconds. Why is it so high? What kind of work does the checkpoint do,
>>> and why does the time keep increasing?
>>>
>>> 2/ When I change the data checkpoint interval, e.g.:
>>>
>>>     stats.checkpoint(Durations.seconds(100)); // changed to 100; the default is 10
>>>
>>> the checkpoint time keeps increasing significantly: the first checkpoint
>>> takes 10s, the second 30s, the third 70s ... and so on :) Why does it get
>>> so high when the checkpoint interval is increased?
>>>
>>> The default interval seems to work more stably.
>>>
>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>
>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>
>>>> Left some comments on the JIRA and design doc.
>>>>
>>>> -adrian
>>>>
>>>> From: Shixiong Zhu
>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>> To: Thúy Hằng Lê
>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>
>>>> "trackStateByKey" is about to be added in 1.6 to resolve the performance
>>>> issue of "updateStateByKey". You can take a look at
>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>> https://github.com/apache/spark/pull/9256
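For reference, here is a minimal sketch of the API Shixiong describes: "trackStateByKey" was renamed to mapWithState before the 1.6 release. The imports below (in particular the Optional class) assume a later Spark version, so treat this as an illustration of the pattern rather than version-exact code. Its advantage over updateStateByKey is that the mapping function runs only for keys that appear in the current batch, instead of for all 800K keys on every batch.

    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import scala.Tuple2;

    // Mapping function: fold this batch's count into the running state for
    // the key and emit the updated (key, count) pair.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        (key, batchCount, state) -> {
            int sum = batchCount.orElse(0) + (state.exists() ? state.get() : 0);
            state.update(sum);
            return new Tuple2<>(key, sum);
        };

    // counts is a JavaPairDStream<String, Integer> of per-batch counts
    // (e.g., the output of reduceByKey).
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> running =
        counts.mapWithState(StateSpec.function(mappingFunc));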
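As for Aniket's suggestion of an external key-value store, a minimal sketch of the foreachRDD/foreachPartition step from the pseudocode above could look like the following, assuming Java 8 lambdas against a recent Spark version. KeyValueClient and its address are hypothetical, standing in for whatever store is used (Redis, Cassandra, HBase, ...); the design point is to open one connection per partition, not one per record.

    newData.foreachRDD(rdd -> {
        rdd.foreachPartition(partition -> {
            // Hypothetical client: one connection per partition, reused for
            // every record in the partition, closed when the partition is done.
            KeyValueClient client = KeyValueClient.connect("kv-host:6379");
            while (partition.hasNext()) {
                Tuple2<String, List<Double>> record = partition.next();
                client.put(record._1(), record._2()); // key -> running stats
            }
            client.close();
        });
    });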