[ https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551082#comment-17551082 ]
Matthew de Detrich commented on KAFKA-8420:
-------------------------------------------

So, in order to work on this issue, I tried writing a test to replicate what you are describing, and I came across something interesting. The test I wrote looks like this:

{code:java}
@Test
public void gracefulHandlingSwitchSubscribeToManualAssign() {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

    initMetadata(client, Collections.singletonMap(topic, 1));
    Node node = metadata.fetch().nodes().get(0);

    KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

    // Steps 1-2 of the issue: subscribe, then poll with a zero timeout (join-group sent, nothing returned)
    consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
    ConsumerRecords<String, String> initialConsumerRecords = consumer.poll(Duration.ofMillis(0));
    assertTrue(initialConsumerRecords.isEmpty());

    // Steps 3-4: switch from group-managed subscription to manual assignment
    consumer.unsubscribe();
    consumer.assign(singleton(tp0));

    // Step 5: a late sync-group response arrives for the old subscription
    client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
    consumer.poll(Duration.ofSeconds(1));
}
{code}

The problem I am currently hitting is that {{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock. (Originally I had {{consumer.poll(Duration.ofSeconds(0));}}, but this caused the {{consumer.poll}} method to short-circuit: the loop guarded by {{timer.notExpired()}} never executed, so it immediately returned {{ConsumerRecords.empty();}} without the consumer ever sending the request that would trigger a sync-group response.) After spending some time debugging, this is the piece of code that does not terminate: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
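To reason about why a loop like the one linked above might not terminate, here is a minimal, self-contained sketch of the general "poll until the future is done or the timer expires" pattern. It is a simplified model, not Kafka's actual code: {{ManualClock}}, {{SimpleTimer}}, {{PollStep}}, {{awaitFuture}} and the {{maxIterations}} guard are hypothetical names for illustration, and the clock only moves when explicitly advanced, in the spirit of a test mock clock. It shows that such a loop exits only when the future completes or time actually passes the deadline.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of a "poll until the future is done or the
// timer expires" loop. ManualClock, SimpleTimer, PollStep and awaitFuture are
// illustrative names, not Kafka APIs.
class DeadlineLoopSketch {

    // A clock that only moves when advance() is called (like a test mock clock).
    static final class ManualClock {
        private long nowMs = 0;
        long milliseconds() { return nowMs; }
        void advance(long ms) { nowMs += ms; }
    }

    // A timer that counts as expired once the clock reaches the deadline.
    static final class SimpleTimer {
        private final ManualClock clock;
        private final long deadlineMs;

        SimpleTimer(ManualClock clock, long timeoutMs) {
            this.clock = clock;
            this.deadlineMs = clock.milliseconds() + timeoutMs;
        }

        boolean notExpired() { return clock.milliseconds() < deadlineMs; }
    }

    // Stand-in for one round of network I/O; it may complete the future
    // and/or advance the clock.
    interface PollStep { void run(); }

    // Returns how many poll steps ran before the loop exited, or -1 if the
    // maxIterations guard fired (without the guard the loop would spin forever).
    static int awaitFuture(AtomicBoolean futureDone, SimpleTimer timer,
                           PollStep step, int maxIterations) {
        int iterations = 0;
        while (!futureDone.get() && timer.notExpired()) {
            if (iterations == maxIterations) {
                return -1;
            }
            step.run();
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        // 1. The future completes on the third poll step.
        ManualClock c1 = new ManualClock();
        AtomicBoolean done1 = new AtomicBoolean(false);
        int[] steps = {0};
        System.out.println("completes: " + awaitFuture(done1, new SimpleTimer(c1, 1000),
                () -> { if (++steps[0] == 3) done1.set(true); }, 10_000));

        // 2. The future never completes, but every step advances the clock by 100 ms.
        ManualClock c2 = new ManualClock();
        System.out.println("times out: " + awaitFuture(new AtomicBoolean(false),
                new SimpleTimer(c2, 1000), () -> c2.advance(100), 10_000));

        // 3. The future never completes and the clock never moves.
        ManualClock c3 = new ManualClock();
        System.out.println("spins: " + awaitFuture(new AtomicBoolean(false),
                new SimpleTimer(c3, 1000), () -> { }, 10_000));
    }
}
{code}

Running {{main}} prints {{completes: 3}}, {{times out: 10}} and {{spins: -1}}. The third case is the interesting one: if the awaited future is never completed and the clock never advances, nothing lets the loop exit. Whether the test's clock and futures actually behave like that case is an assumption I have not verified.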
What I find highly confusing is that {{lookupCoordinator()}} does actually complete (in this case it immediately returns {{findCoordinatorFuture}} at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]); however, for some reason the loop at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215] never terminates. It doesn't appear to detect that the future has finished, although I believe it has. I am not sure whether this is related to what you mentioned, i.e.

{quote}
In the worst case (i.e. the leader keeps sending an incompatible assignment), this would cause the consumer to fall into endless re-joins.
{quote}

but it looks like I have either found something else, or am I barking up the wrong tree?

> Graceful handling when consumer switches from subscribe to manual assign
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-8420
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8420
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Guozhang Wang
>            Assignee: Matthew de Detrich
>            Priority: Major
>
> Today if a consumer switches between subscribe (and hence relies on group
> rebalance to get assignment) and manual assign, it may cause unnecessary
> rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll(); // join-group request sent, returns empty because the poll timed out
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll(); // sync-group response received, and the assigned
> partitions do not match the current subscription state. In this case it
> will try to re-join, which is not necessary.
> In the worst case (i.e. the leader keeps sending an incompatible assignment),
> this would cause the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it is still worth handling
> better than the status quo.
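One way to picture the graceful handling the issue asks for is a guard that discards a sync-group assignment arriving after the consumer has left group-managed mode, instead of treating the mismatch as a reason to re-join. The sketch below is a hypothetical, heavily simplified model, not Kafka's implementation: assignments are plain sets of topic names, and {{Mode}}, {{Action}}, {{ConsumerState}}, {{naiveOnSyncGroupResponse}} and {{onSyncGroupResponse}} are illustrative names. The naive variant only compares the assignment with the current subscription, which roughly mirrors the behaviour described in step 5.

{code:java}
import java.util.Set;

// Hypothetical, simplified model of the KAFKA-8420 scenario.
// None of these names are Kafka APIs; assignments are plain topic names.
class StaleAssignmentSketch {

    // Which mode the consumer is in when a sync-group response arrives.
    enum Mode { NONE, SUBSCRIBED, USER_ASSIGNED }

    // What the consumer decides to do with the received assignment.
    enum Action { APPLY, REJOIN, IGNORE }

    static final class ConsumerState {
        Mode mode = Mode.NONE;
        Set<String> subscribedTopics = Set.of();

        void subscribe(Set<String> topics) {
            mode = Mode.SUBSCRIBED;
            subscribedTopics = Set.copyOf(topics);
        }

        void unsubscribe() {
            mode = Mode.NONE;
            subscribedTopics = Set.of();
        }

        void assign() {
            // Manual assignment: the group no longer decides this consumer's partitions.
            mode = Mode.USER_ASSIGNED;
        }
    }

    // Naive handling: only compare the assignment with the current subscription.
    static Action naiveOnSyncGroupResponse(ConsumerState state, Set<String> assignedTopics) {
        return state.subscribedTopics.containsAll(assignedTopics) ? Action.APPLY : Action.REJOIN;
    }

    // Graceful handling: a response that arrives after leaving subscribe mode is stale.
    static Action onSyncGroupResponse(ConsumerState state, Set<String> assignedTopics) {
        if (state.mode != Mode.SUBSCRIBED) {
            return Action.IGNORE;
        }
        return naiveOnSyncGroupResponse(state, assignedTopics);
    }

    public static void main(String[] args) {
        ConsumerState state = new ConsumerState();
        state.subscribe(Set.of("topic"));             // step 1
        state.unsubscribe();                          // step 3
        state.assign();                               // step 4
        Set<String> lateAssignment = Set.of("topic"); // step 5: late sync-group response
        System.out.println("naive: " + naiveOnSyncGroupResponse(state, lateAssignment));
        System.out.println("graceful: " + onSyncGroupResponse(state, lateAssignment));
    }
}
{code}

Following the issue's steps, {{main}} prints {{naive: REJOIN}} and then {{graceful: IGNORE}}. The design choice here is to key the decision on the consumer's mode rather than on the assignment's contents, so a leader that keeps sending incompatible assignments cannot push a manually assigned consumer into repeated re-joins.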