hachikuji commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r596376827



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,63 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);

Review comment:
       Might be a good idea to use `Math.max(0, deadline - now)` since system time is non-monotonic.
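
For illustration, a minimal sketch of the suggested clamp. The class and helper below are invented for this example; `pollTimeout`, `deadline`, and `now` mirror the locals in the hunk above.

```java
// Hypothetical sketch: clamp the delta at zero so a backwards jump of the
// system clock can never produce a negative poll timeout.
public class PollTimeoutSketch {
    static long boundedPollTimeout(long pollTimeout, long deadline, long now) {
        // If the clock jumped past the deadline, deadline - now is negative;
        // Math.max(0, ...) keeps the resulting timeout non-negative.
        return Math.min(pollTimeout, Math.max(0, deadline - now));
    }

    public static void main(String[] args) {
        System.out.println(boundedPollTimeout(1000L, 1500L, 1000L)); // 500 ms left until the deadline
        System.out.println(boundedPollTimeout(1000L, 900L, 1000L));  // clock skew: clamped to 0
    }
}
```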

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1367,15 +1414,15 @@ private void processRequests() {
         void enqueue(Call call, long now) {
             if (call.tries > maxRetries) {
                 log.debug("Max retries {} for {} reached", maxRetries, call);
-                call.fail(time.milliseconds(), new TimeoutException());
+                call.handleTimeoutFailure(time.milliseconds(), new TimeoutException());

Review comment:
       While we're here, can we set the exception message to indicate that max retries had been reached?
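
For illustration, a hedged sketch of an exception message that names the retry limit. The helper class is invented; it assumes `org.apache.kafka.common.errors.TimeoutException` (kafka-clients on the classpath) and the `maxRetries` value from the hunk above.

```java
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical helper: build a TimeoutException whose message explains that the
// retry limit was hit, instead of throwing one with no message at all.
public class MaxRetriesMessageSketch {
    static TimeoutException maxRetriesExceeded(int maxRetries, String callName) {
        return new TimeoutException("Exceeded maxRetries (" + maxRetries + ") for call " + callName + ".");
    }

    public static void main(String[] args) {
        // Prints: Exceeded maxRetries (5) for call listTopics.
        System.out.println(maxRetriesExceeded(5, "listTopics").getMessage());
    }
}
```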

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;

Review comment:
       Ok. I think we might as well simplify the type since we already have logic which assumes one inflight request. It doesn't feel like we're buying ourselves much in terms of future-proofing.
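
For illustration only, one way the simplification could look if `callsInFlight` is currently a map from node id to a list of calls; the actual field type is not visible in this hunk, so the before/after below is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: with at most one call in flight per node, a
// Map<String, Call> is enough and the emptiness check becomes containsKey().
public class CallsInFlightSketch {
    static class Call { }  // stand-in for KafkaAdminClient's Call

    // Assumed before: Map<String, List<Call>> callsInFlight
    // After: at most one in-flight call per node id.
    private final Map<String, Call> callsInFlight = new HashMap<>();

    boolean hasCallInFlight(String nodeId) {
        // Replaces: !callsInFlight.getOrDefault(nodeId, Collections.emptyList()).isEmpty()
        return callsInFlight.containsKey(nodeId);
    }

    public static void main(String[] args) {
        CallsInFlightSketch sketch = new CallsInFlightSketch();
        System.out.println(sketch.hasCallInFlight("0")); // false
        sketch.callsInFlight.put("0", new Call());
        System.out.println(sketch.hasCallInFlight("0")); // true
    }
}
```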

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5307,6 +5308,71 @@ public void testUnregisterBrokerTimeoutMaxWait() {
         }
     }
 
+    /**
+     * Test that if the client can obtain a node assignment, but can't send to the given
+     * node, it will disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToSend() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                          AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000",
+                          AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 100);
+            }
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            log.debug("Advancing clock by 25 ms to trigger client-side disconnect.");
+            time.sleep(25);
+            disconnectFuture.get();
+            log.debug("Enabling nodes to send requests again.");
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 0);
+            }
+            time.sleep(5);
+            log.info("Waiting for result.");
+            assertEquals(0, result.listings().get().size());
+        }
+    }
+
+    /**
+     * Test that if the client can send to a node, but doesn't receive a response, it will
+     * disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+            newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            while (true) {
+                time.sleep(1);
+                try {
+                    disconnectFuture.get(1, TimeUnit.MICROSECONDS);

Review comment:
       Might be simpler to check `isDone`.
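
For illustration, a self-contained sketch of the simpler polling style: check `isDone()` instead of calling `get()` with a microsecond timeout and catching the resulting timeout exception on every iteration. The future and thread below stand in for the test's `disconnectFuture` and the mock client.

```java
import java.util.concurrent.CompletableFuture;

public class IsDoneLoopSketch {
    public static void main(String[] args) {
        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();

        // Stand-in for the mock client signalling a disconnect at some later point.
        new Thread(() -> disconnectFuture.complete("node-0")).start();

        // In the test this loop would also advance MockTime; the key change is
        // that isDone() avoids the try/catch around get() entirely.
        while (!disconnectFuture.isDone()) {
            Thread.onSpinWait();
        }
        System.out.println("Disconnected from " + disconnectFuture.join());
    }
}
```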

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,63 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                    }
                     long nodeTimeout = client.pollDelayMs(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
                     log.trace("Client is not ready to send to {}. Must delay 
{} ms", node, nodeTimeout);
                     continue;
                 }
-                Call call = calls.remove(0);
-                int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+                // Subtract the time we spent waiting for the node to become ready from
+                // the total request time.
+                int remainingRequestTime;
+                Long deadlineMs = nodeReadyDeadlines.remove(node);
+                if (deadlineMs == null) {
+                    remainingRequestTime = requestTimeoutMs;
+                } else {
+                    remainingRequestTime = calcTimeoutMsRemainingAsInt(now, deadlineMs);
+                }
+                while (!calls.isEmpty()) {
+                    Call call = calls.remove(0);
+                    int timeoutMs = Math.min(remainingRequestTime,
                         calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
-                AbstractRequest.Builder<?> requestBuilder;
-                try {
-                    requestBuilder = call.createRequest(requestTimeoutMs);
-                } catch (Throwable throwable) {
-                    call.fail(now, new KafkaException(String.format(
-                        "Internal error sending %s to %s.", call.callName, 
node)));
-                    continue;
+                    AbstractRequest.Builder<?> requestBuilder;
+                    try {
+                        requestBuilder = call.createRequest(timeoutMs);
+                    } catch (Throwable throwable) {
+                        call.fail(now, new KafkaException(String.format(

Review comment:
       I noticed that we lose the throwable that we caught here. Can we add it as a cause?
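
For illustration, a sketch of keeping the caught throwable as the cause; `KafkaException` does accept a `(String, Throwable)` constructor, and the names below mirror the hunk above.

```java
import org.apache.kafka.common.KafkaException;

// Hypothetical stand-in for the catch block: wrap the original throwable so the
// stack trace of the createRequest() failure is not lost.
public class RequestBuildFailureSketch {
    static KafkaException internalSendError(String callName, String nodeId, Throwable throwable) {
        return new KafkaException(String.format(
            "Internal error sending %s to %s.", callName, nodeId), throwable);
    }

    public static void main(String[] args) {
        KafkaException e = internalSendError("listTopics", "0", new IllegalStateException("boom"));
        System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
    }
}
```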




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

