kirktrue commented on code in PR #19886:
URL: https://github.com/apache/kafka/pull/19886#discussion_r2536041299


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
                 
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
u.enqueueTimeMs());
                 AuthenticationException authenticationException = 
client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
+            } else if (u.node.isEmpty() && onClose) {
+                log.debug("Removing unsent request {} because the client is 
closing", u);
+                iter.remove();
+                
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
u.enqueueTimeMs());
+                u.handler.onFailure(currentTimeMs, 
Errors.NETWORK_EXCEPTION.exception());

Review Comment:
   I had another minor concern regarding the handling in 
`CoordinatorRequestManager` of the `NetworkException` that's passed to 
`onFailure()`. How does the `CoordinatorRequestManager` logic handle that error?
   
   For example, does this result in potentially misleading logging? For 
example, in `CoordinatorRequestManager.markCoordinatorUnknown()`, there is some 
logging that states that `Rediscovery will be attempted`, which isn't really 
true.
   
   Would it be better to pass a different exception type to `onFailure()` that 
we know the `CoordinatorRequestManager` will interpret correctly in this 
special case?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
                 
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
u.enqueueTimeMs());
                 AuthenticationException authenticationException = 
client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
+            } else if (u.node.isEmpty() && onClose) {
+                log.debug("Removing unsent request {} because the client is 
closing", u);
+                iter.remove();
+                
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
u.enqueueTimeMs());
+                u.handler.onFailure(currentTimeMs, 
Errors.NETWORK_EXCEPTION.exception());

Review Comment:
   IIUC, the logic in this PR is assuming the `UnsentRequest` represents a 
`FIND_COORDINATOR` RPC because it has an empty `Node`. I was curious if we 
could make the check a little more explicit, for example, in `UnsentRequest`:
   
   ```java
   public boolean isFindCoordinatorRequest() {
       return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
   }
   ```
   
   And then the code here becomes more clear:
   
   ```java
               } else if (u.isFindCoordinatorRequest() && onClose) {
                   log.debug("Removing unsent request {} because the client is 
closing", u);
                   iter.remove();
                   
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
u.enqueueTimeMs());
                   u.handler.onFailure(currentTimeMs, 
Errors.NETWORK_EXCEPTION.exception());
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to