[ 
https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16891092#comment-16891092
 ] 

Mattia Barbon commented on KAFKA-8104:
--------------------------------------

It also happens with version 2.3 of the client

> Consumer cannot rejoin to the group after rebalancing
> -----------------------------------------------------
>
>                 Key: KAFKA-8104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8104
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.0.0
>            Reporter: Gregory Koshelev
>            Priority: Critical
>         Attachments: consumer-rejoin-fail.log
>
>
> TL;DR; {{KafkaConsumer}} cannot rejoin to the group due to inconsistent 
> {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}} and 
> {{AbstractCoordinator.joinFuture}} (which is succeeded {{RequestFuture}}). 
> See explanation below.
> There are 16 consumers in single process (threads from pool-4-thread-1 to 
> pool-4-thread-16). All of them belong to single consumer group 
> {{hercules.sink.elastic.legacy_logs_elk_c2}}. Rebalancing has been acquired 
> and consumers have got {{CommitFailedException}} as expected:
> {noformat}
> 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN  
> r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured max.poll.interval.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
>       at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
>       at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> After that, most of them successfully rejoined to the group with generation 
> 10699:
> {noformat}
> 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group 
> with generation 10699
> 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned 
> partitions [legacy_logs_elk_c2-18]
> ...
> 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group 
> with generation 10699
> 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned 
> partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
> ...
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned 
> partitions [legacy_logs_elk_c2-24]
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | 
> hercules.sink.elastic.legacy_logs_elk_c2] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed 
> since group is rebalancing
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | 
> hercules.sink.elastic.legacy_logs_elk_c2] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed 
> since group is rebalancing
> 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | 
> hercules.sink.elastic.legacy_logs_elk_c2] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed 
> since group is rebalancing
> 2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO  
> o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, 
> groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group 
> with generation -1
> {noformat}
> But one consumer (pool-4-thread-4) got strange generation -1 (see last log 
> record from above).
> Further log records in attached log file.
> Finally, 15 consumers successfully rejoined. But consumer with thread 
> {{pool-4-thread-4}} didn't rejoin:
> {noformat}
> 2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR 
> r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment 
> protocol: null
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>       at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
>       at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR 
> r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment 
> protocol: null
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>       at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
>       at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)}}
> {noformat}
> It is important to note, that {{KafkaConsumer.coordinator.joinFuture}} is not 
> null and succeeded, but {{ConsumerCoordinator}} cannot perform 
> {{resetJoinGroupFuture()}} due to exception was thrown from 
> {{onJoinComplete()}}:
> {code:java}
>             if (future.succeeded()) {
>                 // Duplicate the buffer in case `onJoinComplete` does not 
> complete and needs to be retried.
>                 ByteBuffer memberAssignment = future.value().duplicate();
>                 onJoinComplete(generation.generationId, generation.memberId, 
> generation.protocol, memberAssignment);
>                 // We reset the join group future only after the completion 
> callback returns. This ensures
>                 // that if the callback is woken up, we will retry it on the 
> next joinGroupIfNeeded.
>                 resetJoinGroupFuture();
>                 needsJoinPrepare = true;
>             }
> {code}
> If I understood correctly, the generation was changed to {{NO_GENERATION}} in 
> another thread by one of CoordinatorResponseHandlers.
>  
>  [^consumer-rejoin-fail.log] 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to