[ https://issues.apache.org/jira/browse/KAFKA-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xavier Léauté updated KAFKA-5073:
---------------------------------
    Description: 
An exception thrown in the Streams rebalance listener causes the Kafka
consumer coordinator to log an error, but the Streams app does not bubble the
exception up to the uncaught exception handler.

This leaves the app stuck in the rebalancing state if the consumer throws an
exception when starting state restore.
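The failure mode described above can be illustrated with a small, self-contained Java sketch. It does not use kafka-clients: `AssignmentListener`, `SwallowingCoordinator`, `RecordingListener` and `RebalanceSketch` are hypothetical stand-ins for `ConsumerRebalanceListener`, the consumer coordinator and the stream thread's poll loop. The coordinator only logs a listener failure, so the caller never sees it. One possible workaround, assumed here rather than taken from the Kafka code base, is for the listener to record the failure and for the poll loop to rethrow it after each poll so it can reach an uncaught exception handler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a consumer rebalance callback (not the real Kafka interface).
interface AssignmentListener {
    void onPartitionsAssigned(List<String> partitions);
}

// Hypothetical coordinator mimicking the behavior described in this issue:
// a listener failure is logged, never rethrown to the caller.
class SwallowingCoordinator {
    final List<String> log = new ArrayList<>();

    void completeJoin(AssignmentListener listener, List<String> partitions) {
        try {
            listener.onPartitionsAssigned(partitions);
        } catch (RuntimeException e) {
            // The error only reaches the log; the poll loop carries on unaware.
            log.add("ERROR User provided listener failed on partition assignment: " + e.getMessage());
        }
    }
}

// Hypothetical workaround: remember the failure so the poll loop can rethrow it.
class RecordingListener implements AssignmentListener {
    private final AssignmentListener delegate;
    private RuntimeException failure; // set if the wrapped callback (e.g. state restore) fails

    RecordingListener(AssignmentListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onPartitionsAssigned(List<String> partitions) {
        try {
            delegate.onPartitionsAssigned(partitions);
        } catch (RuntimeException e) {
            failure = e;
            throw e; // the coordinator still logs it, as before
        }
    }

    // Called by the poll loop right after each poll; rethrows a recorded failure once.
    void rethrowIfFailed() {
        RuntimeException e = failure;
        failure = null;
        if (e != null) {
            throw new IllegalStateException("rebalance listener failed", e);
        }
    }
}

class RebalanceSketch {
    // Simulated state restore that fails the way the CRC check in the log does.
    static final AssignmentListener FAILING_RESTORE = partitions -> {
        throw new RuntimeException("Record is corrupt");
    };

    // Without recording: the failure is only logged, nothing escapes to the caller.
    static boolean swallowedByCoordinator() {
        SwallowingCoordinator coordinator = new SwallowingCoordinator();
        coordinator.completeJoin(FAILING_RESTORE, Collections.singletonList("_my_topic-0"));
        return coordinator.log.size() == 1;
    }

    // With recording: the poll loop rethrows, so an uncaught exception handler could see it.
    static Throwable surfacedByPollLoop() {
        SwallowingCoordinator coordinator = new SwallowingCoordinator();
        RecordingListener listener = new RecordingListener(FAILING_RESTORE);
        coordinator.completeJoin(listener, Collections.singletonList("_my_topic-0"));
        try {
            listener.rethrowIfFailed();
            return null;
        } catch (IllegalStateException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("swallowed: " + swallowedByCoordinator());
        System.out.println("surfaced: " + surfacedByPollLoop());
    }
}
```

Rethrowing from the poll loop, rather than from inside the callback, is the point of the design: as the log shows, the coordinator catches whatever the callback throws, so only code outside the callback can propagate the error.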

Here is an example log that shows the error when the consumer throws a CRC 
error during state restore.

{code}
[2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
        at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
{code}
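The `stored crc` / `computed crc` pair in the log means the checksum recomputed from the fetched bytes did not match the checksum stored with the record batch. A simplified sketch of that kind of check follows, using `java.util.zip.CRC32` over a single hypothetical payload. `StoredRecord` and `CrcSketch` are illustrative names, the sketch throws `IllegalStateException` where the consumer throws `KafkaException`, and the exact checksum algorithm and byte range Kafka validates depend on the message format version and are not reproduced here.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical record holding a payload and the checksum written alongside it.
class StoredRecord {
    final byte[] payload;
    final long storedCrc;

    StoredRecord(byte[] payload, long storedCrc) {
        this.payload = payload;
        this.storedCrc = storedCrc;
    }

    // CRC32 over the whole payload, returned as an unsigned 32-bit value in a long.
    static long crcOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    // Recompute the checksum and fail on mismatch, mirroring the shape of the error message.
    void ensureValid(String partition, long offset) {
        long computed = crcOf(payload);
        if (computed != storedCrc) {
            throw new IllegalStateException("Record batch for partition " + partition
                    + " at offset " + offset + " is invalid, cause: Record is corrupt (stored crc = "
                    + storedCrc + ", computed crc = " + computed + ")");
        }
    }
}

class CrcSketch {
    public static void main(String[] args) {
        byte[] bytes = "value".getBytes(StandardCharsets.UTF_8);
        StoredRecord good = new StoredRecord(bytes, StoredRecord.crcOf(bytes));
        good.ensureValid("_my_topic-0", 42); // checksum matches, no exception

        byte[] flipped = bytes.clone();
        flipped[0] ^= 0x01; // simulate a single corrupted bit
        StoredRecord bad = new StoredRecord(flipped, good.storedCrc);
        try {
            bad.ensureValid("_my_topic-0", 42);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

CRC32 detects every single-bit error, so the flipped payload above always fails validation; in the issue, this same kind of failure is raised inside state restore and then lost in the rebalance listener.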

  was:
An exception thrown in the Streams rebalance listener causes the Kafka
consumer coordinator to log an error, but the Streams app does not bubble the
exception up to the uncaught exception handler.

This leaves the app stuck in the rebalancing state if the consumer throws an
exception when starting state restore.

Here is an example log that shows the error when the consumer throws a CRC 
error during state restore.

{{code}}
[2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
        at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
{{code}}


> Kafka Streams stuck rebalancing after exception thrown in rebalance listener
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-5073
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5073
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 0.10.2.1
>            Reporter: Xavier Léauté
>
> An exception thrown in the Streams rebalance listener causes the Kafka
> consumer coordinator to log an error, but the Streams app does not bubble the
> exception up to the uncaught exception handler.
> This leaves the app stuck in the rebalancing state if the consumer throws an
> exception when starting state restore.
> Here is an example log that shows the error when the consumer throws a CRC 
> error during state restore.
> {code}
> [2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
> org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
>         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
