[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575189#comment-17575189 ]
Luke Chen commented on KAFKA-13840:
-----------------------------------

I should make it clear: I tested with the reproducer in https://issues.apache.org/jira/browse/FLINK-28060.

> KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13840
> URL: https://issues.apache.org/jira/browse/KAFKA-13840
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
> Reporter: Kyle R Stehbens
> Assignee: Luke Chen
> Priority: Critical
>
> Hi, I've discovered an issue with the Java Kafka client (consumer): a timeout or any other retriable exception triggered during an async offset commit renders the client unable to recover its group coordinator and leaves the client in a broken state.
>
> I first encountered this using v2.8.1 of the Java client. After going through the code base for all versions of the client, I found that it affects all versions from 2.6.1 onward.
> I also confirmed that the issue is not present after rolling back to 2.5.1.
>
> The issue stems from a change in FindCoordinatorResponseHandler. In 2.5.1 it called clearFindCoordinatorFuture() on both success and failure, here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>
> In all later versions of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>
> As a result, when the KafkaConsumer calls coordinator.commitOffsetsAsync(...) and an error leaves the coordinator unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>
> the client then tries to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However, this can never succeed, because it perpetually returns a reference to a failed future, findCoordinatorFuture, which is never cleared.
>
> This manifests as every subsequent call to commitOffsetsAsync() throwing a "coordinator unavailable" exception indefinitely, once any retriable exception has caused the coordinator to close.
> Note: we discovered this when we upgraded the Kafka client in our Flink consumers from 2.4.1 to 2.8.1, and we subsequently had to downgrade the client. We also noticed this occurring in our non-Flink Java consumers running 3.x client versions.
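To illustrate the failure mode described in the issue, the following is a minimal, hypothetical Java model of a cached find-coordinator future. CoordinatorLookup, lookupCoordinator(), recovers() and the clearOnFailure flag are illustrative names only, not Kafka's actual classes or signatures (the real AbstractCoordinator uses RequestFuture and a network client). The sketch assumes that only the response handling resets the cached future. With clearOnFailure=false (the behaviour reported for 2.6.1 onward), a single failed lookup is handed back forever. With clearOnFailure=true (the behaviour reported for 2.5.1), the next call sends a fresh request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

public class FindCoordinatorSketch {

    // Hypothetical, simplified model of a coordinator lookup that caches its in-flight future.
    // This is NOT Kafka's AbstractCoordinator; names and types are illustrative only.
    static final class CoordinatorLookup {
        private final boolean clearOnFailure;          // true models the reported 2.5.1 behaviour
        private final BooleanSupplier brokerResponds;  // hypothetical broker: true = lookup succeeds
        private CompletableFuture<String> findCoordinatorFuture; // cached lookup, null when none
        int requestsSent = 0;

        CoordinatorLookup(boolean clearOnFailure, BooleanSupplier brokerResponds) {
            this.clearOnFailure = clearOnFailure;
            this.brokerResponds = brokerResponds;
        }

        CompletableFuture<String> lookupCoordinator() {
            if (findCoordinatorFuture != null) {
                // Reuse the cached future; if it already failed, the caller just sees that failure again.
                return findCoordinatorFuture;
            }
            requestsSent++;
            CompletableFuture<String> future = new CompletableFuture<>();
            findCoordinatorFuture = future;
            if (brokerResponds.getAsBoolean()) {
                findCoordinatorFuture = null; // cleared on success in both models
                future.complete("coordinator-1");
            } else {
                if (clearOnFailure) {
                    findCoordinatorFuture = null; // cleared on failure only in the "2.5.1" model
                }
                future.completeExceptionally(new IllegalStateException("coordinator unavailable"));
            }
            return future;
        }
    }

    // Returns true if a second lookup succeeds after the first one hit a retriable error.
    static boolean recovers(boolean clearOnFailure) {
        int[] attempts = {0};
        // Hypothetical broker: the first request fails, every later request succeeds.
        CoordinatorLookup lookup = new CoordinatorLookup(clearOnFailure, () -> attempts[0]++ > 0);
        lookup.lookupCoordinator(); // first lookup fails
        CompletableFuture<String> retry = lookup.lookupCoordinator();
        return !retry.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("never cleared on failure, recovers: " + recovers(false));
        System.out.println("cleared on failure, recovers: " + recovers(true));
    }
}
```

Running main prints false for the first model and true for the second. Caching the future deduplicates concurrent lookups, but only if every completion path, success or failure, eventually resets the cache; otherwise a transient error becomes permanent.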