In addition to the measurements that you are taking yourself, Kafka Streams
also has its own metrics. They are exposed via JMX, if you have that enabled:

http://kafka.apache.org/documentation/#monitoring
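
As a rough sketch of reading those metrics from inside the application: this
assumes the code runs in the same JVM as your Streams app and that the MBeans
are registered under the kafka.streams domain. StreamsJmxDump and
readAttributes are names I made up for illustration, not part of Kafka.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class StreamsJmxDump {

    // Collects every readable attribute of every MBean matching the pattern,
    // keyed as "<object name> / <attribute name>".
    static Map<String, Object> readAttributes(MBeanServer server, String pattern)
            throws Exception {
        Map<String, Object> values = new TreeMap<>();
        for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                if (!attr.isReadable()) {
                    continue;
                }
                try {
                    values.put(name + " / " + attr.getName(),
                               server.getAttribute(name, attr.getName()));
                } catch (Exception e) {
                    // Some attributes cannot be resolved at runtime; skip them.
                }
            }
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed domain for Kafka Streams MBeans; prints nothing if no
        // Streams instance is running in this JVM.
        readAttributes(server, "kafka.streams:*")
                .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

From outside the process, a JMX client such as jconsole should show the same
MBeans, provided remote JMX is enabled on the JVM.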

If you set metrics.recording.level to DEBUG, you can also see a number of
metrics for the state stores, such as put-latency-avg.

See http://kafka.apache.org/documentation/#kafka_streams_store_monitoring
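
A minimal sketch of setting the recording level, using the plain config key so
that it needs no Kafka imports. StreamsMetricsConfig and withDebugMetrics are
hypothetical names; the returned Properties would be passed to your
KafkaStreams instance together with the rest of your settings. I am using the
uppercase value here on the assumption that it is the form the config
validation expects.

```java
import java.util.Properties;

class StreamsMetricsConfig {

    // Returns a copy of the given settings with debug-level metrics enabled.
    // The input Properties object is left unchanged.
    static Properties withDebugMetrics(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("metrics.recording.level", "DEBUG");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        // Placeholder value for illustration only.
        base.put("application.id", "my-streams-app");
        Properties props = withDebugMetrics(base);
        System.out.println(props.getProperty("metrics.recording.level"));
    }
}
```

Debug-level recording adds some overhead, so you may want to enable it only
while you are investigating.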

-James

> On Sep 16, 2017, at 6:14 AM, dev loper <spark...@gmail.com> wrote:
> 
> Hi Kafka Streams Users,
> 
> I am trying to improve the performance of a Kafka Streams persistent state
> store. Our application uses the Kafka Streams Processor API with a
> persistent state store. The application performs well when it starts up,
> but its performance deteriorates over time. I compute certain results in a
> computeAnalytics method, and this method takes almost no time. It is called
> from both process and punctuate, and I store the updated object back to the
> store. Over time, punctuate takes a very long time to complete, and I can
> see that most of that time is spent storing the records and iterating over
> them. There are only about 2500 records per partition. I am not sure where
> I am going wrong or how I can improve the performance.
> 
> Below is one such sample log record.
> 
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
> 
> Below I have given my pseudo code for my processor which exactly resembles
> the code which I am using in my application.
> 
> MyProcessor(){
> 
>  process(Key objectkey, Update eventupdate) {
>    long timestamp = context.timestamp();
>    AnalyticeObj storeobj = store.get(objectkey);
> 
>    // Create the entry on first sight of the key, otherwise update it.
>    if (storeobj == null) {
>      storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
>    } else {
>      storeobj.update(eventupdate, timestamp);
>    }
>    storeobj = storeobj.computeAnalytics();
> 
>    store.put(objectkey, storeobj);
>    context.commit();
>  }
>  // Every 5 seconds
>  punctuate(long timestamp) {
>    long startTime = System.currentTimeMillis();
>    long totalTimeTakenToProcessRecords = 0;
>    long totalTimeTakenToStoreRecords = 0;
>    long counter = 0;
>    KeyValueIterator<Key, AnalyticeObj> iter = this.visitStore.all();
>    while (iter.hasNext()) {
>      KeyValue<Key, AnalyticeObj> entry = iter.next();
> 
>      if (entry.value.hasExpired(timestamp)) {
>        store.delete(entry.key);
>      } else {
>        // Time spent recomputing the analytics for this entry.
>        long processStartTime = System.currentTimeMillis();
>        AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
>        totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
> 
>        // Time spent writing the entry back to the store.
>        long storeStartTime = System.currentTimeMillis();
>        store.put(entry.key, storeobj);
>        totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
>      }
>      counter++;
>    }
>    // Retrieval time is the total minus the process and store times.
>    logger.info(" Time Metrics for punctuate  "
>        + " for TimeStamp :: " + timestamp
>        + " processed Records :: " + counter
>        + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
>        + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
>        + "toal time Taken to retrieve Records :: "
>        + (System.currentTimeMillis()
>            - (startTime + totalTimeTakenToProcessRecords + totalTimeTakenToStoreRecords))
>        + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
>  }
> }
