cadonna commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1363336947


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());

Review Comment:
   nit:
   I was wondering whether you should use
`request.handler().completionTimeMs()` instead of `response.receivedTimeMs()`
here and in other places. IMO, it would make the calls clearer because
`request.handler()` would be the single component that manages the completion
time.
   I also realized that if the completion handler is passed into the
constructor of `UnsentRequest`, using `request.handler()` would not be
possible. Maybe you should get rid of that constructor, because it makes the
code less readable.
   These are just nits to think about. They do not block merging this PR.
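
   A rough sketch of the idea of letting the handler own the completion time.
It uses hypothetical, simplified stand-ins (`SketchHandler`, `SketchRequest`,
`SketchCaller`) rather than the actual `FutureCompletionHandler` and
`UnsentRequest`, and a `String` in place of `ClientResponse`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins; not the actual Kafka classes.
class SketchHandler {
    private long completionTimeMs = -1L; // -1 until the request completes
    private final CompletableFuture<String> future = new CompletableFuture<>();

    void onComplete(final long currentTimeMs, final String response) {
        // Record the time before completing so that callbacks observe it.
        completionTimeMs = currentTimeMs;
        future.complete(response);
    }

    void onFailure(final long currentTimeMs, final RuntimeException e) {
        completionTimeMs = currentTimeMs;
        future.completeExceptionally(e);
    }

    long completionTimeMs() {
        return completionTimeMs;
    }

    CompletableFuture<String> future() {
        return future;
    }
}

class SketchRequest {
    private final SketchHandler handler = new SketchHandler();

    SketchHandler handler() {
        return handler;
    }
}

class SketchCaller {
    // Registers a callback that asks the handler for the completion time,
    // regardless of whether the request succeeded or failed.
    static long[] observeCompletionTime(final SketchRequest request) {
        final long[] observed = {-1L};
        request.handler().future().whenComplete(
            (response, exception) -> observed[0] = request.handler().completionTimeMs());
        return observed;
    }
}
```

   With this shape the callback never needs to touch the response to learn
the time, so the success and failure paths read the same way.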



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -256,27 +256,39 @@ public String toString() {
 
     public static class FutureCompletionHandler implements RequestCompletionHandler {
 
+        private long responseCompletionTimeMs;
         private final CompletableFuture<ClientResponse> future;
 
         FutureCompletionHandler() {
-            this.future = new CompletableFuture<>();
+            future = new CompletableFuture<>();
         }
 
-        public void onFailure(final RuntimeException e) {
-            future.completeExceptionally(e);
+        public void onFailure(final long currentTimeMs, final RuntimeException e) {

Review Comment:
   I could not find unit tests that verify that the correct completion time is
set in `onFailure()` and `onComplete()`.
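
   For illustration, such checks could look roughly like the following. This
is a plain-Java sketch against a hypothetical stand-in (`TimedHandler`), not
the real `FutureCompletionHandler`, and it uses boolean-returning checks
instead of JUnit so that it stays self-contained:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the handler under test; not the Kafka class.
class TimedHandler {
    private long completionTimeMs = -1L;
    private final CompletableFuture<String> future = new CompletableFuture<>();

    void onComplete(final long currentTimeMs, final String response) {
        completionTimeMs = currentTimeMs;
        future.complete(response);
    }

    void onFailure(final long currentTimeMs, final RuntimeException e) {
        completionTimeMs = currentTimeMs;
        future.completeExceptionally(e);
    }

    long completionTimeMs() {
        return completionTimeMs;
    }

    CompletableFuture<String> future() {
        return future;
    }
}

class TimedHandlerChecks {
    // True if onComplete() records the given time and completes the future normally.
    static boolean onCompleteRecordsTime() {
        final TimedHandler handler = new TimedHandler();
        handler.onComplete(1000L, "response");
        return handler.completionTimeMs() == 1000L
            && handler.future().isDone()
            && !handler.future().isCompletedExceptionally();
    }

    // True if onFailure() records the given time and fails the future.
    static boolean onFailureRecordsTime() {
        final TimedHandler handler = new TimedHandler();
        handler.onFailure(2000L, new IllegalStateException("simulated failure"));
        return handler.completionTimeMs() == 2000L
            && handler.future().isCompletedExceptionally();
    }
}
```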



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##########
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
         }
     }
 
+    @Test
+    public void testNetworkTimeout() {

Review Comment:
   I expected such a test in the other request manager tests as well. The only
other request manager test that has one is the test for
`HeartbeatRequestManager`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -148,30 +148,33 @@ private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTim
 
         private NetworkClientDelegate.UnsentRequest createUnsentRequest(
                 final MetadataRequest.Builder request) {
-            return new NetworkClientDelegate.UnsentRequest(
-                    request,
-                    Optional.empty(),
-                    this::processResponseOrException
-            );
-        }
+            NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest(
+                request,
+                Optional.empty());
+
+            unsent.future().whenComplete((response, exception) -> {
+                if (response == null) {
+                    // Backoff if the error is retriable

Review Comment:
   I think this is not the right place for this comment because there is no
concept of a backoff here. I think you should remove it.

   (Maybe you have realised by now that I am not a big fan of inline comments.
I think they eventually start to lie. In this case that is especially likely
because the code this comment refers to is not visible here. If something
changes in `handleError()`, it is very unlikely that this comment will be
updated.)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     new ArrayList<>(this.requestedPartitions),
                     throwOnFetchStableOffsetUnsupported);
             return new NetworkClientDelegate.UnsentRequest(
-                    builder,
-                    coordinatorRequestManager.coordinator(),
-                    (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+                builder,
+                coordinatorRequestManager.coordinator(),
+                (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));

Review Comment:
   Out of curiosity, can the response never be `null` here, as it can be in
`HeartbeatRequestManager`? Can timeouts not happen here?
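
   To make the concern concrete: with a `CompletableFuture`-style callback, a
failed or timed-out request delivers a `null` response together with a non-null
exception, so dereferencing `r` unconditionally would throw. A minimal sketch
with hypothetical stand-ins (`FakeResponse`, `handle`, and
`java.util.concurrent.TimeoutException` in place of Kafka's own timeout
exception), not the actual `CommitRequestManager` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class NullSafeCallbackSketch {
    // Hypothetical stand-in for ClientResponse.
    static final class FakeResponse {
        final long receivedTimeMs;
        final String body;

        FakeResponse(final long receivedTimeMs, final String body) {
            this.receivedTimeMs = receivedTimeMs;
            this.body = body;
        }
    }

    // Checks for a missing response before touching it.
    static String handle(final FakeResponse r, final Throwable t) {
        if (r == null) {
            return "failed: " + t.getClass().getSimpleName();
        }
        return "received at " + r.receivedTimeMs + ": " + r.body;
    }

    // Completes a future either normally or with a timeout and returns
    // what the callback produced.
    static String completeAndHandle(final boolean timeOut) {
        final CompletableFuture<FakeResponse> future = new CompletableFuture<>();
        final String[] outcome = new String[1];
        future.whenComplete((r, t) -> outcome[0] = handle(r, t));
        if (timeOut) {
            future.completeExceptionally(new TimeoutException("request timed out"));
        } else {
            future.complete(new FakeResponse(100L, "offsets"));
        }
        return outcome[0];
    }
}
```

   Whether the callback in `toUnsentRequest()` can actually see a `null`
response depends on how `UnsentRequest` invokes it, which this sketch does not
model.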



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
