Tim Van Laer created KAFKA-7290:
-----------------------------------
Summary: Kafka Streams application fails to rebalance and is stuck
in "Updated cluster metadata version"
Key: KAFKA-7290
URL: https://issues.apache.org/jira/browse/KAFKA-7290
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.11.0.3, 0.10.2.2, 0.10.2.1
Reporter: Tim Van Laer
Attachments: cg_metadata_failure.txt
Our Kafka Streams application crashed with a RocksDBException, and after that the
consumer group became unusable. Every consumer in the group went from
RUNNING to REBALANCING and stayed stuck in that state.
The application was still on an older version of Kafka Streams (0.10.2.1), but
upgrading the library did not get the consumer group active again.
We tried:
* adding consumers to and removing consumers from the group: no luck, none of
the consumers started processing
* stopping all consumers and restarting the application: no luck
* stopping all consumers and resetting the consumer group (using the
kafka-streams-application-reset tool): no luck
* replacing the underlying machines: no luck
* upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and
0.11.0.3 after it got stuck: no luck
We finally got the application running again by changing the applicationId (we
could afford to lose the state in this particular case).
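For reference, the workaround amounts to something like the minimal sketch below. It only shows the configuration change and uses plain java.util.Properties; "application.id" is the Kafka Streams config key, while the class name, the helper withNewApplicationId, the id "firechief" and the suffix "v2" are hypothetical names chosen for illustration. Because Kafka Streams uses the application id as the consumer group id and as the prefix for internal topics and the local state directory, a new id starts with a fresh group and empty state.

```java
import java.util.Properties;

public class ApplicationIdWorkaround {
    // Kafka Streams config key for the application id (StreamsConfig.APPLICATION_ID_CONFIG).
    static final String APPLICATION_ID = "application.id";

    /**
     * Returns a copy of the given config whose application.id has the suffix appended.
     * The original Properties object is left untouched.
     * Hypothetical helper, not part of Kafka Streams.
     */
    static Properties withNewApplicationId(Properties original, String suffix) {
        String oldId = original.getProperty(APPLICATION_ID);
        if (oldId == null) {
            throw new IllegalArgumentException("application.id is not set");
        }
        Properties copy = new Properties();
        copy.putAll(original);
        copy.setProperty(APPLICATION_ID, oldId + "-" + suffix);
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID, "firechief");           // hypothetical id
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker address

        Properties updated = withNewApplicationId(props, "v2");
        System.out.println(updated.getProperty(APPLICATION_ID));
    }
}
```

The updated Properties would then be passed to the KafkaStreams constructor as usual. This discards the old group's committed offsets and state stores, so it is only an option when losing that state is acceptable.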
See attachment for debug logs of the application. The application can reach the
Kafka cluster but fails to join the group.
The RocksDBException that triggered this state (I lost the container, so
unfortunately I don't have more logging):
{code}
2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] Failed to commit StreamTask 1_1 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to flush state store firehose_subscriptions
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) [firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) [firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) [firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) [firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [firechief.jar:?]
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store firehose_subscriptions
    at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335) ~[firechief.jar:?]
    ... 8 more
Caused by: org.rocksdb.RocksDBException: _
    at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
    at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186) ~[firechief.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113) ~[firechief.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335) ~[firechief.jar:?]
    ... 8 more
{code}
Any ideas on what is wrong, or what we can do to work around this issue?