[ https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366011#comment-17366011 ]
GEORGE LI commented on KAFKA-12971:
-----------------------------------

This issue is fixed in the {{1.1.x}} Kafka client by backporting the fix in KAFKA-7890 from 2.x, which invalidates the cache when the hostname changes.

> Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12971
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12971
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2
>            Reporter: GEORGE LI
>            Priority: Major
>             Fix For: 2.1.2
>
>
> The kafka-client version was upgraded from 0.11 to 1.1.x to fix a bug in 0.11 that caused overly frequent consumer offset commits. Because of the Flink version in use, the client cannot move directly to the latest 2.x kafka-client version.
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck with the above message after broker.id 425 had a hardware failure and was swapped for a host with a different hostname.
> Comparing {{ClusterConnectionStates.connecting()}} across the 3 versions:
> 0.11.0.3:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>             this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
>     }
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From the above, {{0.11.0.3}} simply puts a new entry for the node into the {{nodeState}} HashMap on every attempt, so a retry uses the updated host.
> In {{1.1.x}}, "caching" logic was added: {{if (nodeState.containsKey(id))}}. However, if the hostname for the broker.id is swapped or changed, execution never reaches the else block, so {{nodeState}} is never updated with the new hostname.
> In {{2.2.x}}, an additional check was added: {{if (connectionState != null && connectionState.host().equals(host))}}. If the hostname has changed, {{nodeState.put()}} is called to update the host.
> So it looks like the 1.1.x caching logic introduced a bug: the host stored in {{nodeState}} is not updated when it changes (e.g. a host failure followed by a swap to a different hostname that keeps the same broker.id).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
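
The difference between the 1.1.x and 2.2.x branches quoted above can be illustrated with a minimal sketch. It assumes a heavily simplified model in which the cache maps a node id to a hostname only. The class {{HostCacheSketch}}, its method names, and the hostnames {{old-host}} and {{new-host}} are hypothetical and are not part of the Kafka client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the connection-state cache.
// This is NOT the Kafka ClusterConnectionStates class.
final class HostCacheSketch {
    // node id -> hostname recorded when the entry was created
    private final Map<String, String> nodeState = new HashMap<>();

    // 1.1.x-style check: any existing entry is reused,
    // regardless of the host passed in.
    void connectingCached(String id, String host) {
        if (nodeState.containsKey(id)) {
            return; // the stale hostname stays in the map
        }
        nodeState.put(id, host);
    }

    // 2.2.x-style check: the entry is reused only if the host matches;
    // otherwise it is replaced with the new host.
    void connectingHostAware(String id, String host) {
        String cached = nodeState.get(id);
        if (cached != null && cached.equals(host)) {
            return;
        }
        nodeState.put(id, host);
    }

    String host(String id) {
        return nodeState.get(id);
    }

    public static void main(String[] args) {
        HostCacheSketch cached = new HostCacheSketch();
        cached.connectingCached("425", "old-host");
        cached.connectingCached("425", "new-host"); // broker 425 was swapped
        System.out.println(cached.host("425"));     // prints "old-host"

        HostCacheSketch aware = new HostCacheSketch();
        aware.connectingHostAware("425", "old-host");
        aware.connectingHostAware("425", "new-host");
        System.out.println(aware.host("425"));      // prints "new-host"
    }
}
```

In this model the first variant keeps pointing at the old hostname after the swap, which matches the stuck-consumer behaviour described in the issue, while the second variant picks up the new hostname.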