lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1887078953


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -163,6 +163,11 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (coordinatorRequestManager.coordinator().isEmpty() || 
membershipManager().shouldSkipHeartbeat()) {
             membershipManager().onHeartbeatRequestSkipped();
+            // When Consumer#poll, it will need to check Error events without 
triggering an API event,
+            // also some managers may need to take actions based on the Error 
events. Thus, the fatal error
+            // lifecycle is end, and the error event is propagated to the 
event queue.
+            coordinatorRequestManager.getAndClearFatalError()
+                    .ifPresent(fatalError -> backgroundEventHandler.add(new 
ErrorEvent(fatalError)));

Review Comment:
   what do you think about encapsulating this is a kind of 
`maybePropagateCoordinatorFatalErrorEvent`. I think it will make the intention 
much clearer (without needing comments)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -176,9 +176,18 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        // poll only when the coordinator node is known.
-        if (coordinatorRequestManager.coordinator().isEmpty())
+        // poll when the coordinator node is known and fatal errors are not 
present
+        if (coordinatorRequestManager.coordinator().isEmpty()) {
+            // The fatal error completeExceptionally all pending requests, but 
application thread may not get the
+            // fatal error When Consumer#poll, because it will need to check 
Error events without triggering an API event.
+            // So, we won't clear the fatal error here.
+            Optional<Throwable> fatalError = 
coordinatorRequestManager.fatalError();
+            if (fatalError.isPresent()) {
+                pendingRequests.unsentOffsetCommits.forEach(request -> 
request.future.completeExceptionally(fatalError.get()));
+                pendingRequests.unsentOffsetFetches.forEach(request -> 
request.future.completeExceptionally(fatalError.get()));
+            }

Review Comment:
   for clarity, and to find some consistency with how the HBMgr handles 
coordinator errors, we could encapsulate this in a kind of 
`maybeFailPendingRequestsOnCoordinatorFatalError`? maybe with that it's clear 
enough and we don't even need the comments 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -53,24 +51,22 @@
 public class CoordinatorRequestManager implements RequestManager {
     private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 
* 1000;
     private final Logger log;
-    private final BackgroundEventHandler backgroundEventHandler;
     private final String groupId;
 
     private final RequestState coordinatorRequestState;
     private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
     private long totalDisconnectedMin = 0;
     private Node coordinator;
+    private Optional<Throwable> fatalError = Optional.empty();

Review Comment:
   here I would suggest a comment to describe this is the last fatal error 
received, exposed so that managers that require a coordinator can know about it 
and take whatever action the need (ex. HB manager propagates error event to app 
thread, commit mgr cancel cached requests)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, 
final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
             KafkaException groupAuthorizationException = 
GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new 
ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", 
exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);

Review Comment:
   I think clearing it on the HB manager should be enough, given that it's the 
one propagating it to the app thread (that's what we need to make sure happens 
before clearing it I would say)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to