[ https://issues.apache.org/jira/browse/KAFKA-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246273#comment-17246273 ]

Jaebin Yoon commented on KAFKA-10827:
-------------------------------------

[~Jack-Lee] I don't think your patch would work for our case. The connection 
state is actually in a failed state; the problem is that 
client.connectionDelay() always returns 0.


{code:java}
    public long connectionDelay(String id, long now) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null) return 0;
        if (state.state.isDisconnected()) {
            long timeWaited = now - state.lastConnectAttemptMs;
            // With infrequent requests, timeWaited far exceeds reconnectBackoffMs,
            // so this returns 0 even though the node is still disconnected.
            return Math.max(state.reconnectBackoffMs - timeWaited, 0);
        }
        // ... (connecting/connected branches elided)
    }
{code}
Because our requests are so infrequent, timeWaited is always greater than 
state.reconnectBackoffMs, so connectionDelay() returns 0, which makes 
isUnavailable() return false.
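
For reference, ConsumerNetworkClient.isUnavailable requires both a failed 
connection and a positive remaining backoff (paraphrased here from the 2.x 
sources, not verbatim), so a zero connectionDelay hides the failure:
{code:java}
    // Paraphrased from ConsumerNetworkClient (2.x); not verbatim.
    public boolean isUnavailable(Node node) {
        lock.lock();
        try {
            // connectionFailed(node) is true for the dead coordinator, but
            // connectionDelay(...) == 0, so the node is reported as available.
            return client.connectionFailed(node)
                    && client.connectionDelay(node, time.milliseconds()) > 0;
        } finally {
            lock.unlock();
        }
    }
{code}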

> Consumer group coordinator node never gets updated for manual partition 
> assignment with infrequent requests
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10827
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10827
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.2.0
>            Reporter: Jaebin Yoon
>            Assignee: lqjacklee
>            Priority: Major
>         Attachments: KAFKA-10827.patch
>
>
> We've run into a situation where the coordinator node in the consumer never 
> gets updated with the new coordinator when the coordinator broker gets 
> replaced with a new instance. Once the consumer gets into this mode, the 
> consumer keeps trying to connect to the old coordinator and never recovers 
> unless restarted.
> This happens when the consumer uses manual partition assignment and commits 
> offsets very infrequently (every 5 minutes), and the coordinator broker is no 
> longer reachable (its IP address and hostname are gone, as happens in a cloud 
> environment).
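> For context, a minimal sketch of the usage pattern described above (manual 
> assignment, auto-commit disabled; the broker address, topic, and group id are 
> illustrative, not from our deployment):
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // still used for offset commits
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>
> KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition("example-topic", 0)));
> // poll() runs continuously, but commitSync() is invoked only every ~5 minutes,
> // so checkAndGetCoordinator() is reached only at that cadence.
> {code}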
> The exception the consumer keeps getting is:
> {code:java}
> Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets. Caused by: 
> org.apache.kafka.common.errors.TimeoutException: Failed to send request after 
> 120000 ms.
> {code}
> We could see a bunch of *SYN_SENT* tcp state from the consumer app to the old 
> hostname in this error condition.
> In the current manual partition assignment scenario, the only way for the 
> coordinator to get updated is through checkAndGetCoordinator in 
> AbstractCoordinator, but this gets called only when committing offsets, which 
> happens every 5 minutes in our case.
> The current logic of checkAndGetCoordinator uses 
> ConsumerNetworkClient.isUnavailable, but that returns false unless the network 
> client is within its reconnect backoff window, which is configured with the 
> default values (reconnect.backoff.ms=50, reconnect.backoff.max.ms=1000) while 
> request.timeout.ms is 120000. In this scenario, 
> ConsumerNetworkClient.isUnavailable for the old coordinator node always 
> returns false, so checkAndGetCoordinator keeps the old coordinator node 
> forever.
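> For reference, checkAndGetCoordinator is approximately the following 
> (paraphrased from the 2.x AbstractCoordinator, not verbatim):
> {code:java}
>     // Paraphrased; the coordinator is cleared only when isUnavailable(...) is true.
>     protected synchronized Node checkAndGetCoordinator() {
>         if (coordinator != null && client.isUnavailable(coordinator)) {
>             markCoordinatorUnknown(true);
>             return null;
>         }
>         return this.coordinator;
>     }
> {code}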
> Essentially, in this case, the consumer sends one offset-commit request every 
> 5 minutes to the coordinator. That request times out, and when the consumer 
> calls checkAndGetCoordinator 5 minutes later, it returns the old coordinator 
> again because the last connect attempt was more than 3 minutes earlier (with 
> the 2-minute request.timeout.ms), and this repeats forever.
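> To make the arithmetic concrete, a toy calculation with the values above 
> (the default reconnect.backoff.max.ms=1000 and a 5-minute commit interval; 
> the class and variable names are illustrative):
> {code:java}
> public class StaleCoordinatorMath {
>     public static void main(String[] args) {
>         long reconnectBackoffMs = 1_000L;      // reconnect.backoff.max.ms default
>         long timeWaited = 5 * 60 * 1_000L;     // ~5 min since the last failed connect attempt
>         long connectionDelay = Math.max(reconnectBackoffMs - timeWaited, 0);
>         boolean connectionFailed = true;       // the old coordinator is DISCONNECTED
>         boolean unavailable = connectionFailed && connectionDelay > 0;
>         System.out.println(connectionDelay);   // 0
>         System.out.println(unavailable);       // false -> the stale coordinator is kept
>     }
> }
> {code}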
> The current implementation assumes there are frequent requests to the 
> coordinator (normally via the heartbeat thread, etc.) that would detect the 
> new coordinator, but with such infrequent requests it never moves off the old 
> coordinator.
> We had to restart the consumer to recover from this condition.


