Hi Bill,

Thank you for pointing that out. In the actual code, however, I do call iter.close() in a finally block whenever the iterator is not null. I don't see any issues when running under light traffic; the problems start as soon as I switch to production traffic.
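The close-in-finally pattern described above can be sketched as follows. This is a minimal, self-contained illustration, not the production code: `CloseableIterator`, `TrackingIterator`, and `visitAll` are hypothetical names, and the `Supplier` stands in for a call such as `store.all()` so the example runs without a Kafka dependency.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class IteratorCloseSketch {

    /** Hypothetical stand-in for Kafka Streams' KeyValueIterator: an iterator that must be closed. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    /** Wraps a plain iterator and records whether close() was called. */
    static final class TrackingIterator<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;
        boolean closed = false;

        TrackingIterator(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public T next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    /** Visits every entry and closes the iterator in finally, even if the loop throws. */
    static <T> long visitAll(Supplier<CloseableIterator<T>> store) {
        long counter = 0;
        CloseableIterator<T> iter = null;
        try {
            iter = store.get(); // assumption: plays the role of store.all()
            while (iter.hasNext()) {
                iter.next();
                counter++;
            }
        } finally {
            if (iter != null) {
                iter.close();
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        TrackingIterator<String> it = new TrackingIterator<>(List.of("a", "b", "c").iterator());
        long visited = visitAll(() -> it);
        System.out.println("visited=" + visited + " closed=" + it.closed);
    }
}
```

Because the stand-in interface extends `AutoCloseable`, the same guarantee could also be written as a try-with-resources statement instead of an explicit finally block.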
I have provided additional details about our current application below. If you need specific logs or other details, please let me know and I will capture them.

In the production environment I receive 10,000 messages per second. The topic has 36 partitions, and there are around 2,500 unique entities per partition for which I have to maintain state. The hardware configuration and the number of instances we use for this solution are listed below. Please let me know if the hardware is the limiting factor here. We did not go for a higher configuration because the load average on these instances was quite low and I could hardly see any CPU spikes.

Kafka Machine Details:
- 2 broker instances (current CPU usage 2%-8%)
  Instance type: AWS T2 Large
  Machine configuration: 2 vCPUs, 8 GB RAM
  Storage: EBS

Kafka Streams Instances:
- 3 Kafka Streams application instances (current CPU usage 8%-24%)
  Instance type: AWS M4 Large
  Machine configuration: 2 vCPUs, 8 GB RAM
  Storage: EBS (dedicated EBS bandwidth 450 Mbps)

On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <b...@confluent.io> wrote:

> Hi,
>
> It's hard to say exactly without a little more information.
>
> On a side note, I don't see where you are closing the KeyValueIterator in
> the code above. Not closing a KeyValueIterator on a Permanent State Store
> can cause a resource leak over time, so I'd add `iter.close()` right before
> your `logger.info` call. It might be worth retrying at that point.
>
> Thanks,
> Bill
>
> On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I am trying to improve the performance of a Kafka Streams persistent
> > state store. In our application we use the Kafka Streams Processor API
> > with a persistent state store. The application performs well when it
> > starts up, but its performance deteriorates over time.
> > I am computing certain results in the computeAnalytics method, and this
> > method takes hardly any time. It is called from both process and
> > punctuate, and I store the updated object back to the store. Over time,
> > punctuate takes a huge amount of time to complete, and I can see that
> > the majority of that time is spent storing the records and iterating
> > over them. There are just 2500 records per partition. I am not sure
> > where I am going wrong or how I can improve the performance.
> >
> > Below is one such sample log record.
> >
> > INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > Metrics for punctuate for TimeStamp :: 1505564655878 processed Records ::
> > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
> >
> > Below is the pseudo code for my processor, which closely resembles the
> > code I am using in my application.
> >
> > MyProcessor() {
> >
> >     process(Key objectkey, Update eventupdate) {
> >         long timestamp = context.timestamp();
> >         AnalyticeObj storeobj = store.get(objectkey);
> >
> >         if (storeobj == null) {
> >             storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> >         } else {
> >             storeobj.update(eventupdate, timestamp);
> >         }
> >         storeobj = storeobj.computeAnalytics();
> >
> >         store.put(objectkey, storeobj);
> >         context.commit();
> >     }
> >
> >     // Every 5 seconds
> >     punctuate(long timestamp) {
> >         long startTime = System.currentTimeMillis();
> >         long totalTimeTakenToProcessRecords = 0;
> >         long totalTimeTakenToStoreRecords = 0;
> >         long counter = 0;
> >         KeyValueIterator<Key, AnalyticeObj> iter = this.visitStore.all();
> >         while (iter.hasNext()) {
> >             KeyValue<Key, AnalyticeObj> entry = iter.next();
> >
> >             if (AnalyticeObj.hasExpired(timestamp)) {
> >                 store.remove(entry.key);
> >             } else {
> >                 long processStartTime = System.currentTimeMillis();
> >                 AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> >                 totalTimeTakenToProcessRecords +=
> >                     System.currentTimeMillis() - processStartTime;
> >
> >                 long storeStartTime = System.currentTimeMillis();
> >                 store.put(entry.key, storeobj);
> >                 totalTimeTakenToStoreRecords +=
> >                     System.currentTimeMillis() - storeStartTime;
> >             }
> >             counter++;
> >         }
> >         logger.info(" Time Metrics for punctuate "
> >             + " for TimeStamp :: " + "" + timestamp
> >             + " processed Records :: " + counter
> >             + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> >             + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> >             + "toal time Taken to retrieve Records :: "
> >             + (System.currentTimeMillis()
> >                 - (startTime + totalTimeTakenToProcessRecords + totalTimeTakenToStoreRecords))
> >             + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
> >     }
> > }
> >