This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 80b153d KAFKA-12948: Remove node from
ClusterConnectionStates.connectingNodes when node is removed (#10882)
80b153d is described below
commit 80b153dbba649d020999335b827e260417a97ae6
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Jun 15 09:18:30 2021 +0100
KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when
node is removed (#10882)
NetworkClient.poll() throws IllegalStateException when checking
isConnectionSetupTimeout if any node in
ClusterConnectionStates.connectingNodes is not present in
ClusterConnectionStates.nodeState. This commit ensures that when we remove a
node from nodeState, we also remove it from connectingNodes.
Reviewers: David Jacot <[email protected]>
---
.../kafka/clients/ClusterConnectionStates.java | 1 +
.../apache/kafka/clients/NetworkClientTest.java | 33 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 16bf59ac..7a5dc93 100644
---
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -389,6 +389,7 @@ final class ClusterConnectionStates {
*/
public void remove(String id) {
nodeState.remove(id);
+ connectingNodes.remove(id);
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index de0e524..6dd2ed7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1099,6 +1099,39 @@ public class NetworkClientTest {
assertEquals(2, mockHostResolver.resolutionCount());
}
+ @Test
+ public void testCloseConnectingNode() {
+ Cluster cluster = TestUtils.clusterWith(2);
+ Node node0 = cluster.nodeById(0);
+ Node node1 = cluster.nodeById(1);
+ client.ready(node0, time.milliseconds());
+ selector.serverConnectionBlocked(node0.idString());
+ client.poll(1, time.milliseconds());
+ client.close(node0.idString());
+
+ // Poll without any connections should return without exceptions
+ client.poll(0, time.milliseconds());
+ assertFalse(NetworkClientUtils.isReady(client, node0,
time.milliseconds()));
+ assertFalse(NetworkClientUtils.isReady(client, node1,
time.milliseconds()));
+
+ // Connection to new node should work
+ client.ready(node1, time.milliseconds());
+ ByteBuffer buffer =
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.serialize(ApiKeys.API_VERSIONS,
ApiKeys.API_VERSIONS.latestVersion(), 0);
+ selector.delayedReceive(new DelayedReceive(node1.idString(), new
NetworkReceive(node1.idString(), buffer)));
+ while (!client.ready(node1, time.milliseconds()))
+ client.poll(1, time.milliseconds());
+ assertTrue(client.isReady(node1, time.milliseconds()));
+ selector.clear();
+
+ // New connection to node closed earlier should work
+ client.ready(node0, time.milliseconds());
+ buffer =
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.serialize(ApiKeys.API_VERSIONS,
ApiKeys.API_VERSIONS.latestVersion(), 1);
+ selector.delayedReceive(new DelayedReceive(node0.idString(), new
NetworkReceive(node0.idString(), buffer)));
+ while (!client.ready(node0, time.milliseconds()))
+ client.poll(1, time.milliseconds());
+ assertTrue(client.isReady(node0, time.milliseconds()));
+ }
+
private RequestHeader parseHeader(ByteBuffer buffer) {
buffer.getInt(); // skip size
return RequestHeader.parse(buffer.slice());