Hi,

We recently fixed the deadlock issue and rebuilt the streams jar with the RocksDB configs copied from trunk. We no longer hit the deadlock, and the CPU core wait time now stays around 5% (down from 50% earlier).
However, we now get a new exception that is not handled by the streams application and causes the instance to shut down:

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of new-part-advice-key-table-changelog-9 should not change while restoring: old end offset 647352, current offset 647632
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
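As an interim mitigation, KafkaStreams exposes setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler) so the application can at least observe the dying StreamThread and log or alert instead of the instance going down silently (it only observes; it does not restart the thread). The mechanism is the standard Java Thread.UncaughtExceptionHandler, sketched here on a plain thread standing in for a StreamThread (the thread name and exception message are just for illustration):

```java
public class UncaughtHandlerSketch {
    public static void main(String[] args) throws InterruptedException {
        // In a real app this would be: streams.setUncaughtExceptionHandler((t, e) -> ...);
        // Here a plain thread stands in for a StreamThread dying during restore.
        Thread streamThread = new Thread(() -> {
            throw new IllegalStateException(
                "task [0_9] Log end offset should not change while restoring");
        }, "StreamThread-2");

        // Handler fires when the thread dies; log/alert here. The thread is
        // still gone afterwards -- this is observation only, not recovery.
        streamThread.setUncaughtExceptionHandler((t, e) ->
            System.out.println("thread " + t.getName() + " died: " + e.getMessage()));

        streamThread.start();
        streamThread.join();
    }
}
```

This does not solve the restore race itself, but it makes the shutdown visible so an external supervisor can react.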
What I can see from the logs is this:

DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-2] creating new task 0_9

So it creates the task at this time. To rebuild the local state store from the changelog topic, it starts with:

DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to partition(s): new-part-advice-key-table-changelog-9
DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of partition new-part-advice-key-table-changelog-9
DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition new-part-advice-key-table-changelog-9 to latest offset.
DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for new-part-advice-key-table-changelog-9. Fetched offset 647352, timestamp -1
DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of partition new-part-advice-key-table-changelog-9
DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition new-part-advice-key-table-changelog-9 to earliest offset.
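So the restore sequence is: seek to the end of the changelog to capture the end offset (647352 here), seek back to the beginning, replay everything into the local store, and then verify that the end offset has not moved. A minimal self-contained sketch of that final check, with an in-memory list standing in for the changelog partition (class and method names are made up; the real logic lives in ProcessorStateManager.restoreActiveState):

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreCheckSketch {

    // Mirrors the check in restoreActiveState: the end offset recorded before
    // the replay must still be the end offset after it, or restore fails.
    static void checkEndOffset(long before, long after) {
        if (after != before) {
            throw new IllegalStateException(
                "Log end offset should not change while restoring: old end offset "
                + before + ", current offset " + after);
        }
    }

    public static void main(String[] args) {
        List<String> changelog = new ArrayList<>();
        for (int i = 0; i < 5; i++) changelog.add("record-" + i);

        long endOffsetBefore = changelog.size();              // "seek to end": remember 5

        List<String> localStore = new ArrayList<>(changelog); // replay from the beginning

        // Meanwhile a task elsewhere is still writing to the same changelog partition:
        changelog.add("record-5");

        try {
            checkEndOffset(endOffsetBefore, changelog.size()); // re-check end offset
        } catch (IllegalStateException e) {
            System.out.println("restore fails: " + e.getMessage());
        }
    }
}
```

The sketch shows why any concurrent writer to the changelog partition during the replay window is enough to trip this check.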
The process finishes at:

DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for partitions [new-part-advice-key-table-changelog-9] to broker 192.168.73.199:9092 (id: 5 rack: null)
DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics or patterns and assigned partitions

And the exception is thrown at:

ERROR 2017-03-25 02:10:21,232 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group new-part-advice failed on partition assignment
java.lang.IllegalStateException: task [0_9] Log end offset of new-part-advice-key-table-changelog-9 should not change while restoring: old end offset 647352, current offset 647632

So you can clearly see that while this thread was restoring the state, some other thread (on the same or another instance) committed more records to this changelog partition, so at the end of the process the two offsets did not match. I think this is a fairly reasonable scenario: while restoring the state, the restore logic should also account for newly appended offsets and not assume it is the only thread producing to that partition. Another instance may well have committed more records while this thread was trying to restore. So I feel this exception should be handled rather than thrown all the way up to the streams application.

What do you all think?

Thanks
Sachin