lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1901880810


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,14 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {
+            pendingRequests.unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(fatalError.get()));

Review Comment:
   Should we add a debug log line here, something like "Failing all pending 
offset fetch and commit requests due to coordinator fatal error"?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
             KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);

Review Comment:
   On second thought about this one: should we also clear it when we receive a 
successful coordinator response? 
   
https://github.com/apache/kafka/blob/9818e9db26fa4eb1242f2dcba384702781e50930/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java#L176
   
   Just for completeness, so that this coordinator manager doesn't leave any 
loose ends. I expect it's not really needed at this point, just because of the 
way we poll all managers (all polled sequentially, HB always polled if there is 
a groupId, even if not in the group), but clearing it after a "transient" fatal 
error (i.e. missing ACLs that are later added) seems conceptually right. Thoughts?
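
   A minimal sketch of the idea, not the actual `CoordinatorRequestManager` 
code: `FatalErrorTracker` and its method names are hypothetical, and the only 
point is that a successful response resets the stored error to empty.

```java
import java.util.Optional;

// Hypothetical stand-in for the fatal-error state kept by a coordinator manager.
final class FatalErrorTracker {
    private Optional<Throwable> fatalError = Optional.empty();

    // Record the error so that other managers can observe it on their next poll.
    void onFailedResponse(Throwable exception) {
        fatalError = Optional.of(exception);
    }

    // Clear any previously recorded error once the coordinator is found again.
    void onSuccessfulResponse() {
        fatalError = Optional.empty();
    }

    Optional<Throwable> fatalError() {
        return fatalError;
    }
}
```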



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,14 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {
+            pendingRequests.unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(fatalError.get()));
+            pendingRequests.unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(fatalError.get()));

Review Comment:
   Don't we need to remove the pending requests that we're failing here? 
`pendingRequests` are usually cleared on poll via 
[clearAll](https://github.com/apache/kafka/blob/64bbdb1a0319044d1fa60e3836ab4052c7a28c12/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1221),
 but if there is a fatal error we won't make it there, right?
   
   If this fix makes sense, let's add a unit test to ensure that we don't 
leave any pending commit requests after a fatal error with the coordinator. 
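
   Roughly what I have in mind, as a sketch only: `PendingRequestsSketch` is a 
hypothetical stand-in that holds bare futures instead of the real request 
objects, and `maybeFailOnFatalError` / `hasUnsentRequests` are illustrative 
names. It fails every unsent request and then drops them, so nothing is left 
pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the unsent commit and fetch requests.
final class PendingRequestsSketch {
    final List<CompletableFuture<Void>> unsentOffsetCommits = new ArrayList<>();
    final List<CompletableFuture<Void>> unsentOffsetFetches = new ArrayList<>();

    // Fail all unsent requests with the coordinator error, then remove them.
    void maybeFailOnFatalError(Optional<Throwable> fatalError) {
        if (!fatalError.isPresent()) {
            return;
        }
        Throwable error = fatalError.get();
        unsentOffsetCommits.forEach(future -> future.completeExceptionally(error));
        unsentOffsetFetches.forEach(future -> future.completeExceptionally(error));
        unsentOffsetCommits.clear();
        unsentOffsetFetches.clear();
    }

    boolean hasUnsentRequests() {
        return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty();
    }
}
```

   A unit test along these lines would add one commit and one fetch, trigger 
the fatal error, and assert that both futures completed exceptionally and that 
`hasUnsentRequests()` returns false.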


