[ https://issues.apache.org/jira/browse/KAFKA-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263707#comment-17263707 ]

Jaebin Yoon commented on KAFKA-10827:
-------------------------------------

[~guozhang] yeah that makes sense. Connection failures don't usually happen often, 
and if they do, the consumer already has a performance issue, so the change to use 
client.connectionFailed should be safe, I think. Thanks for the reference!
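
For reference, here is a minimal sketch of what that change could look like, assuming 
it goes into ConsumerNetworkClient.isUnavailable (illustrative only, not the committed 
patch):

{code:java}
// Illustrative sketch, not the committed patch. KafkaClient.connectionFailed(Node)
// reports whether the last connection attempt to the node failed, independent of
// the reconnect backoff state.
public boolean isUnavailable(Node node) {
    lock.lock();
    try {
        // Previously: client.connectionFailed(node)
        //             && client.connectionDelay(node, time.milliseconds()) > 0
        // With infrequent requests the backoff window (<= 1s by default) has long
        // expired by the next call, so the old check never fired.
        return client.connectionFailed(node);
    } finally {
        lock.unlock();
    }
}
{code}

With a check like this, the first checkAndGetCoordinator call after a failed commit 
would mark the coordinator unknown and trigger rediscovery instead of reusing the 
dead node.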

> Consumer group coordinator node never gets updated for manual partition 
> assignment with infrequent requests
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10827
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10827
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.2.0
>            Reporter: Jaebin Yoon
>            Assignee: lqjacklee
>            Priority: Major
>         Attachments: KAFKA-10827.patch
>
>
> We've run into a situation where the coordinator node cached in the consumer never 
> gets updated when the coordinator broker is replaced with a new instance. Once the 
> consumer gets into this state, it keeps trying to connect to the old coordinator 
> and never recovers unless restarted.
> This happens when the consumer uses manual partition assignment, commits offsets 
> very infrequently (every 5 minutes), and the coordinator broker becomes unreachable 
> (its IP address and hostname are gone, as happens in a cloud environment).
> The exception the consumer keeps getting is:
> {code:java}
> Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets. Caused by: 
> org.apache.kafka.common.errors.TimeoutException: Failed to send request after 
> 120000 ms.
> {code}
> In this error condition, we could see a bunch of TCP connections from the consumer 
> app to the old hostname stuck in the *SYN_SENT* state.
> With manual partition assignment, the only way for the coordinator to get updated 
> is through checkAndGetCoordinator in AbstractCoordinator, but this gets called only 
> when committing offsets, which in our case is every 5 minutes.
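> For context, that path in AbstractCoordinator looks roughly like this (a simplified 
> paraphrase of the 2.x sources; details may differ):
> {code:java}
> // Simplified: the cached coordinator is dropped only when the network client
> // reports it unavailable; otherwise the stale node keeps being returned.
> protected synchronized Node checkAndGetCoordinator() {
>     if (coordinator != null && client.isUnavailable(coordinator)) {
>         markCoordinatorUnknown(); // forces coordinator rediscovery on the next lookup
>         return null;
>     }
>     return this.coordinator;
> }
> {code}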
> checkAndGetCoordinator relies on ConsumerNetworkClient.isUnavailable, but that 
> returns false unless the network client is within its reconnect backoff window. We 
> run with the default values (reconnect.backoff.ms = 50, reconnect.backoff.max.ms = 
> 1000) while request.timeout.ms is 120000. In this scenario, 
> ConsumerNetworkClient.isUnavailable for the old coordinator node effectively always 
> returns false, so checkAndGetCoordinator keeps returning the old coordinator node 
> forever.
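> The check in question is roughly the following (again a simplified paraphrase):
> {code:java}
> // Simplified ConsumerNetworkClient.isUnavailable: a node counts as unavailable
> // only while it is still inside the reconnect backoff window, i.e.
> // connectionDelay(node) > 0, which lasts at most ~1s with the default
> // reconnect.backoff.max.ms.
> public boolean isUnavailable(Node node) {
>     return client.connectionFailed(node)
>         && client.connectionDelay(node, time.milliseconds()) > 0;
> }
> {code}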
> Essentially, the consumer sends one OffsetCommit request to the coordinator every 
> 5 minutes. That request times out after 2 minutes (request.timeout.ms), so by the 
> time checkAndGetCoordinator is called for the next commit, the last connection 
> attempt was about 3 minutes earlier, far outside the 1-second reconnect backoff. It 
> therefore returns the old coordinator again, and this repeats forever.
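> Back-of-the-envelope with the numbers above (values are from this report; the 
> variable names are just for illustration):
> {code:java}
> long commitIntervalMs = 5 * 60_000L; // one OffsetCommit every 5 minutes
> long requestTimeoutMs = 120_000L;    // request.timeout.ms: failure observed after 2 min
> long backoffMaxMs     = 1_000L;      // reconnect.backoff.max.ms: backoff upper bound
>
> // Age of the last failed attempt when the next commit calls checkAndGetCoordinator:
> long failureAgeMs = commitIntervalMs - requestTimeoutMs; // 180000 ms (~3 min)
> boolean stillInBackoff = failureAgeMs < backoffMaxMs;    // false
> // => isUnavailable() returns false and the old coordinator is kept
> {code}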
> The current implementation assumes frequent requests to the coordinator (normally 
> via the heartbeat thread, etc.) to detect a coordinator change, but with such 
> infrequent requests it never moves off the old coordinator.
> We had to restart the consumer to recover from this condition.


