mumrah commented on a change in pull request #10398:
URL: https://github.com/apache/kafka/pull/10398#discussion_r604267269



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5325,12 +5326,23 @@ public void testClientSideTimeoutAfterFailureToSend() 
throws Exception {
             for (Node node : cluster.nodes()) {
                 env.kafkaClient().delayReady(node, 100);
             }
+
+            // We use a countdown latch to ensure that we get to the first
+            // call to `ready` before we increment the time below to trigger
+            // the disconnect.
+            CountDownLatch readyLatch = new CountDownLatch(2);
+
             env.kafkaClient().setDisconnectFuture(disconnectFuture);
-            final ListTopicsResult result = env.adminClient().listTopics();
+            env.kafkaClient().setReadyCallback(node -> readyLatch.countDown());
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            final ListTopicsResult result = env.adminClient().listTopics();
+
+            readyLatch.await();

Review comment:
       This could block forever. Should we use the timeout version of `await` 
so we don't get stuck (in failure cases)?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5353,22 +5365,20 @@ public void 
testClientSideTimeoutAfterFailureToReceiveResponse() throws Exceptio
         try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(time, cluster,
             newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
                 AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000",
-                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setDisconnectFuture(disconnectFuture);
             final ListTopicsResult result = env.adminClient().listTopics();
-            while (true) {
+            TestUtils.waitForCondition(() -> {
                 time.sleep(1);
-                try {
-                    disconnectFuture.get(1, TimeUnit.MICROSECONDS);
-                    break;
-                } catch (java.util.concurrent.TimeoutException e) {
-                }
-            }
+                return disconnectFuture.isDone();
+            }, 5000, 1, () -> "Timed out waiting for expected disconnect");
+            assertFalse(disconnectFuture.isCompletedExceptionally());
             assertFalse(result.future.isDone());
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
-            log.debug("Advancing clock by 10 ms to trigger client-side 
retry.");
-            time.sleep(10);
+            TestUtils.waitForCondition(() -> {
+                return env.kafkaClient().hasInFlightRequests();

Review comment:
       nit: can use method reference here i think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to