[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-19 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597952673



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -5307,6 +5308,71 @@ public void testUnregisterBrokerTimeoutMaxWait() {
         }
     }
 
+    /**
+     * Test that if the client can obtain a node assignment, but can't send to the given
+     * node, it will disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToSend() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                          AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10",
+                          AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 100);
+            }
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            log.debug("Advancing clock by 25 ms to trigger client-side disconnect.");
+            time.sleep(25);
+            disconnectFuture.get();
+            log.debug("Enabling nodes to send requests again.");
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 0);
+            }
+            time.sleep(5);
+            log.info("Waiting for result.");
+            assertEquals(0, result.listings().get().size());
+        }
+    }
+
+    /**
+     * Test that if the client can send to a node, but doesn't receive a response, it will
+     * disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                          AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10",
+                          AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            while (true) {
+                time.sleep(1);
+                try {
+                    disconnectFuture.get(1, TimeUnit.MICROSECONDS);

Review comment:
   In this case, `isDone` is not supposed to be true until we advance the clock (so that a new node can be tried after the previous one has disconnected).
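   The point here is that the future only completes once simulated time moves forward, so the test has to advance the clock in small steps and re-check. A minimal sketch of that pattern outside Kafka follows; `FakeClock` and `ClockDrivenWait` are hypothetical stand-ins, not `MockTime` or `MockClient`, and the "disconnect at a fixed simulated time" rule is an assumption made for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a mock clock; NOT Kafka's MockTime. */
class FakeClock {
    private long nowMs = 0;
    private final long disconnectAtMs;
    private final CompletableFuture<String> disconnectFuture;

    FakeClock(long disconnectAtMs, CompletableFuture<String> disconnectFuture) {
        this.disconnectAtMs = disconnectAtMs;
        this.disconnectFuture = disconnectFuture;
    }

    long milliseconds() {
        return nowMs;
    }

    /** Advances simulated time and completes the future once the assumed disconnect time is reached. */
    void sleep(long ms) {
        nowMs += ms;
        if (nowMs >= disconnectAtMs) {
            disconnectFuture.complete("node-0"); // later calls are no-ops
        }
    }
}

class ClockDrivenWait {
    /** Advances the clock 1 ms at a time until the future is done; returns the simulated ms elapsed. */
    static long advanceUntilDone(FakeClock clock, CompletableFuture<String> future, long maxMs) {
        long start = clock.milliseconds();
        while (!future.isDone()) {
            if (clock.milliseconds() - start >= maxMs) {
                throw new IllegalStateException("future did not complete within " + maxMs + " ms");
            }
            clock.sleep(1);
        }
        return clock.milliseconds() - start;
    }
}
```

   Checking `isDone` before the first `sleep` is what catches a premature completion; the step size of 1 ms mirrors the `time.sleep(1)` loop in the test.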




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-19 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597950877



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,63 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
+            }
             if (!client.ready(node, now)) {
+                Long deadline = nodeReadyDeadlines.get(node);
+                if (deadline != null) {
+                    if (now >= deadline) {
+                        log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                            "because the node is taking too long to become ready.",
+                            node.idString(), calls.size());
+                        transitionToPendingAndClearList(calls);
+                        client.disconnect(node.idString());
+                        nodeReadyDeadlines.remove(node);
+                        iter.remove();
+                        continue;
+                    }
+                    pollTimeout = Math.min(pollTimeout, deadline - now);

Review comment:
   Let's just do that in the call to `NetworkClient#poll`.  In general I 
agree that monotonic time would have been better (maybe this needs a rewrite...)
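   The monotonic-time remark can be made concrete with a small deadline helper. This is a sketch under stated assumptions: it derives the deadline from `System.nanoTime()`, which (unlike `System.currentTimeMillis()`) is not affected by wall-clock adjustments, and it clamps the timeout at the point where the value would be handed to a poll call so it is never negative. `MonotonicDeadline` is hypothetical and not part of Kafka.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical deadline helper based on a monotonic nanosecond source. */
class MonotonicDeadline {
    private final LongSupplier nanoClock;
    private final long deadlineNs;

    MonotonicDeadline(LongSupplier nanoClock, long timeoutMs) {
        this.nanoClock = nanoClock;
        this.deadlineNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    /** Deadline backed by System.nanoTime(). */
    static MonotonicDeadline fromNow(long timeoutMs) {
        return new MonotonicDeadline(System::nanoTime, timeoutMs);
    }

    /** Remaining ms, clamped at 0. Compares by subtraction, as nanoTime values may overflow. */
    long remainingMs() {
        long remainingNs = deadlineNs - nanoClock.getAsLong();
        return remainingNs <= 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(remainingNs);
    }

    boolean expired() {
        return deadlineNs - nanoClock.getAsLong() <= 0;
    }

    /** Clamp a candidate poll timeout to [0, remaining time on this deadline]. */
    long clampPollTimeout(long pollTimeoutMs) {
        return Math.max(0, Math.min(pollTimeoutMs, remainingMs()));
    }
}
```

   Injecting the `LongSupplier` keeps the helper testable with a fake clock, in the same spirit as the `MockTime` used by the AdminClient tests.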








[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-19 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597950036



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;

Review comment:
   OK. I changed it to just `Map`.








[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595633992



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;

Review comment:
   Oh, and on the question of why in-flight requests are kept in a list: that was done for future-proofing.
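   To see why a list costs little today, consider a per-node map of in-flight calls where a node counts as busy whenever its list is non-empty. With a per-node limit of 1 it behaves like a single slot, but the same structure could hold more calls per node later. `InFlightTracker` and the `String` call names are hypothetical; only the `getOrDefault(...).isEmpty()` check mirrors the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-node tracker of in-flight calls, keyed by node id string. */
class InFlightTracker {
    private final Map<String, List<String>> callsInFlight = new HashMap<>();
    private final int maxInFlightPerNode;

    InFlightTracker(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    /** Same shape as the check in the diff: a node with any in-flight call is skipped. */
    boolean hasCallsInFlight(String nodeId) {
        return !callsInFlight.getOrDefault(nodeId, Collections.emptyList()).isEmpty();
    }

    /** Returns true if the call was admitted under the per-node limit. */
    boolean tryStart(String nodeId, String call) {
        List<String> calls = callsInFlight.computeIfAbsent(nodeId, k -> new ArrayList<>());
        if (calls.size() >= maxInFlightPerNode) {
            return false;
        }
        calls.add(call);
        return true;
    }

    /** Removes a finished call and drops the node's entry once its list is empty. */
    void complete(String nodeId, String call) {
        List<String> calls = callsInFlight.get(nodeId);
        if (calls != null) {
            calls.remove(call);
            if (calls.isEmpty()) {
                callsInFlight.remove(nodeId);
            }
        }
    }
}
```

   Raising the limit is then a configuration change rather than a data-structure change, which is the future-proofing argument.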









[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595633668



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;

Review comment:
   Notice that we set `maxInFlightRequestsPerConnection` to 1 when constructing the `NetworkClient`. `AdminClient` doesn't support sending multiple requests to a single node over a single connection. I think we could add this support, but we'd have to check how the server handles it, since we've never done it before. Maybe there should be a JIRA.
   
   Also, if we do choose to add support for multiple outstanding requests per node per socket, we'd need some way to distinguish "waiting for a chance to use this connection" from "waiting for this connection to be opened" in `NetworkClient`. Currently `ready` just returns a boolean, which isn't enough information to distinguish these two cases. We could probably add a new method that returns an enum or something similar.
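   A sketch of what such an enum could look like follows. `NodeReadiness` and `ReadinessDecision` are hypothetical: `NetworkClient#ready` returns a `boolean`, and nothing below is an existing Kafka API. The idea is that the node-ready timeout would only run while a connection is still being opened, not while an established connection is busy with another request.

```java
/** Hypothetical richer alternative to the boolean returned by NetworkClient#ready. */
enum NodeReadiness {
    READY,      // connection is established and can take a request now
    CONNECTING, // connection is still being opened
    BUSY        // connection is open, but another request is outstanding
}

class ReadinessDecision {
    enum Action { SEND, WAIT, WAIT_WITH_DEADLINE }

    /** Only a connection that is still being opened counts against the node-ready deadline. */
    static Action decide(NodeReadiness readiness) {
        switch (readiness) {
            case READY:
                return Action.SEND;
            case CONNECTING:
                return Action.WAIT_WITH_DEADLINE;
            case BUSY:
            default:
                return Action.WAIT;
        }
    }
}
```

   With only a boolean, `CONNECTING` and `BUSY` collapse into the same `false`, which is exactly the ambiguity described above.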









[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595632280



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
+            }
             if (!client.ready(node, now)) {
+                Long deadline = nodeReadyDeadlines.get(node);
+                if (deadline != null) {
+                    if (now >= deadline) {
+                        log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                            "because the node is taking too long to become ready.",
+                            node.idString(), calls.size());
+                        transitionToPendingAndClearList(calls);
+                        client.disconnect(node.idString());
+                        nodeReadyDeadlines.remove(node);
+                        iter.remove();
+                        continue;
+                    }
+                    pollTimeout = Math.min(pollTimeout, deadline - now);
+                } else {
+                    nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                }
                 long nodeTimeout = client.pollDelayMs(node, now);
                 pollTimeout = Math.min(pollTimeout, nodeTimeout);
                 log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                 continue;
             }
-            Call call = calls.remove(0);
-            int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+            int remainingRequestTime;
+            Long deadlineMs = nodeReadyDeadlines.remove(node);

Review comment:
   I added a comment
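   The quoted hunk is truncated, but it suggests that time spent waiting for the node to become ready is charged against the request timeout. A hedged sketch of that arithmetic follows, assuming that a missing deadline means the node was ready on the first try (so the full request timeout applies). `RemainingRequestTime` is a hypothetical helper and not the code in the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of charging node-ready wait time against the request timeout. */
class RemainingRequestTime {
    /**
     * If a ready deadline was recorded for the node, only the time left until that deadline
     * remains; otherwise the full request timeout applies. The entry is removed either way,
     * and the result is clamped to [0, requestTimeoutMs].
     */
    static int remainingRequestTimeMs(Map<String, Long> nodeReadyDeadlines, String nodeId,
                                      long nowMs, int requestTimeoutMs) {
        Long deadlineMs = nodeReadyDeadlines.remove(nodeId);
        if (deadlineMs == null) {
            return requestTimeoutMs;
        }
        long remaining = deadlineMs - nowMs;
        return (int) Math.max(0L, Math.min((long) requestTimeoutMs, remaining));
    }
}
```

   Because the deadline was originally set to `now + requestTimeoutMs`, the remaining time can never exceed the configured request timeout; the upper clamp only guards against misuse.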









[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595631870



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                 continue;
             }
             Node node = entry.getKey();
+            if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                log.trace("Still waiting for other calls to finish on node {}.", node);
+                nodeReadyDeadlines.remove(node);
+                continue;
+            }
             if (!client.ready(node, now)) {
+                Long deadline = nodeReadyDeadlines.get(node);
+                if (deadline != null) {
+                    if (now >= deadline) {
+                        log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                            "because the node is taking too long to become ready.",
+                            node.idString(), calls.size());
+                        transitionToPendingAndClearList(calls);
+                        client.disconnect(node.idString());
+                        nodeReadyDeadlines.remove(node);
+                        iter.remove();
+                        continue;
+                    }
+                    pollTimeout = Math.min(pollTimeout, deadline - now);
+                } else {
+                    nodeReadyDeadlines.put(node, now + requestTimeoutMs);

Review comment:
   The complexity of the min/max issue is one thing. Another is that we don't know when the connection has been established and when it has not: `NetworkClient` doesn't expose this information. `NetworkClient#ready` may return `false` for a variety of reasons, many of which have nothing to do with connection establishment.
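   Since a `false` from `NetworkClient#ready` doesn't say whether the connection is still being opened, the diff treats the first `false` for a node as the start of a timer and gives up once that timer expires. A self-contained sketch of that bookkeeping follows; `ReadyDeadlineTracker` is a hypothetical stand-in for the `nodeReadyDeadlines` map, and string node ids stand in for `Node`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the nodeReadyDeadlines bookkeeping in the diff. */
class ReadyDeadlineTracker {
    enum Outcome { SEND, WAIT, DISCONNECT }

    private final Map<String, Long> nodeReadyDeadlines = new HashMap<>();
    private final long requestTimeoutMs;

    ReadyDeadlineTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /**
     * Called on each pass of the send loop with the current result of ready(node).
     * The first "not ready" starts the timer; an expired timer means disconnect
     * and hand the node's calls back for reassignment.
     */
    Outcome onPoll(String nodeId, boolean ready, long nowMs) {
        if (ready) {
            nodeReadyDeadlines.remove(nodeId);
            return Outcome.SEND;
        }
        Long deadline = nodeReadyDeadlines.get(nodeId);
        if (deadline == null) {
            nodeReadyDeadlines.put(nodeId, nowMs + requestTimeoutMs);
            return Outcome.WAIT;
        }
        if (nowMs >= deadline) {
            nodeReadyDeadlines.remove(nodeId);
            return Outcome.DISCONNECT;
        }
        return Outcome.WAIT;
    }

    /** Time until the node's deadline, for shrinking the poll timeout; Long.MAX_VALUE if none. */
    long msUntilDeadline(String nodeId, long nowMs) {
        Long deadline = nodeReadyDeadlines.get(nodeId);
        return deadline == null ? Long.MAX_VALUE : Math.max(0, deadline - nowMs);
    }
}
```

   The limitation discussed above still applies: a node that is merely busy, rather than unreachable, also keeps returning `false` and will eventually be disconnected.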









[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

2021-03-16 Thread GitBox


cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595631316



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
             // only one we need to check the timeout for.
             Call call = contexts.get(0);
             if (processor.callHasExpired(call)) {
-                if (call.aborted) {
-                    log.warn("Aborted call {} is still in callsInFlight.", call);
-                } else {
-                    log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                    call.aborted = true;
-                    client.disconnect(nodeId);
-                    numTimedOut++;
-                    // We don't remove anything from the callsInFlight data structure. Because the connection
-                    // has been closed, the calls should be returned by the next client#poll(),
-                    // and handled at that point.
-                }
+                log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);

Review comment:
   OK, let's raise it to `INFO`.




