hachikuji commented on a change in pull request #10281: URL: https://github.com/apache/kafka/pull/10281#discussion_r596376827
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1089,29 +1106,63 @@ private long sendEligibleCalls(long now) { continue; } Node node = entry.getKey(); + if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) { + log.trace("Still waiting for other calls to finish on node {}.", node); + nodeReadyDeadlines.remove(node); + continue; + } if (!client.ready(node, now)) { + Long deadline = nodeReadyDeadlines.get(node); + if (deadline != null) { + if (now >= deadline) { + log.info("Disconnecting from {} and revoking {} node assignment(s) " + + "because the node is taking too long to become ready.", + node.idString(), calls.size()); + transitionToPendingAndClearList(calls); + client.disconnect(node.idString()); + nodeReadyDeadlines.remove(node); + iter.remove(); + continue; + } + pollTimeout = Math.min(pollTimeout, deadline - now); Review comment: Might be a good idea to use `Math.max(0, deadline - now)` since system time is non-monotonic. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1367,15 +1414,15 @@ private void processRequests() { void enqueue(Call call, long now) { if (call.tries > maxRetries) { log.debug("Max retries {} for {} reached", maxRetries, call); - call.fail(time.milliseconds(), new TimeoutException()); + call.handleTimeoutFailure(time.milliseconds(), new TimeoutException()); Review comment: While we're here, can we set the exception message to indicate that max retries had been reached? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) { continue; } Node node = entry.getKey(); + if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) { + log.trace("Still waiting for other calls to finish on node {}.", node); + nodeReadyDeadlines.remove(node); + continue; Review comment: Ok. I think we might as well simplify the type since we already have logic which assumes one inflight request. It doesn't feel like we're buying ourselves much in terms of future-proofing. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -5307,6 +5308,71 @@ public void testUnregisterBrokerTimeoutMaxWait() { } } + /** + * Test that if the client can obtain a node assignment, but can't send to the given + * node, it will disconnect and try a different node. + */ + @Test + public void testClientSideTimeoutAfterFailureToSend() throws Exception { + Cluster cluster = mockCluster(3, 0); + CompletableFuture<String> disconnectFuture = new CompletableFuture<>(); + MockTime time = new MockTime(); + try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000", + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + for (Node node : cluster.nodes()) { + env.kafkaClient().delayReady(node, 100); + } + env.kafkaClient().setDisconnectFuture(disconnectFuture); + final ListTopicsResult result = env.adminClient().listTopics(); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + log.debug("Advancing clock by 25 ms to trigger client-side disconnect."); + time.sleep(25); + disconnectFuture.get(); + log.debug("Enabling nodes to send requests again."); + for (Node node : cluster.nodes()) { + env.kafkaClient().delayReady(node, 0); + } + time.sleep(5); + log.info("Waiting for result."); + assertEquals(0, result.listings().get().size()); + } + } + + /** + * Test that if the client can send to a node, but doesn't receive a response, it will + * disconnect and try a different node. + */ + @Test + public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exception { + Cluster cluster = mockCluster(3, 0); + CompletableFuture<String> disconnectFuture = new CompletableFuture<>(); + MockTime time = new MockTime(); + try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000", + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setDisconnectFuture(disconnectFuture); + final ListTopicsResult result = env.adminClient().listTopics(); + while (true) { + time.sleep(1); + try { + disconnectFuture.get(1, TimeUnit.MICROSECONDS); Review comment: Might be simpler to check `isDone`. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1089,29 +1106,63 @@ private long sendEligibleCalls(long now) { continue; } Node node = entry.getKey(); + if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) { + log.trace("Still waiting for other calls to finish on node {}.", node); + nodeReadyDeadlines.remove(node); + continue; + } if (!client.ready(node, now)) { + Long deadline = nodeReadyDeadlines.get(node); + if (deadline != null) { + if (now >= deadline) { + log.info("Disconnecting from {} and revoking {} node assignment(s) " + + "because the node is taking too long to become ready.", + node.idString(), calls.size()); + transitionToPendingAndClearList(calls); + client.disconnect(node.idString()); + nodeReadyDeadlines.remove(node); + iter.remove(); + continue; + } + pollTimeout = Math.min(pollTimeout, deadline - now); + } else { + nodeReadyDeadlines.put(node, now + requestTimeoutMs); + } long nodeTimeout = client.pollDelayMs(node, now); pollTimeout = Math.min(pollTimeout, nodeTimeout); log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout); continue; } - Call call = calls.remove(0); - int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs, + // Subtract the time we spent waiting for the node to become ready from + // the total request time. + int remainingRequestTime; + Long deadlineMs = nodeReadyDeadlines.remove(node); + if (deadlineMs == null) { + remainingRequestTime = requestTimeoutMs; + } else { + remainingRequestTime = calcTimeoutMsRemainingAsInt(now, deadlineMs); + } + while (!calls.isEmpty()) { + Call call = calls.remove(0); + int timeoutMs = Math.min(remainingRequestTime, calcTimeoutMsRemainingAsInt(now, call.deadlineMs)); - AbstractRequest.Builder<?> requestBuilder; - try { - requestBuilder = call.createRequest(requestTimeoutMs); - } catch (Throwable throwable) { - call.fail(now, new KafkaException(String.format( - "Internal error sending %s to %s.", call.callName, node))); - continue; + AbstractRequest.Builder<?> requestBuilder; + try { + requestBuilder = call.createRequest(timeoutMs); + } catch (Throwable throwable) { + call.fail(now, new KafkaException(String.format( Review comment: I noticed that we lose the throwable that we caught here. Can we add it as a cause? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org