cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1363336947

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());

Review Comment:
   nit: I was wondering whether you should use `request.handler().completionTimeMs()` instead of `response.receivedTimeMs()` here and in other places. IMO, it would make the calls clearer because `request.handler()` would be the single component that manages the completion time.
   I also realized that if the completion handler is passed into the constructor of `UnsentRequest` using `request.handler()` would not be possible. I was also thinking that maybe you should get rid of that constructor because it makes the code less readable.
   Those are just nit comments to think about. They do not block merging this PR.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -256,27 +256,39 @@ public String toString() {

     public static class FutureCompletionHandler implements RequestCompletionHandler {

+        private long responseCompletionTimeMs;
         private final CompletableFuture<ClientResponse> future;

         FutureCompletionHandler() {
-            this.future = new CompletableFuture<>();
+            future = new CompletableFuture<>();
         }

-        public void onFailure(final RuntimeException e) {
-            future.completeExceptionally(e);
+        public void onFailure(final long currentTimeMs, final RuntimeException e) {

Review Comment:
   I could not find unit tests to verify the setting of the correct completion time in `onFailure()` and `onComplete()`.
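
   For illustration, a test of the completion time could look roughly like the sketch below. It does not use the real Kafka classes: `Handler` and `StubResponse` are hypothetical, simplified stand-ins for `FutureCompletionHandler` and `ClientResponse`, and the assumption that `onComplete()` records `receivedTimeMs()` while `onFailure()` records the passed-in `currentTimeMs` is mine, based only on the diff above.

```java
import java.util.concurrent.CompletableFuture;

public class CompletionTimeSketch {

    // Hypothetical stand-in for ClientResponse; only the receive timestamp matters here.
    static final class StubResponse {
        private final long receivedTimeMs;

        StubResponse(long receivedTimeMs) {
            this.receivedTimeMs = receivedTimeMs;
        }

        long receivedTimeMs() {
            return receivedTimeMs;
        }
    }

    // Simplified stand-in for FutureCompletionHandler (assumed behavior, not the real class).
    static final class Handler {
        private long responseCompletionTimeMs;
        private final CompletableFuture<StubResponse> future = new CompletableFuture<>();

        // On failure, remember the time passed in by the caller, then fail the future.
        void onFailure(long currentTimeMs, RuntimeException e) {
            responseCompletionTimeMs = currentTimeMs;
            future.completeExceptionally(e);
        }

        // On success, remember when the response was received, then complete the future.
        void onComplete(StubResponse response) {
            responseCompletionTimeMs = response.receivedTimeMs();
            future.complete(response);
        }

        long completionTimeMs() {
            return responseCompletionTimeMs;
        }

        CompletableFuture<StubResponse> future() {
            return future;
        }
    }

    // Minimal assertion helper so the sketch runs without a test framework or -ea.
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Failure path: the completion time is the time handed to onFailure().
        Handler failed = new Handler();
        failed.onFailure(1_000L, new RuntimeException("simulated timeout"));
        check(failed.completionTimeMs() == 1_000L, "onFailure should record currentTimeMs");
        check(failed.future().isCompletedExceptionally(), "future should fail");

        // Success path: the completion time is the response's receive time.
        Handler completed = new Handler();
        completed.onComplete(new StubResponse(2_500L));
        check(completed.completionTimeMs() == 2_500L, "onComplete should record receivedTimeMs");
        check(completed.future().isDone() && !completed.future().isCompletedExceptionally(),
            "future should complete normally");
    }
}
```

   In the actual PR the same two assertions would presumably be written as JUnit tests against `FutureCompletionHandler` itself.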


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##########
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
         }
     }

+    @Test
+    public void testNetworkTimeout() {

Review Comment:
   I expected such a test in the other request manager tests as well. Other than this one, the only request manager test that has such a test is `HeartbeatRequestManager`.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -148,30 +148,33 @@ private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTim
     private NetworkClientDelegate.UnsentRequest createUnsentRequest(
             final MetadataRequest.Builder request) {
-            return new NetworkClientDelegate.UnsentRequest(
-                request,
-                Optional.empty(),
-                this::processResponseOrException
-            );
-        }
+            NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest(
+                request,
+                Optional.empty());
+
+            unsent.future().whenComplete((response, exception) -> {
+                if (response == null) {
+                    // Backoff if the error is retriable

Review Comment:
   I think this is not the right place for this comment because there is no concept of a backoff here. I think you should remove it.
   (Maybe you have realised by now that I am not a big fan of inline comments. I think they eventually start to lie. In this case it is even very likely, because you cannot see the code to which this comment refers. If something changes in `handleError()`, it is very unlikely that this comment will be updated.)


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     new ArrayList<>(this.requestedPartitions),
                     throwOnFetchStableOffsetUnsupported);
             return new NetworkClientDelegate.UnsentRequest(
-                    builder,
-                    coordinatorRequestManager.coordinator(),
-                    (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+                builder,
+                coordinatorRequestManager.coordinator(),
+                (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));

Review Comment:
   Out of curiosity, can the response never be `null` here as in `HeartbeatRequestManager`? Can timeouts not happen here?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org