Hi,

It's hard to say exactly without a little more information.
On a side note, I don't see where you are closing the KeyValueIterator in your code. Not closing a KeyValueIterator on a persistent state store can cause a resource leak over time, so I'd add `iter.close()` right before your `logger.info` call. It might be worth retrying at that point.

Thanks,
Bill

On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
> I am trying to improve the performance of a Kafka Streams persistent
> state store. Our application uses the Kafka Streams Processor API with a
> persistent state store. When the application starts up it performs well,
> but over a period of time the performance deteriorates. I compute certain
> results in the computeAnalytics method, and this method takes almost no
> time. It is called from both process and punctuate, and I store the
> updated object back to the store. Over time, punctuate takes a huge amount
> of time to complete, and I can see that the majority of that time is spent
> storing the records and iterating over the records. The record size is
> just 2500 per partition. I am not sure where I am going wrong or how I can
> improve the performance.
>
> Below is one such sample log record.
>
> INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
>
> Below is pseudo code for my processor, which closely resembles the code
> I am using in my application.
>
> MyProcessor() {
>
>     process(Key objectkey, Update eventupdate) {
>         long timestamp = context.timestamp();
>         AnalyticeObj storeobj = store.get(objectkey);
>
>         if (storeobj == null) {
>             storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
>         } else {
>             storeobj.update(eventupdate, timestamp);
>         }
>         storeobj = storeobj.computeAnalytics();
>
>         store.put(objectkey, storeobj);
>         context.commit();
>     }
>
>     // Every 5 seconds
>     punctuate(long timestamp) {
>         long startTime = System.currentTimeMillis();
>         long totalTimeTakenToProcessRecords = 0;
>         long totalTimeTakenToStoreRecords = 0;
>         long counter = 0;
>         KeyValueIterator<Key, AnalyticeObj> iter = this.visitStore.all();
>         while (iter.hasNext()) {
>             KeyValue<Key, AnalyticeObj> entry = iter.next();
>
>             if (entry.value.hasExpired(timestamp)) {
>                 store.delete(entry.key);
>             } else {
>                 long processStartTime = System.currentTimeMillis();
>                 AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
>                 totalTimeTakenToProcessRecords +=
>                         System.currentTimeMillis() - processStartTime;
>
>                 long storeStartTime = System.currentTimeMillis();
>                 store.put(entry.key, storeobj);
>                 totalTimeTakenToStoreRecords +=
>                         System.currentTimeMillis() - storeStartTime;
>             }
>             counter++;
>         }
>         logger.info(" Time Metrics for punctuate "
>                 + " for TimeStamp :: " + timestamp
>                 + " processed Records :: " + counter
>                 + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
>                 + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
>                 + "toal time Taken to retrieve Records :: "
>                 + (System.currentTimeMillis()
>                         - (startTime + totalTimeTakenToProcessRecords
>                                 + totalTimeTakenToStoreRecords))
>                 + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
>     }
> }
>
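
For reference, here is a minimal sketch of the iterator-closing suggestion using try-with-resources, so the iterator is closed even if the loop body throws. It deliberately avoids the Kafka Streams dependency so it can be run on its own: `ClosableIterator` and `FakeStore` are hypothetical stand-ins, not Kafka classes. The assumption is that the real `KeyValueIterator` can be used in the same way because it is also a `Closeable`; please check that against the Javadoc for your Kafka version.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

class IteratorCloseSketch {

    // Hypothetical stand-in for KeyValueIterator: an Iterator that must be closed.
    interface ClosableIterator<T> extends Iterator<T>, Closeable {
        @Override
        void close(); // narrowed so that close() throws no checked exception
    }

    // Hypothetical in-memory store that counts iterators that are still open.
    static class FakeStore {
        private final TreeMap<String, Long> data = new TreeMap<>();
        int openIterators = 0;

        void put(String key, Long value) {
            data.put(key, value);
        }

        ClosableIterator<Map.Entry<String, Long>> all() {
            openIterators++;
            // Iterate over a copy so that put() calls during iteration are safe.
            final Iterator<Map.Entry<String, Long>> it =
                    new TreeMap<>(data).entrySet().iterator();
            return new ClosableIterator<Map.Entry<String, Long>>() {
                @Override public boolean hasNext() { return it.hasNext(); }
                @Override public Map.Entry<String, Long> next() { return it.next(); }
                @Override public void close() { openIterators--; }
            };
        }
    }

    // Same shape as the punctuate loop in the quoted code: read each entry,
    // write an updated value back, and count the records processed.
    static int punctuate(FakeStore store) {
        int counter = 0;
        try (ClosableIterator<Map.Entry<String, Long>> iter = store.all()) {
            while (iter.hasNext()) {
                Map.Entry<String, Long> entry = iter.next();
                store.put(entry.getKey(), entry.getValue() + 1);
                counter++;
            }
        } // iter.close() runs here, on normal exit and on exceptions alike
        return counter;
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        store.put("a", 1L);
        store.put("b", 2L);
        int processed = punctuate(store);
        System.out.println("processed=" + processed
                + " openIterators=" + store.openIterators);
    }
}
```

Compared with a bare `iter.close()` before the `logger.info` call, the try-with-resources form also releases the iterator when something inside the loop throws, which a trailing `close()` would skip.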