[ 
https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551082#comment-17551082
 ] 

Matthew de Detrich commented on KAFKA-8420:
-------------------------------------------

So in order to work on this issue I tried making a test to replicate what you 
are describing and I came across some interesting, the test that I wrote looks 
like this


{code:java}
    @Test
    public void gracefulHandlingSwitchSubscribeToManualAssign() {
        ConsumerMetadata metadata = createMetadata(subscription);
        MockClient client = new MockClient(time, metadata);

        initMetadata(client, Collections.singletonMap(topic, 1));
        Node node = metadata.fetch().nodes().get(0);

        KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);

        consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
        prepareRebalance(client, node, singleton(topic), assignor, 
singletonList(tp0), null);


        ConsumerRecords<String, String> initialConsumerRecords = 
consumer.poll(Duration.ofMillis(0));
        assertTrue(initialConsumerRecords.isEmpty());

        consumer.unsubscribe();

        consumer.assign(singleton(tp0));
        client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), 
Errors.NONE), coordinator);
        consumer.poll(Duration.ofSeconds(1));
    } {code}
The problem that I am currently getting is that the 
{{consumer.poll(Duration.ofSeconds(1));}} is causing an infinite loop/deadlock 
(note that originally I had a {{consumer.poll(Duration.ofSeconds(0));}} however 
this caused the {{consumer.poll}} method to short circuit due to 
{{timer.notExpired()}} never executing and hence just immediately returning an 
{{ConsumerRecords.empty();}} without the consumer ever sending a request to 
trigger a sync-group resonse).

After spending some time debugging this is the piece of code that is not 
terminating 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
 What I am finding highly confusing if the fact that the 
{{lookupCoordinator()}} does actually complete (in this case it immediately 
returns {{findCoordinatorFuture}} at 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294])
 however for some reason the loop at 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215]
 never terminates. It doesn't appear to detect that the future has finished 
which I believe to be the case? I am not sure if this is related to what you 
mentioned, i.e. 
{quote}
In the worst case (i.e. leader keep sending incompatible assignment), this 
would case the consumer to fall into endless re-joins.
{quote}
but it looks like that I have either found something else or I am barking up 
the wrong tree?

> Graceful handling when consumer switches from subscribe to manual assign
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-8420
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8420
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Guozhang Wang
>            Assignee: Matthew de Detrich
>            Priority: Major
>
> Today if a consumer switches between subscribe (and hence relies on group 
> rebalance to get assignment) and manual assign, it may cause unnecessary 
> rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll();     // join-group request sent, returns empty because 
> poll timeout
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll();     // sync-group request received, and the assigned 
> partitions does not match the current subscription-state. In this case it 
> will tries to re-join which is not necessary.
> In the worst case (i.e. leader keep sending incompatible assignment), this 
> would case the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it still worth being better 
> handled than the status-quo.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to