hachikuji commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r592865485
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
                 if (processor.callHasExpired(call)) {
-                    if (call.aborted) {
-                        log.warn("Aborted call {} is still in callsInFlight.", call);
-                    } else {
-                        log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                        call.aborted = true;
-                        client.disconnect(nodeId);
-                        numTimedOut++;
-                        // We don't remove anything from the callsInFlight data structure. Because the connection
-                        // has been closed, the calls should be returned by the next client#poll(),
-                        // and handled at that point.
-                    }
+                    log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);
Review comment:
I wonder if we can raise this to info? This is a really important event to see in the logs, and we are always left guessing about it when the user does not have debug logging enabled. Since it can only fire at the rate of the request timeout, it should not be too spammy.
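For illustration only, not the PR's actual code: the change being suggested is just the level on that new log call, along the lines of the sketch below (using an SLF4J logger, with `nodeId` and `call` standing in for the values in `timeoutCallsInFlight`).

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TimeoutDisconnectLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(TimeoutDisconnectLoggingSketch.class);

    // Hypothetical stand-in for the block above: the only difference from the diff
    // is debug -> info, so the timeout-driven disconnect is visible without enabling
    // debug logging for the admin client.
    void onCallExpired(String nodeId, Object call) {
        log.info("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);
    }
}
```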
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
Review comment:
Maybe I am missing something here. Why do we need to continue? Do we not
support multiple in-flight requests? If not, then why does `callsInFlight` map
to a list?
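For reference, my reading of the guard being questioned, as a minimal self-contained sketch (the shape of `callsInFlight` is taken from the diff; the helper name is made up here):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

class InFlightGuardSketch {
    // Mirrors the new check in sendEligibleCalls: even though callsInFlight maps a
    // node id to a List of calls, the node is skipped as soon as any call is
    // outstanding, which effectively serializes admin requests per node. Hence the
    // question above about why the value type is a List at all.
    static <T> boolean hasCallInFlight(Map<String, List<T>> callsInFlight, String nodeId) {
        return !callsInFlight.getOrDefault(nodeId, Collections.emptyList()).isEmpty();
    }
}
```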
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
Review comment:
It seems like it would be more intuitive to use `socket.connection.setup.timeout.ms` here. Is it the complexity of `socket.connection.setup.timeout.max.ms` that keeps us from doing that? Basically, we don't know how long the network client itself plans to wait for the connection to be established.
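Concretely, the alternative I had in mind would look something like the sketch below. This is hypothetical: it assumes the setup timeout value is available to the runnable, and it ignores the exponential backoff toward `socket.connection.setup.timeout.max.ms`, which is exactly the complexity in question.

```java
import java.util.HashMap;
import java.util.Map;

class NodeReadyDeadlineSketch {
    private final Map<String, Long> nodeReadyDeadlines = new HashMap<>();

    // Hypothetical variant of the branch above: seed the "ready" deadline from
    // socket.connection.setup.timeout.ms rather than request.timeout.ms. Because the
    // NetworkClient backs its setup timeout off toward
    // socket.connection.setup.timeout.max.ms, this base value can undershoot how long
    // the client will actually keep trying to establish the connection.
    void markAwaitingReady(String nodeId, long nowMs, long connectionSetupTimeoutMs) {
        nodeReadyDeadlines.putIfAbsent(nodeId, nowMs + connectionSetupTimeoutMs);
    }
}
```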
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                    }
                     long nodeTimeout = client.pollDelayMs(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
                     log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                     continue;
                 }
-                Call call = calls.remove(0);
-                int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+                int remainingRequestTime;
+                Long deadlineMs = nodeReadyDeadlines.remove(node);
Review comment:
So I guess the intent here is to count the "ready time" toward the
request timeout. I think that's reasonable. It means that request timeout
represents the maximum time we will commit to trying to get a response from a
particular node before retrying. It might be worth a comment to explain this?
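Something along these lines, for example. This is only my sketch of the bookkeeping: it assumes the stored deadline is `now + requestTimeoutMs`, as in the hunk above, since the actual computation of `remainingRequestTime` is not shown in this diff.

```java
import java.util.Map;

class RemainingRequestTimeSketch {
    // Suggested comment, roughly: time spent waiting for the node to become ready is
    // charged against request.timeout.ms, so the request timeout bounds the total time
    // committed to a single node (ready wait plus in-flight) before we retry elsewhere.
    static int remainingRequestTimeMs(Map<String, Long> nodeReadyDeadlines,
                                      String nodeId,
                                      long nowMs,
                                      int requestTimeoutMs) {
        Long deadlineMs = nodeReadyDeadlines.remove(nodeId);
        if (deadlineMs == null) {
            return requestTimeoutMs;                   // node was ready immediately; full budget
        }
        return (int) Math.max(0, deadlineMs - nowMs);  // budget left after the ready wait
    }
}
```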