This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 3878165  KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)
3878165 is described below

commit 3878165a066791bf850030423724b4342da98985
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Jun 15 09:18:30 2021 +0100

    KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)
    
    NetworkClient.poll() throws IllegalStateException when checking
    isConnectionSetupTimeout if any node in
    ClusterConnectionStates.connectingNodes is not present in
    ClusterConnectionStates.nodeState. This commit ensures that when we remove a
    node from nodeState, we also remove it from connectingNodes.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/clients/ClusterConnectionStates.java     |  1 +
 .../apache/kafka/clients/NetworkClientTest.java    | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index e00494c..20de2ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -389,6 +389,7 @@ final class ClusterConnectionStates {
      */
     public void remove(String id) {
         nodeState.remove(id);
+        connectingNodes.remove(id);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b13f854..dac6424 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1064,6 +1064,39 @@ public class NetworkClientTest {
         assertEquals(2, mockHostResolver.resolutionCount());
     }
 
+    @Test
+    public void testCloseConnectingNode() {
+        Cluster cluster = TestUtils.clusterWith(2);
+        Node node0 = cluster.nodeById(0);
+        Node node1 = cluster.nodeById(1);
+        client.ready(node0, time.milliseconds());
+        selector.serverConnectionBlocked(node0.idString());
+        client.poll(1, time.milliseconds());
+        client.close(node0.idString());
+
+        // Poll without any connections should return without exceptions
+        client.poll(0, time.milliseconds());
+        assertFalse(NetworkClientUtils.isReady(client, node0, time.milliseconds()));
+        assertFalse(NetworkClientUtils.isReady(client, node1, time.milliseconds()));
+
+        // Connection to new node should work
+        client.ready(node1, time.milliseconds());
+        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 0);
+        selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), buffer)));
+        while (!client.ready(node1, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        assertTrue(client.isReady(node1, time.milliseconds()));
+        selector.clear();
+
+        // New connection to node closed earlier should work
+        client.ready(node0, time.milliseconds());
+        buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 1);
+        selector.delayedReceive(new DelayedReceive(node0.idString(), new NetworkReceive(node0.idString(), buffer)));
+        while (!client.ready(node0, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        assertTrue(client.isReady(node0, time.milliseconds()));
+    }
+
     private RequestHeader parseHeader(ByteBuffer buffer) {
         buffer.getInt(); // skip size
         return RequestHeader.parse(buffer.slice());

Reply via email to