[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597952673

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##

@@ -5307,6 +5308,71 @@ public void testUnregisterBrokerTimeoutMaxWait() {
         }
     }
+    /**
+     * Test that if the client can obtain a node assignment, but can't send to the given
+     * node, it will disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToSend() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                          AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10",
+                          AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 100);
+            }
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            log.debug("Advancing clock by 25 ms to trigger client-side disconnect.");
+            time.sleep(25);
+            disconnectFuture.get();
+            log.debug("Enabling nodes to send requests again.");
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 0);
+            }
+            time.sleep(5);
+            log.info("Waiting for result.");
+            assertEquals(0, result.listings().get().size());
+        }
+    }
+
+    /**
+     * Test that if the client can send to a node, but doesn't receive a response, it will
+     * disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            while (true) {
+                time.sleep(1);
+                try {
+                    disconnectFuture.get(1, TimeUnit.MICROSECONDS);

Review comment:
In this case, `isDone` is not supposed to be true until we advance the clock (so that a new node can be tried, after the previous one disconnected).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597950877

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1089,29 +1106,63 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);

Review comment:
Let's just do that in the call to `NetworkClient#poll`. In general I agree that monotonic time would have been better (maybe this needs a rewrite...)
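
As an illustration of the monotonic-time remark above, here is a minimal sketch (not Kafka code) of a deadline tracked against `System.nanoTime()` instead of wall-clock milliseconds. The class `MonotonicDeadline` and its methods `hasExpired` and `remainingMs` are hypothetical names introduced only for this example, and the clock is injected so the behaviour can be exercised without sleeping.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Kafka): a deadline tracked on a monotonic clock. */
final class MonotonicDeadline {
    private static final long NANOS_PER_MS = 1_000_000L;

    private final LongSupplier nanoClock; // production code would pass System::nanoTime
    private final long deadlineNanos;

    MonotonicDeadline(LongSupplier nanoClock, long timeoutMs) {
        this.nanoClock = nanoClock;
        this.deadlineNanos = nanoClock.getAsLong() + timeoutMs * NANOS_PER_MS;
    }

    /** True once the monotonic clock has reached the deadline. */
    boolean hasExpired() {
        // Compare the difference with zero rather than the raw values,
        // as the System.nanoTime() documentation recommends.
        return deadlineNanos - nanoClock.getAsLong() <= 0;
    }

    /** Whole milliseconds left before the deadline, clamped so it is never negative. */
    long remainingMs() {
        long remainingNanos = deadlineNanos - nanoClock.getAsLong();
        return Math.max(0L, remainingNanos / NANOS_PER_MS);
    }

    public static void main(String[] args) {
        MonotonicDeadline deadline = new MonotonicDeadline(System::nanoTime, 10);
        // Same shape as the diff's pollTimeout = Math.min(pollTimeout, deadline - now).
        long pollTimeout = Math.min(Long.MAX_VALUE, deadline.remainingMs());
        System.out.println("poll timeout (ms): " + pollTimeout + ", expired: " + deadline.hasExpired());
    }
}
```

Because the remaining time is clamped at zero, a caller can pass it straight to a poll call without checking for a negative timeout. Whether that clamp belongs here or inside the poll call itself is a design choice this sketch does not settle.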
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r597950036

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;

Review comment:
OK. I changed it to just `Map`.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595633992

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;

Review comment:
Oh, and on the question of why the in-flight requests are kept in a list: it was done for future-proofing.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595633668

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;

Review comment:
Notice that we set `maxInFlightRequestsPerConnection` to 1 when constructing the `NetworkClient`. We don't support sending multiple requests to a single node on a single connection in `AdminClient`. I think we could add this support, but we'd have to check how the server handles it, since we've never done it before. Maybe there should be a JIRA.

Also, if we do choose to add support for multiple outstanding requests per node per socket, we'd need some way to distinguish "waiting for a chance to use this connection" from "waiting for this connection to be opened" in `NetworkClient`. Currently `ready` just returns a boolean, which isn't enough information to distinguish these two cases. We could probably add a new method that returns an enum or something.
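
The "method that returns an enum" idea can be sketched as follows. This is only an assumption about what such an API might look like: `NetworkClient` has no such type or method, and `NodeReadinessSketch`, `Readiness`, `classify`, and `countsTowardReadyDeadline` are hypothetical names invented for this example.

```java
/** Hypothetical sketch; NetworkClient has no such type or method. */
final class NodeReadinessSketch {
    /** Why a node can or cannot accept a request right now. */
    enum Readiness {
        READY,                   // connection is open and has spare capacity
        CONNECTION_NOT_OPEN,     // "waiting for this connection to be opened"
        IN_FLIGHT_LIMIT_REACHED  // "waiting for a chance to use this connection"
    }

    /** Classifies a node from inputs that a boolean ready() result collapses together. */
    static Readiness classify(boolean connectionOpen, int inFlightRequests, int maxInFlightPerConnection) {
        if (!connectionOpen) {
            return Readiness.CONNECTION_NOT_OPEN;
        }
        if (inFlightRequests >= maxInFlightPerConnection) {
            return Readiness.IN_FLIGHT_LIMIT_REACHED;
        }
        return Readiness.READY;
    }

    /** Assumed policy: only an unopened connection should run down a "never ready" deadline. */
    static boolean countsTowardReadyDeadline(Readiness readiness) {
        return readiness == Readiness.CONNECTION_NOT_OPEN;
    }

    public static void main(String[] args) {
        // With maxInFlightRequestsPerConnection = 1, one outstanding call saturates the connection.
        Readiness readiness = classify(true, 1, 1);
        System.out.println(readiness + " counts toward deadline: " + countsTowardReadyDeadline(readiness));
    }
}
```

With a result like this, a caller could keep waiting on a busy but healthy connection while still timing out a connection that never opens, which a plain boolean cannot express.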
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595632280

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                    }
                     long nodeTimeout = client.pollDelayMs(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
                     log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                     continue;
                 }
-                Call call = calls.remove(0);
-                int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+                int remainingRequestTime;
+                Long deadlineMs = nodeReadyDeadlines.remove(node);

Review comment:
I added a comment.
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595631870

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);

Review comment:
The complexity of the min / max issue is one thing. Another is that we don't know when the connection has been established and when it has not. `NetworkClient` doesn't expose this information. `NetworkClient#ready` may return false for a variety of reasons, many of which say nothing about whether the connection has been established.
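
The deadline bookkeeping in the hunk above can be restated as a small state machine. The following is a simplified sketch rather than the actual `KafkaAdminClient` code: it keys nodes by a string ID, takes the result of the readiness check as a plain boolean parameter, and uses the hypothetical names `NodeReadyDeadlineSketch`, `Action`, and `onPoll`. As the review comment notes, the deadline starts at the first "not ready" observation whatever the underlying reason was.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical restatement of the per-node ready deadline logic in the diff. */
final class NodeReadyDeadlineSketch {
    enum Action { SEND, WAIT, DISCONNECT }

    private final Map<String, Long> nodeReadyDeadlines = new HashMap<>();
    private final long requestTimeoutMs;

    NodeReadyDeadlineSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Decides what to do with a node, given whether it reported ready at time nowMs. */
    Action onPoll(String nodeId, boolean ready, long nowMs) {
        if (ready) {
            // The node became ready, so any pending deadline is cleared.
            nodeReadyDeadlines.remove(nodeId);
            return Action.SEND;
        }
        Long deadline = nodeReadyDeadlines.get(nodeId);
        if (deadline == null) {
            // First time the node is seen as not ready: start the clock.
            nodeReadyDeadlines.put(nodeId, nowMs + requestTimeoutMs);
            return Action.WAIT;
        }
        if (nowMs >= deadline) {
            // Not ready for too long: give up on this node and forget its deadline.
            nodeReadyDeadlines.remove(nodeId);
            return Action.DISCONNECT;
        }
        return Action.WAIT;
    }

    public static void main(String[] args) {
        NodeReadyDeadlineSketch sketch = new NodeReadyDeadlineSketch(10);
        System.out.println(sketch.onPoll("0", false, 0));
        System.out.println(sketch.onPoll("0", false, 10));
    }
}
```

In the real change the disconnect branch also returns the node's calls to the pending list so that they can be assigned to a different node; the sketch leaves that step out.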
[GitHub] [kafka] cmccabe commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready
cmccabe commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r595631316

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##

@@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
                 if (processor.callHasExpired(call)) {
-                    if (call.aborted) {
-                        log.warn("Aborted call {} is still in callsInFlight.", call);
-                    } else {
-                        log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                        call.aborted = true;
-                        client.disconnect(nodeId);
-                        numTimedOut++;
-                        // We don't remove anything from the callsInFlight data structure. Because the connection
-                        // has been closed, the calls should be returned by the next client#poll(),
-                        // and handled at that point.
-                    }
+                    log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);

Review comment:
OK, let's raise it to `INFO`.