Hi Kafka Streams Users,

I am trying to improve the performance of Kafka Streams State Store
Persistent Store. In our application we are using Kafka Streams Processor
API  and using Persistent State Store.. My application when starts up it
performing well but over a period of time the performance deteriorated. I
am computing certain results in computeAnalytics method and this method is
not taking time at all. This method is being called within both process and
punctuate and I am storing the updated object back to store. Over the
period of time its taking huge time for completing the punctuate process
and I could see majority of the time is spent in storing the records and
Iterating the records. The record size is just 2500 per partition. I am not
where I am going wrong and how can I improve the performance.

Below is one such sample log record.

INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394

Below I have given my pseudo code for my processor which exactly resembles
the code which I am using in my application.

MyProcessor(){

  process(Key objectkey, Update eventupdate){
   long timestamp=context.timestamp();
   AnalyticeObj storeobj=store.get(objectkey);

   if( storeobj ===null)
         {
          storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
         }
         else
        {
           storeobj.update(eventupdate,timestamp)
        }
     storeobj=storeobj.computeAnalytics();

   store.put(objectkey,storeobj);
  context.commit();
}
// Every 5 seconds
punctuate(long timestamp)
{
 long startTime = System.currentTimeMillis();
long totalTimeTakenToProcessRecords=0;
long totalTimeTakenToStoreRecords=0;
long counter=0;
KeyValueIterator iter=this.visitStore.all();
while (iter.hasNext()) {
KeyValue<Key, AnalyticeObj> entry = iter.next();

    if(AnalyticeObj.hasExpired(timestamp)
         store.remove(entry.key)
      else
      {
        long processStartTime=System.currentTimeMillis();
         AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);

totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords+(System.currentTimeMillis()-processStartTime);

         long storeStartTime=System.currentTimeMillis();
          store.put(entry.key,storeobj);

totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(System.currentTimeMillis()-storeStartTime);
       }
   counter++;
}
     logger.info(" Time Metrics for punctuate  "
                    " for TimeStamp :: " + "" + timestamp + " processed
Records :: "
                    + counter +" totalTimeTakenToProcessRecords ::
"+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
"+totalTimeTakenToStoreRecords
                    +"toal time Taken to retrieve Records :: "+
(System.currentTimeMillis() -
(startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords))+"
Total time Taken :: " + (System.currentTimeMillis() - startTime));
}
}

Reply via email to