Hi,
We recently fixed the deadlock issue and also built the Streams jar with the
RocksDB configs copied from trunk. The deadlock no longer occurs, and CPU
core wait time now stays around 5% (down from 50% earlier).

However, we now hit a new exception that is not handled by the Streams
application and causes the instance to shut down.

org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-2] Failed to rebalance
    at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
new-part-advice-key-table-changelog-9 should not change while restoring:
old end offset 647352, current offset 647632
    at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]

What I can see from the logs is this:
DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[StreamThread-2] creating new task 0_9
So it creates the task at this time.

To restore the local state store from the changelog topic, it starts at:

DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
partition(s): new-part-advice-key-table-changelog-9
DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
partition new-part-advice-key-table-changelog-9
DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
partition new-part-advice-key-table-changelog-9 to latest offset.
DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
org.apache.kafka.clients.consumer.internals.Fetcher - Handling
ListOffsetResponse response for new-part-advice-key-table-changelog-9.
Fetched offset 647352, timestamp -1
DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of
partition new-part-advice-key-table-changelog-9
DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
partition new-part-advice-key-table-changelog-9 to earliest offset.

and the restore process finishes at:
DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]:
org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for
partitions [new-part-advice-key-table-changelog-9] to broker
192.168.73.199:9092 (id: 5 rack: null)
DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]:
org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics
or patterns and assigned partitions
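Based on the log lines above, the restore sequence appears to be: record the
log end offset, seek to the beginning, replay the changelog into the local
store, then verify the end offset has not moved. Here is a minimal
self-contained simulation of that check (the changelog is modeled as an
in-memory list; `RestoreDemo` and all names are illustrative, not the actual
Streams internals):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreDemo {

    // Replays `changelog` into a fresh store; if `concurrentWriter` is true,
    // another "instance" appends a record mid-restore, as in our scenario.
    static Map<String, String> restore(List<String[]> changelog, boolean concurrentWriter) {
        long endOffset = changelog.size();            // "Seeking to end": 647352 in our logs
        Map<String, String> store = new HashMap<>();
        for (int offset = 0; offset < endOffset; offset++) {  // "Seeking to beginning", then replay
            String[] kv = changelog.get(offset);
            store.put(kv[0], kv[1]);
            if (concurrentWriter) {
                // record appended by another thread/instance while we restore
                changelog.add(new String[]{"k" + offset, "v"});
            }
        }
        // the end offset is re-checked here and the restore fails if it moved
        // (647632 != 647352 in our case)
        if (changelog.size() != endOffset) {
            throw new IllegalStateException("Log end offset should not change while restoring: "
                    + "old end offset " + endOffset + ", current offset " + changelog.size());
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> changelog = new ArrayList<>();
        changelog.add(new String[]{"k1", "v1"});
        // quiet changelog: restores fine
        System.out.println(restore(new ArrayList<>(changelog), false).size());
        try {
            restore(changelog, true);                 // busy changelog: trips the same check
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With a writer appending during the replay, the final size check fails exactly
the way our StreamThread-2 did.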

And the exception is thrown at:
ERROR 2017-03-25 02:10:21,232 [StreamThread-2]:
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User
provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
new-part-advice failed on partition assignment
java.lang.IllegalStateException: task [0_9] Log end offset of
new-part-advice-key-table-changelog-9 should not change while restoring:
old end offset 647352, current offset 647632

So you can clearly see that while this thread was restoring the state, some
other thread (on the same or another instance) committed more records to this
changelog partition, so at the end of the process the two offsets did not
match. This seems like a fairly reasonable scenario: while restoring the
state, Streams should account for newly appended offsets rather than assume
it is the only thread writing to that partition. Another instance may well
commit more records while this thread is trying to restore the state.

So I feel this exception should be handled internally rather than thrown all
the way up to the Streams application.
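As a sketch of what "handled" could look like, the restore loop could refresh
the end offset as it goes instead of asserting that it is fixed. This is
purely illustrative (`restoreTolerant` is not the actual Streams code, and a
real fix would also need a cut-off for a continuously written changelog):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TolerantRestoreDemo {

    // Replays the changelog, refreshing the end offset after each record
    // instead of treating the initially observed end offset as fixed.
    static Map<String, String> restoreTolerant(List<String[]> changelog) {
        Map<String, String> store = new HashMap<>();
        int offset = 0;
        long endOffset = changelog.size();
        while (offset < endOffset) {
            String[] kv = changelog.get(offset++);
            store.put(kv[0], kv[1]);
            endOffset = changelog.size();   // pick up records appended mid-restore
        }
        return store;
    }
}
```

The point is only the direction: tolerate concurrent appends during restore
instead of failing the whole stream thread.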

What do you all think?

Thanks
Sachin
