@James,
I will enable the monitoring metrics and try them out. But I strongly suspect
the performance degradation was due to RocksDB. As soon as I switched to the
in-memory state store, all my problems disappeared.

@Sabarish,

Only my Kafka brokers are on T2 instances. My application instances are on
M4 large instances. Thank you very much for sharing your experience with
RocksDB. I switched to the in-memory state store after your suggestion and
all my problems disappeared. Maybe if I switch to SSD storage I might get
better performance from RocksDB; currently it is running on EBS. But my
experience with RocksDB at the current level of performance was terrible: it
kept degrading over time. If you hadn't pointed out your experience with
RocksDB, I wouldn't have tried the in-memory store.





On Sun, Sep 17, 2017 at 3:25 PM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:

> T2 instances are credit-based instances that do not provide constant
> throughput in CPU and I/O. You may want to reconsider using T2.
>
> That said, we too face issues with scaling RocksDB. I don't remember the
> exact benchmark numbers, but I do remember it was at least 3x slower than
> HashMap storage. Every lookup and every write contributed to the
> degradation. Changelogging had negligible overhead, by the way.
>
> Regards
> Sab
>
> On 17 Sep 2017 8:52 am, "dev loper" <spark...@gmail.com> wrote:
>
> > Hi Bill,
> >
> > Thank you for pointing that out, but in the actual code I am calling
> > iter.close() in the finally block if the iterator is not null. I don't see
> > any issues when I am running it on light traffic. As soon as I switch to
> > production traffic I start seeing these issues.
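
To illustrate the close-in-finally pattern mentioned above, here is a minimal sketch using try-with-resources, which closes the iterator even if the loop body throws. CountingIterator and IteratorCloseSketch are hypothetical stand-ins, not Kafka Streams classes. Since KeyValueIterator is Closeable, the same shape should apply to the result of store.all(), but this sketch is untested against a real state store.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an iterator that holds a resource until closed.
class CountingIterator implements Iterator<String>, Closeable {
    static int openCount = 0; // iterators opened but not yet closed
    private final Iterator<String> delegate;

    CountingIterator(List<String> items) {
        this.delegate = items.iterator();
        openCount++;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public String next() { return delegate.next(); }
    @Override public void close() { openCount--; }
}

class IteratorCloseSketch {
    // Visits every entry; try-with-resources calls close() on every exit path.
    static int visitAll(List<String> items) {
        int visited = 0;
        try (CountingIterator iter = new CountingIterator(items)) {
            while (iter.hasNext()) {
                iter.next();
                visited++;
            }
        }
        return visited;
    }
}
```

Compared with an explicit finally block and a null check, this form leaves no path on which close() can be skipped.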
> >
> > Below I have provided additional details about our current application. If
> > you are looking for specific logs or details, please let me know and I will
> > capture them.
> >
> > In the production environment I am receiving 10,000 messages per second.
> > There are 36 partitions for the topic and around 2,500 unique entities per
> > partition for which I have to maintain state.
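
For a sense of scale, the figures above can be turned into per-partition numbers. This is only back-of-the-envelope arithmetic, and it assumes traffic is spread evenly across partitions, which the thread does not confirm. LoadEstimateSketch and its method names are hypothetical.

```java
class LoadEstimateSketch {
    // Average message rate per partition, assuming an even spread (an assumption).
    static double messagesPerPartitionPerSecond(long messagesPerSecond, int partitions) {
        return (double) messagesPerSecond / partitions;
    }

    // Total number of entities whose state is kept across all partitions.
    static long totalEntities(int partitions, long entitiesPerPartition) {
        return partitions * entitiesPerPartition;
    }
}
```

With 10,000 messages per second, 36 partitions and 2,500 entities per partition, that is roughly 278 messages per second per partition and 90,000 entities in total.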
> >
> > Below are the hardware configuration and the number of instances we are
> > using for this solution. Please let me know if hardware is the limiting
> > factor here. We didn't go for a higher configuration since the load average
> > on these instances was quite low and I could hardly see any CPU spikes.
> >
> >
> > Kafka brokers: 2 broker instances with the configuration below (current
> > CPU usage 2%-8%)
> >
> >     Instance type: AWS T2 Large
> >     Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS
> >
> > Kafka Streams: 3 Kafka Streams application instances (current CPU usage
> > 8%-24%)
> >
> >     Instance type: AWS M4 Large
> >     Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS (dedicated EBS
> >     bandwidth 450 Mbps)
> >
> >
> >
> > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <b...@confluent.io> wrote:
> >
> > > Hi,
> > >
> > > It's hard to say exactly without a little more information.
> > >
> > > On a side note, I don't see where you are closing the KeyValueIterator in
> > > the code above. Not closing a KeyValueIterator on a persistent state store
> > > can cause a resource leak over time, so I'd add `iter.close()` right
> > > before your `logger.info` call. It might be worth retrying at that point.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com> wrote:
> > >
> > > > Hi Kafka Streams Users,
> > > >
> > > > I am trying to improve the performance of a Kafka Streams persistent
> > > > state store. In our application we are using the Kafka Streams Processor
> > > > API with a persistent state store. When my application starts up it
> > > > performs well, but over a period of time the performance deteriorates.
> > > > I am computing certain results in the computeAnalytics method, and this
> > > > method takes hardly any time. It is called from both process and
> > > > punctuate, and I store the updated object back to the store. Over time
> > > > punctuate takes a huge amount of time to complete, and I can see that
> > > > the majority of the time is spent storing and iterating over the
> > > > records. There are just 2,500 records per partition. I am not sure where
> > > > I am going wrong or how I can improve the performance.
> > > >
> > > > Below is one such sample log record.
> > > >
> > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> > > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
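
As a rough reading of the log line above: dividing each total by the 2109 processed records gives a per-record cost, and dividing the total time by the 5-second punctuate interval shows how far one run overshoots its schedule. This assumes the logged times are in milliseconds, as the System.currentTimeMillis() calls in the original post suggest. PunctuateCostSketch and its method names are made up for illustration.

```java
class PunctuateCostSketch {
    // Average cost of one record in milliseconds.
    static double perRecordMillis(long totalMillis, long records) {
        return (double) totalMillis / records;
    }

    // How many punctuate intervals a single punctuate run occupies.
    static double intervalOverrun(long totalMillis, long intervalMillis) {
        return (double) totalMillis / intervalMillis;
    }
}
```

With the logged values this works out to roughly 13 ms per put and 6 ms per retrieved record, and a punctuate run about 8 times longer than its 5-second interval.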
> > > >
> > > > Below is pseudocode for my processor, which mirrors the code I am using
> > > > in my application.
> > > >
> > > > class MyProcessor {
> > > >
> > > >   // Pseudocode: init() and close() are omitted; context, store and
> > > >   // logger are fields of the processor.
> > > >
> > > >   public void process(Key objectkey, Update eventupdate) {
> > > >     long timestamp = context.timestamp();
> > > >     AnalyticeObj storeobj = store.get(objectkey);
> > > >
> > > >     if (storeobj == null) {
> > > >       storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> > > >     } else {
> > > >       storeobj.update(eventupdate, timestamp);
> > > >     }
> > > >     storeobj = storeobj.computeAnalytics();
> > > >
> > > >     store.put(objectkey, storeobj);
> > > >     context.commit();
> > > >   }
> > > >
> > > >   // Every 5 seconds
> > > >   public void punctuate(long timestamp) {
> > > >     long startTime = System.currentTimeMillis();
> > > >     long totalTimeTakenToProcessRecords = 0;
> > > >     long totalTimeTakenToStoreRecords = 0;
> > > >     long counter = 0;
> > > >     KeyValueIterator<Key, AnalyticeObj> iter = store.all();
> > > >     while (iter.hasNext()) {
> > > >       KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > >
> > > >       if (entry.value.hasExpired(timestamp)) {
> > > >         store.delete(entry.key);
> > > >       } else {
> > > >         // Time spent recomputing the analytics for this entry
> > > >         long processStartTime = System.currentTimeMillis();
> > > >         AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> > > >         totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
> > > >
> > > >         // Time spent writing the entry back to the store
> > > >         long storeStartTime = System.currentTimeMillis();
> > > >         store.put(entry.key, storeobj);
> > > >         totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
> > > >       }
> > > >       counter++;
> > > >     }
> > > >     // Retrieval time = elapsed time minus processing and storing time
> > > >     logger.info(" Time Metrics for punctuate  "
> > > >         + " for TimeStamp :: " + timestamp
> > > >         + " processed Records :: " + counter
> > > >         + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> > > >         + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> > > >         + "toal time Taken to retrieve Records :: "
> > > >         + (System.currentTimeMillis()
> > > >             - (startTime + totalTimeTakenToProcessRecords + totalTimeTakenToStoreRecords))
> > > >         + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > >   }
> > > > }
> > > >
> > >
> >
>
