Hi,
I am now starting with rocksdb monitoring and comparing 2 identical streams
with 2 identical input topics.

Case1
Single partition topic with single thread streams apps running 0.10.1.1
This case is pretty stable with hardly any cpu wait time noticed.

Case 2
12 partition topic with 3 instances running on separate machines using
0.10.2.0.rc0 and each app having 4 threads so ideal state is 1 partition 1
thread.
This case is very unstable with frequent re-balance and dying of threads.
CPU wait time is very high and many times grater than 50%

kafka Cluster is same and of version 0.10.1.1

I had few questions
1. What attributes of rocksdb-window metrices should I usually monitor and
what would be their ideal values.


2. Also I tried accessing some attributes like put, fetch, get and flush in
both 0.10.1.1 instance and 0.10.2.0
In 0.10.1.1 I got some values which I understand are values in nano seconds.
In 0.10.2.0 value of all these attributes I queried were 0.

What would this mean?

Thanks
Sachin


On Fri, Feb 10, 2017 at 12:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Sachin,
>
> Thanks for sharing your observations, that are very helpful.
>
> Regards to monitoring, there are indeed metrics inside the Streams library
> to meter state store operations; for rocksdb it records the average latency
> and callrate for put / get / delete / all / range and flush / restore. You
> can find the corresponding metric names in the monitoring section on web
> docs once 0.10.2 is out:
>
> https://kafka.apache.org/documentation/#monitoring
>
> Or you can just grep all metric names and see which ones are of interests
> to you.
>
>
> Regards to the observations, I'm particularly interested in the fact that
> "memory utilization keeps increasing". But since rocksdb uses both on-heap
> and off heap memory it is hard to tell if there is really a memory leak.
> Could you share your usage of the state stores, like are they used for
> aggregations / joins / else, and did you override any of the rocksdb config
> settings?
>
>
> Guozhang
>
>
> On Thu, Feb 9, 2017 at 8:58 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > We recently upgraded to 0.10.2.0-rc0, the rocksdb issue seems to be
> > resolved however we are just not able to get the streams going under our
> > current scenario.
> > The issue seems to be still related to rocksdb.
> > Let me explain the situation:
> >
> > We have 40 partitions and 12 threads distributes across three machines
> > (each having 4).
> > Roughly 15 partition gets assigned to each machine. Thus we have 15 state
> > stores on each machine.
> >
> > What we see is that streams frequently gets rebalanced due to
> > CommitFailedException and as you have said this should not happen once
> > application reaches steady state.
> > We are running an instance on 4 core linux box (virtual machine). What we
> > observe is that there is lot of waiting time for each core consistently
> > greater than 50%. What we suspect is that application is spending lot of
> > time on disk I/O this CPU keeps waiting. Also we observe that rate of
> > consumption from source topic falls over time. Also overtime we see that
> > CPU utilization falls and memory utilization increases.
> >
> > Then we we did was used 12 partition for same topic and 12 threads, so
> each
> > thread processes from single partition and 4 state stores get created per
> > machine. (note streams application code is identical to above
> > configuration).
> > Here what we have observed so far is a stable configuration. After a
> stead
> > state no rebalance has same thread continues processing from same
> > partition.
> > Also we see that per core wait time fairly stable around 10%.
> >
> > Finally we have another configuration where a same single thread streams
> > application reading from identical single partition topic. Here also
> > application never fails. This is most consistent configuration running
> from
> > months now. The per core wait time is usually 0 - 3%.
> >
> > So over these experiments we feel that it is rocksdb that may be causing
> > the state of streams application deteriorate over time. Is there any
> rocks
> > db metrics/logs we collect that we can watch and infer anything.
> > I doubt there is any memory leak (as some have reported) because single
> > thread application seems to be running forever.
> > However is there any tuning that anyone can recommend which will make
> > rocksdb more optimized.
> >
> > Has anyone else facing similar issues when there are many rocksdb local
> > state store on a machine then application is not performing as well as
> > single state store.
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> > On Tue, Feb 7, 2017 at 3:21 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > Sorry i misunderstood what you had said. You are running 3 instances,
> one
> > > per machine? I thought you said you were running 3 instances on each
> > > machine.
> > >
> > > Regarding partitions: you are better off having more partitions as this
> > > effects the maximum degree of parallelism you can achieve in the app.
> If
> > > you only have 12 partitions then you can only have at most 12 threads
> in
> > > total. If you have 40 partitions then you can have up to 40 threads and
> > so
> > > on. It won't help with rebalancing anyway.
> > >
> > > You are correct, once all the instances are up and running then
> > rebalances
> > > generally shouldn't happen unless there is a failure of some kind. To
> > > maintain membership of the consumer group the consumer used by streams
> > > needs to poll at least every max.poll.interval.ms - the default for
> this
> > > is
> > > 5 minutes. If for any reason, the processing of records returned from a
> > > call to poll takes longer than  max.poll.interval.ms then the consumer
> > > will
> > > be kicked out of the group and a rebalance will happen. If you are
> seeing
> > > rebalances occurring after everything has been up and running and
> > reached a
> > > steady state, you might want to increase the value of
> > max.poll.interval.ms.
> > > You
> > > can also adjust  max.poll.records to decrease the maximum number of
> > records
> > > retrieved by a single call to poll - this will help to reduce the
> > > processing time.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 7 Feb 2017 at 07:24 Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > Everything is understood and I will try out 0.10.2.0-rc0 shortly.
> > > >
> > > > However one this is not clear:
> > > > Firstly i'd recommend you have different state directory configs for
> > each
> > > > application instance.
> > > >
> > > > Well I am running three separate instance of 4 threads each on three
> > > > different machines.
> > > > So each machine has its own physical structure though path to the
> > states
> > > > dir is same for each, because that is the relative path where we have
> > > > mounted the data directory separately for each of these three
> machine.
> > > >
> > > > So my streams state.dir setting is identical for all the instances,
> but
> > > > physically there are located at different locations.
> > > > So why do I need to have different config for each is not clear.
> > > >
> > > > I will also test with CLEANUP_DELAY_MS_CONFIG to be 30 minutes.
> > > >
> > > > Also one thing I wanted to know if I make partitions equal to total
> > > streams
> > > > threads which is 12, will that help in one thread always reading
> from a
> > > > single partition, and never a need to re-balance.
> > > >
> > > > I however don't understand one thing that once a steady state has
> > reached
> > > > and all threads have picked up their partitions then why there is
> ever
> > a
> > > > need to do future re-balance, unless untill a thread dies.
> > > >
> > > > Like this is not clear that how often is re-balance is triggered and
> is
> > > > there a way we can control it.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > > On Mon, Feb 6, 2017 at 10:10 PM, Damian Guy <damian....@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Sachin,
> > > > >
> > > > > Firstly i'd recommend you have different state directory configs
> for
> > > each
> > > > > application instance. I suspect you are potentially hitting an
> issue
> > > > where
> > > > > the partition assignment has changed, the state directory locks get
> > > > > released, and i directory gets removed just before the lock is
> taken
> > > out
> > > > by
> > > > > another thread or process. Though it is pretty hard to tell from
> the
> > > > above
> > > > > logs. You might also want to set StreamsConfig.CLEANUP_DELAY_
> > MS_CONFIG
> > > to
> > > > > a
> > > > > higher value than the default of 60 seconds. There is no reason why
> > it
> > > > > couldn't be much higher, 30 minutes, maybe more, depends on how
> badly
> > > you
> > > > > need old task data cleaned up. Setting this value higher will
> reduce
> > > the
> > > > > likelihood of the exception occurring.
> > > > >
> > > > > 0.10.2 will be out shortly. It would be good if you can try that.
> You
> > > can
> > > > > find the current RC here: http://home.apache.org/~
> > > > > ewencp/kafka-0.10.2.0-rc0/
> > > > > You don't need to upgrade your kafka brokers, just the streams
> > client.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Mon, 6 Feb 2017 at 10:53 Sachin Mittal <sjmit...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > > Yes on first we have three machines with same data directory
> > setting.
> > > > > > So the state dir config is same in for each.
> > > > > >
> > > > > > If it helps this is the sequence of logs just before the thread
> > > > shutting
> > > > > > down
> > > > > >
> > > > > > ....
> > > > > >
> > > > > > stream-thread [StreamThread-3] Committing all tasks because the
> > > commit
> > > > > > interval 30000ms has elapsed
> > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > >
> > > > > > stream-thread [StreamThread-3] Committing all tasks because the
> > > commit
> > > > > > interval 30000ms has elapsed
> > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > >
> > > > > > stream-thread [StreamThread-3] Committing all tasks because the
> > > commit
> > > > > > interval 30000ms has elapsed
> > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38
> > > state:
> > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task
> > [0_38]
> > > > > Failed
> > > > > > to flush state store key-table
> > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > >
> > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-
> > > > > table/key-table-201702052100/000009.sst:
> > > > > > No such file or directory
> > > > > > stream-thread [StreamThread-3] Shutting down
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of
> task
> > > 0_35
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of
> task
> > > 0_38
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of
> task
> > > 0_27
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of
> task
> > > 0_12
> > > > > > stream-thread [StreamThread-3] Closing a task 0_35
> > > > > > stream-thread [StreamThread-3] Closing a task 0_38
> > > > > > stream-thread [StreamThread-3] Closing a task 0_27
> > > > > > stream-thread [StreamThread-3] Closing a task 0_12
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_35
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_38
> > > > > > stream-thread [StreamThread-3] Failed while executing StreamTask
> > 0_38
> > > > > duet
> > > > > > to flush state
> > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task
> > [0_38]
> > > > > Failed
> > > > > > to flush state store key-table
> > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > >
> > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-
> > > > > table/key-table-201702052100/000009.sst:
> > > > > > No such file or directory
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_27
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_12
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task
> > 0_35
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task
> > 0_38
> > > > > > stream-thread [StreamThread-3] Failed while executing StreamTask
> > 0_38
> > > > > duet
> > > > > > to close state manager:
> > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task
> > [0_38]
> > > > > Failed
> > > > > > to close state store key-table
> > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > >
> > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-
> > > > > table/key-table-201702052100/000009.sst:
> > > > > > No such file or directory
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task
> > 0_27
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task
> > 0_12
> > > > > > Closing the Kafka producer with timeoutMillis =
> 9223372036854775807
> > > ms.
> > > > > > stream-thread [StreamThread-3] Removing all active tasks [[0_35,
> > > 0_38,
> > > > > > 0_27, 0_12]]
> > > > > > stream-thread [StreamThread-3] Removing all standby tasks [[]]
> > > > > > stream-thread [StreamThread-3] Stream thread shutdown complete
> > > > > >
> > > > > > It was working for iterations before that and then suddenly that
> > > > dir/file
> > > > > > was gone and it could not commit/flush/close the state.
> > > > > >
> > > > > >
> > > > > > For second, what do you recommend. Is there something we can
> patch
> > > from
> > > > > > 10.2 into 10.1.
> > > > > > Certain commits or something.
> > > > > >
> > > > > > Or do we again need to upgrade the cluster or 10.2, or if we just
> > > > upgrade
> > > > > > streams client to 10.2 will it work fine?
> > > > > > Since 10.2 is not released yet I suppose we would have build the
> > > > snapshot
> > > > > > version.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <damian....@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Sachin,
> > > > > > >
> > > > > > > The first exception - Is each instance of your streams app on a
> > > > single
> > > > > > > machine running with the same state directory config?
> > > > > > > The second exception - i believe is a bug in 0.10.1 that has
> been
> > > > fixed
> > > > > > in
> > > > > > > 0.10.2. There has been a number of issues fixed in this area.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > > > On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <sjmit...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hello All,
> > > > > > > > We recently upgraded to  kafka_2.10-0.10.1.1.
> > > > > > > >
> > > > > > > > We have a source topic with replication = 3 and partition =
> 40.
> > > > > > > > We have a streams application run with
> > NUM_STREAM_THREADS_CONFIG
> > > =
> > > > 4
> > > > > > and
> > > > > > > on
> > > > > > > > three machines. So 12 threads in total.
> > > > > > > >
> > > > > > > > What we do is start the same streams application one by one
> on
> > > > three
> > > > > > > > machines.
> > > > > > > >
> > > > > > > > After some time what we noticed was that one of the machine
> > > streams
> > > > > > > > application just crashed. When we inspected the log here is
> > what
> > > we
> > > > > > > found.
> > > > > > > > There were 2 sets of errors like:
> > > > > > > >
> > > > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask
> 0_38
> > > > > state:
> > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException:
> task
> > > > [0_38]
> > > > > > > Failed
> > > > > > > > to flush state store key-table
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > ProcessorStateManager.flush(ProcessorStateManager.java:331)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > StreamTask.commit(StreamTask.java:267)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > >
> > > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > > > > > > StreamThread.java:576)
> > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > >
> > > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > > > > > > StreamThread.java:562)
> > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > StreamThread.maybeCommit(
> > > > > > > StreamThread.java:538)
> > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.runLoop(
> > > > > > > StreamThread.java:456)
> > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > StreamThread.run(StreamThread.java:242)
> > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > Caused by: org.apache.kafka.streams.errors.
> > > ProcessorStateException:
> > > > > > > Error
> > > > > > > > while executing flush from store key-table-201702052100
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > flushInternal(RocksDBStore.java:375)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > flush(RocksDBStore.java:366)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > RocksDBWindowStore.flush(
> > > > > > > RocksDBWindowStore.java:256)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > MeteredWindowStore.flush(
> > > > > > > MeteredWindowStore.java:116)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > CachingWindowStore.flush(
> > > > > > > CachingWindowStore.java:119)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > ProcessorStateManager.flush(ProcessorStateManager.java:329)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 6 common frames omitted
> > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > >
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > > > > > > 201702052100/000009.sst:
> > > > > > > > No such file or directory
> > > > > > > >     at org.rocksdb.RocksDB.flush(Native Method)
> > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:1360)
> > > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > flushInternal(RocksDBStore.java:373)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 11 common frames omitted
> > > > > > > >
> > > > > > > > When we queried the directory
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > > 201702052100/
> > > > > we
> > > > > > > > could not find the directory.
> > > > > > > > So looks like this directory was earlier deleted by some task
> > and
> > > > now
> > > > > > > some
> > > > > > > > other task is trying to flush it too.
> > > > > > > > What could be possible the reason for the same?
> > > > > > > >
> > > > > > > > Another set of error we see is this:
> > > > > > > > org.apache.kafka.streams.errors.StreamsException:
> > stream-thread
> > > > > > > > [StreamThread-1] Failed to rebalance
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.runLoop(
> > > > > > > StreamThread.java:410)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > StreamThread.run(StreamThread.java:242)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > Caused by: org.apache.kafka.streams.errors.
> > > ProcessorStateException:
> > > > > > > Error
> > > > > > > > opening store key-table-201702052000 at location
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > > 201702052000
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > openDB(RocksDBStore.java:190)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > openDB(RocksDBStore.java:159)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > >
> > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.
> > > > > > > openDB(RocksDBWindowStore.java:72)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > > > > > > getOrCreateSegment(RocksDBWindowStore.java:388)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > putInternal(
> > > > > > > RocksDBWindowStore.java:319)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > RocksDBWindowStore.access$000(
> > > > > > > RocksDBWindowStore.java:51)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > RocksDBWindowStore$1.restore(
> > > > > > > RocksDBWindowStore.java:206)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > ProcessorStateManager.
> > > > > > > restoreActiveState(ProcessorStateManager.java:235)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > ProcessorStateManager.
> > > > > > > register(ProcessorStateManager.java:198)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > RocksDBWindowStore.init(
> > > > > > > RocksDBWindowStore.java:200)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > MeteredWindowStore.init(
> > > > > > > MeteredWindowStore.java:66)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.
> > > CachingWindowStore.init(
> > > > > > > CachingWindowStore.java:64)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.AbstractTask.
> > > > > > > initializeStateStores(AbstractTask.java:81)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > StreamTask.<init>(StreamTask.java:119)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > StreamThread.createStreamTask(StreamThread.java:633)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > StreamThread.addStreamTasks(StreamThread.java:660)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > StreamThread.access$100(
> > > > > > > StreamThread.java:69)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1.
> > > > > > > onPartitionsAssigned(StreamThread.java:124)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.
> > > > > > > onJoinComplete(ConsumerCoordinator.java:228)
> > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.internals.
> > AbstractCoordinator.
> > > > > > > joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.internals.
> > AbstractCoordinator.
> > > > > > > ensureActiveGroup(AbstractCoordinator.java:277)
> > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.internals.
> > > > > ConsumerCoordinator.poll(
> > > > > > > ConsumerCoordinator.java:259)
> > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > > > pollOnce(KafkaConsumer.java:1013)
> > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > > > > KafkaConsumer.java:979)
> > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.runLoop(
> > > > > > > StreamThread.java:407)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 1 common frames omitted
> > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error: lock
> > > > > > > >
> > > > > >
> > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > 201702052000/LOCK:
> > > > > > > No
> > > > > > > > locks available
> > > > > > > >     at org.rocksdb.RocksDB.open(Native Method)
> > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at org.rocksdb.RocksDB.open(RocksDB.java:184)
> > > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > openDB(RocksDBStore.java:183)
> > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 26 common frames omitted
> > > > > > > >
> > > > > > > > And then the whole application shuts down. We have not
> > understood
> > > > > this
> > > > > > > part
> > > > > > > > of the error and if this second error is somehow related to
> > first
> > > > > > error.
> > > > > > > >
> > > > > > > > Please let us know what could be the cause of these errors or
> > if
> > > > they
> > > > > > > have
> > > > > > > > been fixed and if there is some way to fix this in current
> > > release.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to