Xavier Léauté created KAFKA-5073:
------------------------------------
Summary: Kafka Streams stuck rebalancing after exception thrown in
rebalance listener
Key: KAFKA-5073
URL: https://issues.apache.org/jira/browse/KAFKA-5073
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.11.0.0, 0.10.2.1
Reporter: Xavier Léauté
An exception thrown in the Streams rebalance listener causes the Kafka
consumer coordinator to log an error, but the Streams app does not propagate
the exception to the uncaught exception handler.
As a result, the app is left stuck in the rebalancing state if the consumer
throws an exception when starting state restore.
Here is an example log that shows the error when the consumer throws a CRC
error during state restore.
{{code}}
[2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
{{code}}
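The behavior described above can be reproduced in miniature without any Kafka classes. The following is a minimal sketch, not the Kafka or Streams implementation: `Listener`, `poll`, and `runThread` are hypothetical stand-ins, and the "rethrow after poll" variant is only one possible way to surface the failure, shown here as an assumption rather than as the project's fix. It illustrates that when the caller of the listener catches and logs the exception, the thread's uncaught exception handler never sees it unless the thread remembers the exception and rethrows it itself.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RebalanceExceptionSketch {

    // Hypothetical stand-in for a rebalance listener callback.
    interface Listener {
        void onPartitionsAssigned();
    }

    // Hypothetical stand-in for the coordinator: it logs a listener
    // failure and swallows it, so the caller of poll() never sees it.
    static void poll(Listener listener) {
        try {
            listener.onPartitionsAssigned();
        } catch (RuntimeException e) {
            System.err.println("User provided listener failed on partition assignment: " + e);
        }
    }

    // Runs one simulated stream thread and returns whatever its uncaught
    // exception handler received (null if the handler was never invoked).
    static Throwable runThread(boolean rethrowAfterPoll) {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        AtomicReference<RuntimeException> captured = new AtomicReference<>();

        Thread t = new Thread(() -> {
            poll(() -> {
                try {
                    // Simulates the consumer failing during state restore.
                    throw new IllegalStateException("Record is corrupt");
                } catch (RuntimeException e) {
                    captured.set(e); // remember the failure for later
                    throw e;         // the simulated coordinator will swallow this
                }
            });
            // Assumed mitigation: rethrow outside the swallowed callback.
            RuntimeException e = captured.get();
            if (rethrowAfterPoll && e != null) {
                throw e;
            }
        });
        t.setUncaughtExceptionHandler((thread, ex) -> seenByHandler.set(ex));
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return seenByHandler.get();
    }

    public static void main(String[] args) {
        System.out.println("without rethrow: " + runThread(false));
        System.out.println("with rethrow: " + runThread(true));
    }
}
```

In this sketch, `runThread(false)` returns `null` because the handler is never invoked, which mirrors the symptom reported here; `runThread(true)` returns the `IllegalStateException`.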