Hi,

I am now starting with rocksdb monitoring and comparing two identical streams apps with two identical input topics.

Case 1: a single-partition topic with a single-threaded streams app running 0.10.1.1. This case is pretty stable, with hardly any CPU wait time noticed.

Case 2: a 12-partition topic with 3 instances running on separate machines using 0.10.2.0-rc0, each app having 4 threads, so the ideal state is 1 partition per thread. This case is very unstable, with frequent re-balances and dying threads. CPU wait time is very high, many times greater than 50%. The Kafka cluster is the same and of version 0.10.1.1.

I had a few questions:
1. What attributes of the rocksdb-window metrics should I usually monitor, and what would be their ideal values?
2. I also tried accessing some attributes like put, fetch, get and flush in both the 0.10.1.1 instance and the 0.10.2.0 one. In 0.10.1.1 I got some values, which I understand are in nanoseconds. In 0.10.2.0 the value of all these attributes I queried was 0. What would this mean?

Thanks
Sachin

On Fri, Feb 10, 2017 at 12:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Sachin,
>
> Thanks for sharing your observations; they are very helpful.
>
> Regarding monitoring, there are indeed metrics inside the Streams library
> to meter state store operations; for rocksdb it records the average
> latency and call rate for put / get / delete / all / range and flush /
> restore. You can find the corresponding metric names in the monitoring
> section of the web docs once 0.10.2 is out:
>
> https://kafka.apache.org/documentation/#monitoring
>
> Or you can just grep all metric names and see which ones are of interest
> to you.
>
> Regarding the observations, I'm particularly interested in the fact that
> "memory utilization keeps increasing". But since rocksdb uses both
> on-heap and off-heap memory it is hard to tell if there is really a
> memory leak. Could you share your usage of the state stores - are they
> used for aggregations / joins / something else - and did you override any
> of the rocksdb config settings?
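One way to "grep all metric names" at runtime, as Guozhang suggests, is to dump everything the Streams client registers with JMX. A minimal sketch using only the JDK: the `kafka.streams` domain matches what the 0.10.x client registers, but the exact type and attribute names are best confirmed against your own running instance.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class StreamsMetricsDump {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Pattern-match every MBean registered under the streams domain.
        // Run this inside the streams app JVM (or point a JMXConnector at it).
        ObjectName pattern = new ObjectName("kafka.streams:*");
        for (ObjectName name : server.queryNames(pattern, null)) {
            System.out.println(name);
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                System.out.println("  " + attr.getName() + " = "
                        + server.getAttribute(name, attr.getName()));
            }
        }
    }
}
```

In a JVM with no streams instance running the query simply returns an empty set, so it is safe to run anywhere.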
>
> Guozhang
>
>
> On Thu, Feb 9, 2017 at 8:58 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > We recently upgraded to 0.10.2.0-rc0. The rocksdb issue seems to be
> > resolved, however we are just not able to get the streams going under
> > our current scenario. The issue still seems to be related to rocksdb.
> > Let me explain the situation:
> >
> > We have 40 partitions and 12 threads distributed across three machines
> > (each having 4). Roughly 15 partitions get assigned to each machine,
> > thus we have 15 state stores on each machine.
> >
> > What we see is that the streams frequently get rebalanced due to
> > CommitFailedException, and as you have said this should not happen once
> > the application reaches a steady state. We are running an instance on a
> > 4-core linux box (virtual machine). What we observe is that there is a
> > lot of waiting time for each core, consistently greater than 50%. What
> > we suspect is that the application is spending a lot of time on disk
> > I/O, so the CPU keeps waiting. Also we observe that the rate of
> > consumption from the source topic falls over time, and that over time
> > CPU utilization falls and memory utilization increases.
> >
> > Then what we did was use 12 partitions for the same topic and 12
> > threads, so each thread processes a single partition and 4 state stores
> > get created per machine (note the streams application code is identical
> > to the above configuration). Here what we have observed so far is a
> > stable configuration. After a steady state no rebalance happens and the
> > same thread continues processing from the same partition. Also we see
> > that the per-core wait time is fairly stable around 10%.
> >
> > Finally we have another configuration where the same single-threaded
> > streams application reads from an identical single-partition topic.
> > Here also the application never fails. This is the most consistent
> > configuration, running for months now. The per-core wait time is
> > usually 0 - 3%.
> >
> > So over these experiments we feel that it is rocksdb that may be
> > causing the state of the streams application to deteriorate over time.
> > Are there any rocksdb metrics/logs we can collect and watch to infer
> > anything? I doubt there is any memory leak (as some have reported),
> > because the single-threaded application seems to run forever.
> > However, is there any tuning anyone can recommend which will make
> > rocksdb more optimized?
> >
> > Is anyone else facing similar issues where, with many rocksdb local
> > state stores on a machine, the application does not perform as well as
> > with a single state store?
> >
> > Thanks
> > Sachin
> >
> >
> > On Tue, Feb 7, 2017 at 3:21 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > Sorry, I misunderstood what you had said. You are running 3 instances,
> > > one per machine? I thought you said you were running 3 instances on
> > > each machine.
> > >
> > > Regarding partitions: you are better off having more partitions, as
> > > this affects the maximum degree of parallelism you can achieve in the
> > > app. If you only have 12 partitions then you can have at most 12
> > > threads in total. If you have 40 partitions then you can have up to 40
> > > threads, and so on. It won't help with rebalancing anyway.
> > >
> > > You are correct: once all the instances are up and running, rebalances
> > > generally shouldn't happen unless there is a failure of some kind. To
> > > maintain membership of the consumer group, the consumer used by
> > > streams needs to poll at least every max.poll.interval.ms - the
> > > default for this is 5 minutes. If for any reason the processing of
> > > records returned from a call to poll takes longer than
> > > max.poll.interval.ms, then the consumer will be kicked out of the
> > > group and a rebalance will happen.
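The poll-timeout mechanics described here map onto two standard 0.10.x consumer settings, which Kafka Streams forwards to its internal consumer. A minimal sketch of building such a configuration; the application id, broker address, and the specific values are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

public class StreamsPollTuning {
    // Returns streams properties with the two poll-related knobs discussed
    // above. Key names are the standard 0.10.x consumer config keys.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical app id
        props.put("bootstrap.servers", "broker1:9092");  // hypothetical broker
        // Allow up to 10 minutes between poll() calls before the consumer
        // is kicked from the group (the default is 5 minutes).
        props.put("max.poll.interval.ms", "600000");
        // Return fewer records per poll() so each processing loop iteration
        // stays well under that interval.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```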
If you are
> > > seeing rebalances occurring after everything has been up and running
> > > and reached a steady state, you might want to increase the value of
> > > max.poll.interval.ms. You can also adjust max.poll.records to
> > > decrease the maximum number of records retrieved by a single call to
> > > poll - this will help to reduce the processing time.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 7 Feb 2017 at 07:24 Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > Everything is understood and I will try out 0.10.2.0-rc0 shortly.
> > > >
> > > > However one thing is not clear:
> > > > "Firstly I'd recommend you have different state directory configs
> > > > for each application instance."
> > > >
> > > > Well, I am running three separate instances of 4 threads each on
> > > > three different machines. So each machine has its own physical
> > > > structure, though the path to the state dir is the same for each,
> > > > because that is the relative path where we have mounted the data
> > > > directory separately for each of these three machines.
> > > >
> > > > So my streams state.dir setting is identical for all the instances,
> > > > but physically they are located at different locations. So why I
> > > > need to have a different config for each is not clear.
> > > >
> > > > I will also test with CLEANUP_DELAY_MS_CONFIG set to 30 minutes.
> > > >
> > > > Also, one thing I wanted to know: if I make partitions equal to the
> > > > total streams threads, which is 12, will that help in one thread
> > > > always reading from a single partition, with never a need to
> > > > re-balance?
> > > >
> > > > I however don't understand one thing: once a steady state has been
> > > > reached and all threads have picked up their partitions, why is
> > > > there ever a need for a future re-balance, unless a thread dies?
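For reference, the state-directory and cleanup-delay settings being discussed look like this in code. A sketch using raw config key names: "state.dir" is the key behind StreamsConfig.STATE_DIR_CONFIG, and "state.cleanup.delay.ms" is assumed to be the key behind the CLEANUP_DELAY_MS_CONFIG constant mentioned above; the path and values are illustrative.

```java
import java.util.Properties;

public class StreamsStateConfig {
    static Properties stateProps() {
        Properties props = new Properties();
        // The same relative path on every machine is fine when each machine
        // mounts its own disk there; the paths only need to differ between
        // instances that share one filesystem.
        props.put("state.dir", "/data/kafka-streams"); // hypothetical mount point
        // Raise the cleanup delay from the 60s default to 30 minutes to
        // reduce the chance of an old task directory being deleted while a
        // rebalance is still shuffling task ownership.
        props.put("state.cleanup.delay.ms", Long.toString(30 * 60 * 1000L));
        return props;
    }

    public static void main(String[] args) {
        stateProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```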
> > > >
> > > > Likewise it is not clear how often a re-balance is triggered, and
> > > > whether there is a way we can control it.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Mon, Feb 6, 2017 at 10:10 PM, Damian Guy <damian....@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Sachin,
> > > > >
> > > > > Firstly I'd recommend you have different state directory configs
> > > > > for each application instance. I suspect you are potentially
> > > > > hitting an issue where the partition assignment has changed, the
> > > > > state directory locks get released, and a directory gets removed
> > > > > just before the lock is taken out by another thread or process.
> > > > > Though it is pretty hard to tell from the above logs. You might
> > > > > also want to set StreamsConfig.CLEANUP_DELAY_MS_CONFIG to a higher
> > > > > value than the default of 60 seconds. There is no reason why it
> > > > > couldn't be much higher - 30 minutes, maybe more, depending on how
> > > > > badly you need old task data cleaned up. Setting this value higher
> > > > > will reduce the likelihood of the exception occurring.
> > > > >
> > > > > 0.10.2 will be out shortly. It would be good if you can try that.
> > > > > You can find the current RC here:
> > > > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/
> > > > > You don't need to upgrade your kafka brokers, just the streams
> > > > > client.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Mon, 6 Feb 2017 at 10:53 Sachin Mittal <sjmit...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > Yes, on the first point we have three machines with the same
> > > > > > data directory setting, so the state dir config is the same for
> > > > > > each.
> > > > > >
> > > > > > If it helps, this is the sequence of logs just before the thread
> > > > > > shut down:
> > > > > >
> > > > > > ....
> > > > > >
> > > > > > stream-thread [StreamThread-3] Committing all tasks because the
> > > > > > commit interval 30000ms has elapsed
> > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > >
> > > > > > stream-thread [StreamThread-3] Committing all tasks because the
> > > > > > commit interval 30000ms has elapsed
> > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > >
> > > > > > stream-thread [StreamThread-3] Committing all tasks because the
> > > > > > commit interval 30000ms has elapsed
> > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task
> > > > > > [0_38] Failed to flush state store key-table
> > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > No such file or directory
> > > > > > stream-thread [StreamThread-3] Shutting down
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_35
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_38
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_27
> > > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_12
> > > > > > stream-thread [StreamThread-3] Closing a task 0_35
> > > > > > stream-thread [StreamThread-3] Closing a task 0_38
> > > > > > stream-thread [StreamThread-3] Closing a task 0_27
> > > > > > stream-thread [StreamThread-3] Closing a task 0_12
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_35
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_38
> > > > > > stream-thread [StreamThread-3] Failed while executing StreamTask
> > > > > > 0_38 due to flush state:
> > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task
> > > > > > [0_38] Failed to flush state store key-table
> > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > No such file or directory
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_27
> > > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_12
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_35
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_38
> > > > > > stream-thread [StreamThread-3] Failed while executing StreamTask
> > > > > > 0_38 due to close state manager:
> > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task
> > > > > > [0_38] Failed to close state store key-table
> > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > > > > > No such file or directory
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_27
> > > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_12
> > > > > > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > > > > > stream-thread [StreamThread-3] Removing all active tasks [[0_35,
> > > > > > 0_38, 0_27, 0_12]]
> > > > > > stream-thread [StreamThread-3] Removing all standby tasks [[]]
> > > > > > stream-thread [StreamThread-3] Stream thread shutdown complete
> > > > > >
> > > > > > It was working for iterations before that, and then suddenly
> > > > > > that dir/file was gone and it could not commit/flush/close the
> > > > > > state.
> > > > > >
> > > > > > On the second point, what do you recommend? Is there something
> > > > > > we can patch from 10.2 into 10.1 - certain commits or something?
> > > > > >
> > > > > > Or do we again need to upgrade the cluster to 10.2, or if we
> > > > > > just upgrade the streams client to 10.2 will it work fine?
> > > > > > Since 10.2 is not released yet, I suppose we would have to build
> > > > > > the snapshot version.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <damian....@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Sachin,
> > > > > > >
> > > > > > > The first exception - is each instance of your streams app on
> > > > > > > a single machine running with the same state directory config?
> > > > > > > The second exception - I believe it is a bug in 0.10.1 that
> > > > > > > has been fixed in 0.10.2. There have been a number of issues
> > > > > > > fixed in this area.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > > > On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <sjmit...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello All,
> > > > > > > > We recently upgraded to kafka_2.10-0.10.1.1.
> > > > > > > >
> > > > > > > > We have a source topic with replication = 3 and partition = 40.
> > > > > > > > We have a streams application run with
> > > > > > > > NUM_STREAM_THREADS_CONFIG = 4 on three machines.
> > > > > > > > So 12 threads in total.
> > > > > > > >
> > > > > > > > What we do is start the same streams application one by one
> > > > > > > > on the three machines.
> > > > > > > >
> > > > > > > > After some time what we noticed was that one machine's
> > > > > > > > streams application just crashed. When we inspected the log,
> > > > > > > > here is what we found. There were 2 sets of errors, like:
> > > > > > > >
> > > > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:267) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201702052100
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:375) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:366) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:256) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:116) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 6 common frames omitted
> > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> > > > > > > >     at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:1360) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:373) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 11 common frames omitted
> > > > > > > >
> > > > > > > > When we queried the directory
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/
> > > > > > > > we could not find it. So it looks like this directory was
> > > > > > > > earlier deleted by some task and now some other task is
> > > > > > > > trying to flush it too. What could possibly be the reason
> > > > > > > > for this?
> > > > > > > >
> > > > > > > > The other set of errors we see is this:
> > > > > > > > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store key-table-201702052000 at location
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:190) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:159) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 1 common frames omitted
> > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error: lock
> > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000/LOCK: No locks available
> > > > > > > >     at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at org.rocksdb.RocksDB.open(RocksDB.java:184) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:183) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > >     ... 26 common frames omitted
> > > > > > > >
> > > > > > > > And then the whole application shuts down. We have not
> > > > > > > > understood this part of the error, or whether this second
> > > > > > > > error is somehow related to the first.
> > > > > > > >
> > > > > > > > Please let us know what could be the cause of these errors,
> > > > > > > > or whether they have been fixed, and if there is some way to
> > > > > > > > fix this in the current release.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
>
> --
> -- Guozhang