Hi Kafka Streams Users, I am trying to improve the performance of a persistent state store in Kafka Streams. Our application uses the Kafka Streams Processor API with a persistent state store. When the application starts up it performs well, but over time the performance deteriorates. I compute certain results in the computeAnalytics method, and this method takes almost no time. It is called from both process and punctuate, and each time I put the updated object back into the store. Over time, punctuate takes a very long time to complete, and I can see that most of that time is spent storing records and iterating over them. There are only about 2500 records per partition. I am not sure where I am going wrong or how I can improve the performance.
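Below is a sample log line from punctuate. All times are in milliseconds, so this run spent about 27.6 s in put, 12.8 s retrieving records and 40.4 s in total for 2109 records:

    INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time Metrics for punctuate for TimeStamp :: 1505564655878 processed Records :: 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord :: 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394

Below is the pseudo code for my processor; it closely mirrors the code I use in my application.

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Key, Update and AnalyticeObj are my own application classes (not shown).
    // Uses the older Processor API in which punctuate(long) is part of the Processor interface.
    public class MyProcessor implements Processor<Key, Update> {

        private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);

        private ProcessorContext context;
        private KeyValueStore<Key, AnalyticeObj> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            // "analytics-store" is a hypothetical placeholder for the actual store name
            this.store = (KeyValueStore<Key, AnalyticeObj>) context.getStateStore("analytics-store");
            context.schedule(5000); // punctuate every 5 seconds
        }

        @Override
        public void process(Key objectkey, Update eventupdate) {
            long timestamp = context.timestamp();
            AnalyticeObj storeobj = store.get(objectkey);
            if (storeobj == null) {
                storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
            } else {
                storeobj.update(eventupdate, timestamp);
            }
            storeobj = storeobj.computeAnalytics();
            store.put(objectkey, storeobj);
            context.commit(); // requests a commit after every record
        }

        @Override
        public void punctuate(long timestamp) {
            long startTime = System.currentTimeMillis();
            long totalTimeTakenToProcessRecords = 0;
            long totalTimeTakenToStoreRecords = 0;
            long counter = 0;
            // Full scan of the store; note that iter is never closed
            KeyValueIterator<Key, AnalyticeObj> iter = store.all();
            while (iter.hasNext()) {
                KeyValue<Key, AnalyticeObj> entry = iter.next();
                if (entry.value.hasExpired(timestamp)) {
                    store.delete(entry.key); // drop expired entries
                } else {
                    long processStartTime = System.currentTimeMillis();
                    AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
                    totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;

                    long storeStartTime = System.currentTimeMillis();
                    store.put(entry.key, storeobj);
                    totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
                }
                counter++;
            }
            long totalTime = System.currentTimeMillis() - startTime;
            // Everything not spent in computeAnalytics() or put(): iteration, hasExpired() and delete()
            long retrieveTime = totalTime - totalTimeTakenToProcessRecords - totalTimeTakenToStoreRecords;
            logger.info("Time Metrics for punctuate for TimeStamp :: " + timestamp
                    + " processed Records :: " + counter
                    + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
                    + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
                    + "toal time Taken to retrieve Records :: " + retrieveTime
                    + " Total time Taken :: " + totalTime);
        }

        @Override
        public void close() {}
    }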
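The retrieval figure in the log is not measured directly. It is whatever remains of the total after subtracting the compute and put times, so it also contains hasNext(), next(), hasExpired() and delete(). In addition, System.currentTimeMillis() has millisecond (or coarser) resolution, so per-call differences for fast operations round to 0 or 1 ms and the sums are only approximate. Below is a minimal sketch of timing each phase directly with System.nanoTime(). It is plain Java under stated assumptions: a TreeMap stands in for the KeyValueStore, "+ 1" stands in for computeAnalytics(), setValue() stands in for store.put(), and PhaseTimer and scanAndUpdate are hypothetical names, not part of the Kafka Streams API.

```java
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka API): accumulates elapsed time per phase using
// System.nanoTime(), which is monotonic and intended for measuring elapsed time.
public class PhaseTimer {

    enum Phase { RETRIEVE, PROCESS, STORE }

    private final EnumMap<Phase, Long> nanos = new EnumMap<>(Phase.class);

    // Runs the action, adds its elapsed nanoseconds to the given phase, returns its result.
    <T> T time(Phase phase, Supplier<T> action) {
        long start = System.nanoTime();
        try {
            return action.get();
        } finally {
            nanos.merge(phase, System.nanoTime() - start, Long::sum);
        }
    }

    long millis(Phase phase) {
        return TimeUnit.NANOSECONDS.toMillis(nanos.getOrDefault(phase, 0L));
    }

    // Mirrors the punctuate loop: the TreeMap stands in for the state store,
    // "+ 1" for computeAnalytics() and setValue() for store.put().
    static int scanAndUpdate(Map<String, Long> store, PhaseTimer timer) {
        Iterator<Map.Entry<String, Long>> iter = store.entrySet().iterator();
        int counter = 0;
        while (timer.time(Phase.RETRIEVE, iter::hasNext)) {
            Map.Entry<String, Long> entry = timer.time(Phase.RETRIEVE, iter::next);
            Long updated = timer.time(Phase.PROCESS, () -> entry.getValue() + 1);
            timer.time(Phase.STORE, () -> entry.setValue(updated));
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new TreeMap<>();
        for (int i = 0; i < 2500; i++) {
            store.put("key-" + i, (long) i);
        }
        PhaseTimer timer = new PhaseTimer();
        int processed = scanAndUpdate(store, timer);
        System.out.println("processed Records :: " + processed
                + " retrieve ms :: " + timer.millis(Phase.RETRIEVE)
                + " process ms :: " + timer.millis(Phase.PROCESS)
                + " store ms :: " + timer.millis(Phase.STORE));
    }
}
```

Running main prints per-phase totals for 2500 in-memory entries. In the real processor, the same kind of wrapper could be placed around iter.hasNext(), iter.next(), computeAnalytics() and store.put() so that each phase is measured on its own rather than derived by subtraction.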