[ 
https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575186#comment-17575186
 ] 

Luke Chen commented on KAFKA-13840:
-----------------------------------

[~kyle.stehbens], I've tested with Kafka 3.2.1 and it works well. Here are 
some logs:
{code:java}
# consumer tried to commit the offset, and failed with "not_coordinator"
[info] 17:40:38.012 WARN  [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#0] o.a.k.c.c.i.ConsumerCoordinator - 
[Consumer clientId=flink-kafka-testjob-0, groupId=flink-kafka-testjob] Offset 
commit failed on partition test-topic-1 at offset 21957: This is not the 
correct coordinator.
[info] 17:40:38.012 INFO  [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#0] o.a.k.c.c.i.AbstractCoordinator - 
[Consumer clientId=flink-kafka-testjob-0, groupId=flink-kafka-testjob] Group 
coordinator localhost:9093 (id: 2147483644 rack: null) is unavailable or 
invalid due to cause: error response NOT_COORDINATOR.isDisconnected: false. 
Rediscovery will be attempted.
[info] 17:40:38.012 WARN  [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#0] o.a.f.c.k.s.reader.KafkaSourceReader - 
Failed to commit consumer offsets for checkpoint 16
[info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
commit failed with a retriable exception. You should retry committing the 
latest consumed offsets.
[info] Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This 
is not the correct coordinator.
.....

[info] 17:40:42.942 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator - 
Triggering checkpoint 17 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1659606042941 for job 
7b3ad6191a558f6d5b4276c1ebaeba39.

...
# we did send a FindCoordinator request and the offset commit succeeds in next 
try.
[info] 17:40:43.022 DEBUG [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#0] o.a.k.clients.consumer.KafkaConsumer - 
[Consumer clientId=flink-kafka-testjob-0, groupId=flink-kafka-testjob] 
Committing offsets: {test-topic-2=OffsetAndMetadata{offset=21631, 
leaderEpoch=null, metadata=''}, test-topic-1=OffsetAndMetadata{offset=21983, 
leaderEpoch=null, metadata=''}}
[info] 17:40:43.022 DEBUG [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#0] o.a.k.c.c.i.AbstractCoordinator - 
[Consumer clientId=flink-kafka-testjob-0, groupId=flink-kafka-testjob] Sending 
FindCoordinator request to broker localhost:9092 (id: 2 rack: null)
[info] 23f509e5-48a8-461e-b021-f4092edb00cc
[info] 17:40:43.023 DEBUG [Source Data Fetcher for Source: input-kafka-source 
-> Sink: output-stdout-sink (1/1)#0] o.apache.kafka.clients.NetworkClient - 
[Consumer clientId=flink-kafka-testjob-0, groupId=flink-kafka-testjob] Sending 
FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, 
apiVersion=3, clientId=flink-kafka-testjob-0, correlationId=1244) and timeout 
30000 to node 2: FindCoordinatorRequestData(key='flink-kafka-testjob', 
keyType=0){code}
 

 

And as [~guozhang] said, Flink calls commitAsync periodically:

[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L235]

 

After our hotfix (included in v3.2.1 and v3.3.0), the consumer clears the 
previous FindCoordinator future and sends a new FindCoordinator request 
whenever the coordinator is unknown. That should fix this issue. Could you 
please try again with v3.2.1 (just released)?
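
To make the failure mode concrete, here is a minimal sketch in plain Java. It is a toy model, not the actual AbstractCoordinator code: the class CoordinatorLookup, the flag clearCompletedFuture, and the method setBrokerReachable are hypothetical names used only for illustration. The cached findCoordinatorFuture field mirrors the one named in the issue description, and the flag models whether a completed future is cleared before the next lookup.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model of a cached coordinator lookup; this is NOT Kafka's actual code. */
public class StaleCoordinatorFutureDemo {

    /** Hypothetical stand-in for the client's coordinator lookup state. */
    static class CoordinatorLookup {
        private final boolean clearCompletedFuture;              // true models the hotfix
        private CompletableFuture<String> findCoordinatorFuture; // cached lookup result
        private String coordinator;                              // null means "unknown"
        private boolean brokerReachable = false;
        int requestsSent = 0;

        CoordinatorLookup(boolean clearCompletedFuture) {
            this.clearCompletedFuture = clearCompletedFuture;
        }

        void setBrokerReachable(boolean reachable) {
            this.brokerReachable = reachable;
        }

        /** Returns the cached lookup, sending a new request only if none is cached. */
        CompletableFuture<String> lookupCoordinator() {
            if (clearCompletedFuture
                    && findCoordinatorFuture != null
                    && findCoordinatorFuture.isDone()) {
                findCoordinatorFuture = null; // drop the finished (possibly failed) future
            }
            if (findCoordinatorFuture == null) {
                findCoordinatorFuture = sendFindCoordinatorRequest();
            }
            return findCoordinatorFuture;
        }

        /** Simulates a FindCoordinator round trip that completes immediately. */
        private CompletableFuture<String> sendFindCoordinatorRequest() {
            requestsSent++;
            CompletableFuture<String> future = new CompletableFuture<>();
            if (brokerReachable) {
                future.complete("localhost:9093");
            } else {
                future.completeExceptionally(
                        new IllegalStateException("coordinator unavailable"));
            }
            return future;
        }

        /** Models one async commit attempt; returns true if a coordinator is known. */
        boolean commitOffsetsAsync() {
            if (coordinator == null) {
                CompletableFuture<String> future = lookupCoordinator();
                if (!future.isCompletedExceptionally()) {
                    coordinator = future.join();
                }
            }
            return coordinator != null;
        }
    }

    public static void main(String[] args) {
        for (boolean fixed : new boolean[] {false, true}) {
            CoordinatorLookup lookup = new CoordinatorLookup(fixed);
            boolean first = lookup.commitOffsetsAsync();  // broker down, lookup fails
            lookup.setBrokerReachable(true);              // broker comes back
            boolean second = lookup.commitOffsetsAsync(); // retry on the next commit
            System.out.println("fixed=" + fixed + " first=" + first
                    + " second=" + second + " requests=" + lookup.requestsSent);
        }
    }
}
```

In this model, the variant without clearing keeps handing back the same failed future, so the second commit fails even though the broker is reachable again and only one request is ever sent. With clearing enabled, the second commit sends a fresh request and succeeds.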

Thank you.

 

cc [~martijnvisser]

> KafkaConsumer is unable to recover connection to group coordinator after 
> commitOffsetsAsync exception
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13840
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>            Reporter: Kyle R Stehbens
>            Assignee: Luke Chen
>            Priority: Critical
>
> Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a 
> timeout or any other retriable exception triggered during an async offset 
> commit renders the client unable to recover its group coordinator and 
> leaves the client in a broken state.
>  
> I first encountered this using v2.8.1 of the java client, and after going 
> through the code base for all versions of the client, have found it affects 
> all versions of the client from 2.6.1 onward.
> I also confirmed that by rolling back to 2.5.1, the issue is not present.
>  
> The issue stems from a change to FindCoordinatorResponseHandler: in 2.5.1 
> it called clearFindCoordinatorFuture() on both success and failure, 
> here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all later versions of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> As a result, when the KafkaConsumer calls 
> coordinator.commitOffsetsAsync(...) and an error occurs such that the 
> coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> then the client will try to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However, this can never succeed, because it keeps returning a reference to 
> a failed future, findCoordinatorFuture, that is never cleared.
>  
> This manifests as every later call to commitOffsetsAsync() throwing a 
> "coordinator unavailable" exception, indefinitely, once any retriable 
> exception has caused the coordinator to close.
> Note: we discovered this when we upgraded the Kafka client in our Flink 
> consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the 
> client. We also noticed this occurring in our non-Flink Java consumers 
> running 3.x client versions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
