mumrah commented on a change in pull request #10398: URL: https://github.com/apache/kafka/pull/10398#discussion_r604267269
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -5325,12 +5326,23 @@ public void testClientSideTimeoutAfterFailureToSend() throws Exception { for (Node node : cluster.nodes()) { env.kafkaClient().delayReady(node, 100); } + + // We use a countdown latch to ensure that we get to the first + // call to `ready` before we increment the time below to trigger + // the disconnect. + CountDownLatch readyLatch = new CountDownLatch(2); + env.kafkaClient().setDisconnectFuture(disconnectFuture); - final ListTopicsResult result = env.adminClient().listTopics(); + env.kafkaClient().setReadyCallback(node -> readyLatch.countDown()); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + final ListTopicsResult result = env.adminClient().listTopics(); + + readyLatch.await(); Review comment: This could block forever. Should we use the timeout version of `await` so we don't get stuck (in failure cases)? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -5353,22 +5365,20 @@ public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exceptio try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1", AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000", - AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) { + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().setDisconnectFuture(disconnectFuture); final ListTopicsResult result = env.adminClient().listTopics(); - while (true) { + TestUtils.waitForCondition(() -> { time.sleep(1); - try { - disconnectFuture.get(1, TimeUnit.MICROSECONDS); - break; - } catch (java.util.concurrent.TimeoutException e) { - } - } + return disconnectFuture.isDone(); + }, 5000, 1, () -> "Timed out waiting for expected 
disconnect"); + assertFalse(disconnectFuture.isCompletedExceptionally()); assertFalse(result.future.isDone()); - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); - log.debug("Advancing clock by 10 ms to trigger client-side retry."); - time.sleep(10); + TestUtils.waitForCondition(() -> { + return env.kafkaClient().hasInFlightRequests(); Review comment: nit: I think we can use a method reference here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org