hachikuji commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r592865485



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
                 if (processor.callHasExpired(call)) {
-                    if (call.aborted) {
-                        log.warn("Aborted call {} is still in callsInFlight.", call);
-                    } else {
-                        log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                        call.aborted = true;
-                        client.disconnect(nodeId);
-                        numTimedOut++;
-                        // We don't remove anything from the callsInFlight data structure. Because the connection
-                        // has been closed, the calls should be returned by the next client#poll(),
-                        // and handled at that point.
-                    }
+                    log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);

Review comment:
       I wonder if we can raise this to info? This is a really important event 
to see in the logs, and we are always left guessing about it when the user does 
not have debug logging enabled. Since it should occur no more often than the 
request timeout allows, it should not be too spammy.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;

Review comment:
       Maybe I am missing something here. Why do we need to continue? Do we not 
support multiple in-flight requests per node? If not, then why does 
`callsInFlight` map each node to a list?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);

Review comment:
       It seems like it would be more intuitive to use 
`socket.connection.setup.timeout.ms` here. Is it because of the complexity of 
`socket.connection.setup.timeout.max.ms` that we don't do this? Basically, we 
don't know how long the network client itself is planning to wait for the 
connection to be established.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                    }
                     long nodeTimeout = client.pollDelayMs(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
                     log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                     continue;
                 }
-                Call call = calls.remove(0);
-                int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+                int remainingRequestTime;
+                Long deadlineMs = nodeReadyDeadlines.remove(node);

Review comment:
       So I guess the intent here is to count the "ready time" toward the 
request timeout. I think that's reasonable. It means that the request timeout 
represents the maximum time we will commit to trying to get a response from a 
particular node before retrying. It might be worth adding a comment to explain 
this?
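
To make the accounting concrete, here is a minimal, self-contained sketch of the idea as I understand it. It is not the code in this PR: the class `ReadyDeadlineSketch` and its method names are hypothetical, nodes are keyed by a plain string id, and the behavior when no deadline was recorded (fall back to the full request timeout) is my assumption, since the hunk above is cut off at that point.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration only: time spent waiting for a node to become
 * ready is charged against the same budget as the request itself.
 */
public class ReadyDeadlineSketch {
    // Deadline (absolute ms) by which each node must become ready.
    private final Map<String, Long> nodeReadyDeadlines = new HashMap<>();
    private final int requestTimeoutMs;

    public ReadyDeadlineSketch(int requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /**
     * Call while the node is not ready. The first call starts the clock;
     * later calls return true once the deadline has passed, meaning the
     * caller should disconnect and reassign the pending calls.
     */
    public boolean onNotReady(String nodeId, long nowMs) {
        Long deadline = nodeReadyDeadlines.get(nodeId);
        if (deadline == null) {
            nodeReadyDeadlines.put(nodeId, nowMs + requestTimeoutMs);
            return false;
        }
        if (nowMs >= deadline) {
            nodeReadyDeadlines.remove(nodeId);
            return true;
        }
        return false;
    }

    /**
     * Call once the node is ready. Returns the time left for the request,
     * which is the request timeout minus the time already spent waiting.
     * Assumption: with no recorded deadline, the full timeout applies.
     */
    public int remainingRequestTimeMs(String nodeId, long nowMs) {
        Long deadline = nodeReadyDeadlines.remove(nodeId);
        if (deadline == null) {
            return requestTimeoutMs;
        }
        return (int) Math.max(0L, deadline - nowMs);
    }
}
```

With a 30000 ms request timeout, a node that takes 10000 ms to become ready would leave 20000 ms for the request itself, so the total time committed to that node stays bounded by the request timeout.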





