[ https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946061#comment-16946061 ]
Nikolay Izhikov edited comment on KAFKA-8104 at 10/7/19 5:28 PM:
-----------------------------------------------------------------

Reproducer simplified and updated.

[1] https://github.com/nizhikov/kafka/pull/1

was (Author: nizhikov): Reproducer simplified and updated.

> Consumer cannot rejoin to the group after rebalancing
> -----------------------------------------------------
>
>                 Key: KAFKA-8104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8104
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
>            Reporter: Gregory Koshelev
>            Assignee: Nikolay Izhikov
>            Priority: Critical
>         Attachments: consumer-rejoin-fail.log
>
> TL;DR: {{KafkaConsumer}} cannot rejoin the group due to an inconsistency between
> {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}}) and
> {{AbstractCoordinator.joinFuture}} (which is a succeeded {{RequestFuture}}).
> See the explanation below.
>
> There are 16 consumers in a single process (threads pool-4-thread-1 to
> pool-4-thread-16). All of them belong to the single consumer group
> {{hercules.sink.elastic.legacy_logs_elk_c2}}. A rebalance occurred and the
> consumers got {{CommitFailedException}}, as expected:
> {noformat}
> 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
> 	at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
> 	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
> After that, most of them successfully rejoined the group with generation 10699:
> {noformat}
> 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
> 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-18]
> ...
> 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
> 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
> ...
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-24]
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation -1
> {noformat}
> But one consumer (pool-4-thread-4) got a strange generation of -1 (see the last log record above).
> Further log records are in the attached log file.
> Finally, 15 consumers successfully rejoined.
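[Editor's note: the {{CommitFailedException}} message quoted above names the two consumer settings that bound the poll loop. A sketch of the relevant configuration, with illustrative values only (nothing below except the group id comes from this report):]
```properties
# Group id taken from the report; all values below are illustrative.
group.id=hercules.sink.elastic.legacy_logs_elk_c2
# Upper bound on the time between poll() calls before the member is
# considered failed and the group rebalances (default 300000 ms).
max.poll.interval.ms=600000
# Fewer records per poll() leaves more headroom for per-batch processing
# (default 500).
max.poll.records=100
```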
> But the consumer on thread {{pool-4-thread-4}} didn't rejoin:
> {noformat}
> 2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> 	at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
> 	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> 2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> 	at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
> 	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It is important to note that {{KafkaConsumer.coordinator.joinFuture}} is not
> null and has succeeded, but {{ConsumerCoordinator}} cannot perform
> {{resetJoinGroupFuture()}} because an exception was thrown from
> {{onJoinComplete()}}:
> {code:java}
> if (future.succeeded()) {
>     // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
>     ByteBuffer memberAssignment = future.value().duplicate();
>
>     onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
>
>     // We reset the join group future only after the completion callback returns. This ensures
>     // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
>     resetJoinGroupFuture();
>     needsJoinPrepare = true;
> }
> {code}
> If I understood correctly, the generation was changed to {{NO_GENERATION}} in
> another thread by one of the {{CoordinatorResponseHandler}}s.
>
> [^consumer-rejoin-fail.log]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
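[Editor's note: the interleaving the reporter describes can be replayed deterministically with a hypothetical, much-simplified model. None of the classes below are Kafka's; names such as {{GenerationRaceSketch}} and {{replayRace}} are invented for illustration. The point is only the check-then-act race: the join future has already succeeded, the heartbeat thread resets the shared generation to {{NO_GENERATION}}, and the polling thread then reads a {{null}} protocol inside {{onJoinComplete()}}.]

```java
// Hypothetical, simplified model of the reported race -- NOT Kafka's real classes.
public class GenerationRaceSketch {

    static final class Generation {
        final int generationId;
        final String protocol;

        Generation(int generationId, String protocol) {
            this.generationId = generationId;
            this.protocol = protocol;
        }
    }

    // Sentinel used when the member has no valid generation (protocol is null).
    static final Generation NO_GENERATION = new Generation(-1, null);

    // Shared state: written by the heartbeat thread, read by the polling thread.
    static volatile Generation generation = new Generation(10699, "range");

    static void onJoinComplete(int generationId, String protocol) {
        if (protocol == null)
            throw new IllegalStateException(
                    "Coordinator selected invalid assignment protocol: null");
        // ... apply the assignment ...
    }

    /** Replays the interleaving from the bug report; returns the outcome. */
    static String replayRace() {
        boolean joinFutureSucceeded = true;  // step 1: the join future completes
        generation = NO_GENERATION;          // step 2: heartbeat thread resets generation
        if (joinFutureSucceeded) {
            try {
                // step 3: the polling thread reads the already-reset generation
                onJoinComplete(generation.generationId, generation.protocol);
                return "joined";
            } catch (IllegalStateException e) {
                // The (still succeeded) join future is never reset on this
                // path, so every subsequent poll() fails the same way.
                return e.getMessage();
            }
        }
        return "no join attempted";
    }

    public static void main(String[] args) {
        System.out.println(replayRace());
    }
}
```

Because {{joinGroupIfNeeded}} calls {{resetJoinGroupFuture()}} only after {{onJoinComplete()}} returns normally (see the snippet quoted above), the succeeded future survives the exception and the consumer keeps retrying {{onJoinComplete()}} with the same inconsistent state, matching the repeating error in the attached log.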