cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597952673



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5307,6 +5308,71 @@ public void testUnregisterBrokerTimeoutMaxWait() {
         }
     }
 
+    /**
+     * Test that if the client can obtain a node assignment, but can't send to 
the given
+     * node, it will disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToSend() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                          AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
"100000",
+                          AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 100);
+            }
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+            log.debug("Advancing clock by 25 ms to trigger client-side 
disconnect.");
+            time.sleep(25);
+            disconnectFuture.get();
+            log.debug("Enabling nodes to send requests again.");
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 0);
+            }
+            time.sleep(5);
+            log.info("Waiting for result.");
+            assertEquals(0, result.listings().get().size());
+        }
+    }
+
+    /**
+     * Test that if the client can send to a node, but doesn't receive a 
response, it will
+     * disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToReceiveResponse() throws 
Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(time, cluster,
+            newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            while (true) {
+                time.sleep(1);
+                try {
+                    disconnectFuture.get(1, TimeUnit.MICROSECONDS);

Review comment:
       In this case, `isDone` is not supposed to be true until we advance the 
clock (so that a new node can be tried, after the previous one disconnected)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to