[ 
https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557759#comment-17557759
 ] 

Luke Chen commented on KAFKA-13840:
-----------------------------------

Thanks [~guozhang]! Makes sense.

> KafkaConsumer is unable to recover connection to group coordinator after 
> commitOffsetsAsync exception
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13840
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>            Reporter: Kyle R Stehbens
>            Assignee: Luke Chen
>            Priority: Major
>
> Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a
> timeout or any other retriable exception triggered during an async offset
> commit renders the client unable to recover its group coordinator and
> leaves the client in a broken state.
>  
> I first encountered this using v2.8.1 of the Java client and, after going
> through the code base for all versions of the client, found that it affects
> every version from 2.6.1 onward.
> I also confirmed that the issue is not present after rolling back to 2.5.1.
>  
> The issue stems from a change in FindCoordinatorResponseHandler, which in
> 2.5.1 called clearFindCoordinatorFuture() on both success and failure,
> here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all later versions of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> As a result, when the KafkaConsumer makes a call to
> coordinator.commitOffsetsAsync(...) and an error occurs such that the
> coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> then the client will try to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However, this can never succeed, because it perpetually returns a reference
> to a failed future, findCoordinatorFuture, that is never cleared.
>  
> This manifests as every subsequent call to commitOffsetsAsync() failing
> with a "coordinator unavailable" exception once any retriable exception
> has caused the coordinator to close.
> Note: we discovered this when we upgraded the Kafka client in our Flink
> consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the
> client. We also noticed this occurring in our non-Flink Java consumers
> running 3.x client versions.
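
The failure mode described above can be reproduced in miniature. The following is a simplified, self-contained sketch and not Kafka's actual code: CachedLookup, CachedLookupDemo, failOnceSender and the clearOnCompletion flag are hypothetical names made up for illustration, and the `pending` field merely stands in for findCoordinatorFuture. It assumes only what the report states, namely that a cached future which is not cleared after it fails keeps being handed back to every later caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical model of a lookup that caches its in-flight future. */
class CachedLookup {
    private final Supplier<CompletableFuture<String>> sender;
    private final boolean clearOnCompletion;
    // Stands in for findCoordinatorFuture (illustrative only).
    private CompletableFuture<String> pending;

    CachedLookup(Supplier<CompletableFuture<String>> sender, boolean clearOnCompletion) {
        this.sender = sender;
        this.clearOnCompletion = clearOnCompletion;
    }

    /** Returns the cached future if one exists, otherwise sends a new request. */
    synchronized CompletableFuture<String> lookup() {
        if (pending != null) {
            return pending;
        }
        CompletableFuture<String> future = sender.get();
        pending = future;
        if (clearOnCompletion) {
            // Drop the cached reference on success AND on failure.
            future.whenComplete((result, error) -> clear());
        }
        return future;
    }

    synchronized void clear() {
        pending = null;
    }
}

class CachedLookupDemo {
    /** A sender whose first request fails and whose later requests succeed. */
    static Supplier<CompletableFuture<String>> failOnceSender() {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.incrementAndGet() == 1) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new RuntimeException("request timed out"));
                return failed;
            }
            return CompletableFuture.completedFuture("broker-1");
        };
    }

    public static void main(String[] args) {
        CachedLookup stale = new CachedLookup(failOnceSender(), false);
        stale.lookup(); // first attempt fails and stays cached
        System.out.println("no clear, retry failed: "
                + stale.lookup().isCompletedExceptionally());

        CachedLookup cleared = new CachedLookup(failOnceSender(), true);
        cleared.lookup(); // first attempt fails but is cleared
        System.out.println("with clear, retry failed: "
                + cleared.lookup().isCompletedExceptionally());
    }
}
```

Without the clear, the second lookup hands back the same failed future and the sender is never asked again. With the clear, the second lookup issues a fresh request, which matches the 2.5.1 behaviour the report describes.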
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
