cmccabe commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r597952673
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -5307,6 +5308,71 @@ public void testUnregisterBrokerTimeoutMaxWait() { } } + /** + * Test that if the client can obtain a node assignment, but can't send to the given + * node, it will disconnect and try a different node. + */ + @Test + public void testClientSideTimeoutAfterFailureToSend() throws Exception { + Cluster cluster = mockCluster(3, 0); + CompletableFuture<String> disconnectFuture = new CompletableFuture<>(); + MockTime time = new MockTime(); + try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000", + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + for (Node node : cluster.nodes()) { + env.kafkaClient().delayReady(node, 100); + } + env.kafkaClient().setDisconnectFuture(disconnectFuture); + final ListTopicsResult result = env.adminClient().listTopics(); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + log.debug("Advancing clock by 25 ms to trigger client-side disconnect."); + time.sleep(25); + disconnectFuture.get(); + log.debug("Enabling nodes to send requests again."); + for (Node node : cluster.nodes()) { + env.kafkaClient().delayReady(node, 0); + } + time.sleep(5); + log.info("Waiting for result."); + assertEquals(0, result.listings().get().size()); + } + } + + /** + * Test that if the client can send to a node, but doesn't receive a response, it will + * disconnect and try a different node. 
+ */ + @Test + public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exception { + Cluster cluster = mockCluster(3, 0); + CompletableFuture<String> disconnectFuture = new CompletableFuture<>(); + MockTime time = new MockTime(); + try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000", + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setDisconnectFuture(disconnectFuture); + final ListTopicsResult result = env.adminClient().listTopics(); + while (true) { + time.sleep(1); + try { + disconnectFuture.get(1, TimeUnit.MICROSECONDS); Review comment: In this case, `isDone()` is not expected to return true until we advance the clock (so that a new node can be tried after the previous one has disconnected). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org