kirktrue commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1904717131
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
}
+ private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+ Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+ if (fatalError.isPresent()) {
+ log.warn("Failing all unset commit requests and offset fetches because of coordinator fatal error. ", fatalError.get());
Review Comment:
```suggestion
log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", fatalError.get());
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,14 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
}
+ private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+ Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+ if (fatalError.isPresent()) {
+ pendingRequests.unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(fatalError.get()));
+ pendingRequests.unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(fatalError.get()));
Review Comment:
> don't we need to remove the pending requests that we're failing here? `pendingRequests` are usually cleared on poll [clearAll]

Isn't that what `clearAll()` on line 207 does?
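A minimal sketch of the point under discussion. It assumes, as the hunk suggests, that `unsentOffsetCommits` and `unsentOffsetFetches` are plain lists of requests that each hold a `CompletableFuture`. Completing a future exceptionally does not remove its request from the list, so a separate clearing step such as `clearAll()` still has to run. `UnsentRequest`, `PendingRequestsSketch`, `failAll` and `isEmpty` are hypothetical stand-ins, not the actual Kafka types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an unsent commit or offset-fetch request.
class UnsentRequest {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

// Hypothetical stand-in for CommitRequestManager's pending-request holder.
class PendingRequestsSketch {
    final List<UnsentRequest> unsentOffsetCommits = new ArrayList<>();
    final List<UnsentRequest> unsentOffsetFetches = new ArrayList<>();

    // Fails every unsent request. The lists themselves are NOT modified here.
    void failAll(Throwable error) {
        unsentOffsetCommits.forEach(r -> r.future.completeExceptionally(error));
        unsentOffsetFetches.forEach(r -> r.future.completeExceptionally(error));
    }

    // Removes the (already completed) requests so a later poll does not send them.
    void clearAll() {
        unsentOffsetCommits.clear();
        unsentOffsetFetches.clear();
    }

    boolean isEmpty() {
        return unsentOffsetCommits.isEmpty() && unsentOffsetFetches.isEmpty();
    }
}
```

In this sketch, failing and removing are two distinct steps. Whether the real code needs an explicit removal therefore depends on whether `clearAll()` always runs after the failing method in the same poll.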
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -263,6 +264,11 @@ public void resetPollTimer(final long pollMs) {
pollTimer.reset(maxPollIntervalMs);
}
+ private void maybePropagateCoordinatorFatalErrorEvent() {
+ coordinatorRequestManager.getAndClearFatalError()
+ .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
+ }
+
Review Comment:
My apologies—it's still not clear from looking at the code why the
`AbstractHeartbeatRequestManager` clears the fatal error but
`CommitRequestManager` doesn't. Can you provide some context for the different
ways of interacting with the error? Thanks!
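A hedged sketch that makes the question concrete. It assumes `fatalError()` only reads the stored error, while `getAndClearFatalError()` reads it and then resets it, which is what the names suggest. It does not reproduce the actual implementation. `FatalErrorHolder` and `markFatal` are hypothetical.

```java
import java.util.Optional;

// Hypothetical holder mirroring the two accessors that appear in the diff.
class FatalErrorHolder {
    private Optional<Throwable> fatalError = Optional.empty();

    void markFatal(Throwable error) {
        fatalError = Optional.of(error);
    }

    // Read-only access: the error stays in place for other callers.
    Optional<Throwable> fatalError() {
        return fatalError;
    }

    // Read-and-reset access: the first caller consumes the error.
    Optional<Throwable> getAndClearFatalError() {
        Optional<Throwable> current = fatalError;
        fatalError = Optional.empty();
        return current;
    }
}
```

Under these semantics, the order in which managers are polled matters. If the heartbeat manager consumes the error before the commit manager reads it, the commit manager sees an empty `Optional` and leaves its unsent requests alone.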
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
}
+ private void maybeFailPendingRequestsOnCoordinatorFatalError() {
Review Comment:
What about moving the entire
`maybeFailPendingRequestsOnCoordinatorFatalError()` method into the inner
`PendingRequests` class directly? That way the outer class doesn't have to
worry about the details of how to fail them, only that it has to call
`pendingRequests.maybeFailOnCoordinatorFatalError()`?
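A sketch of the suggested refactor, under stated assumptions. `PendingRequests` keeps its two lists of unsent requests, and the outer class passes in whatever `coordinatorRequestManager.fatalError()` returns. The method name `maybeFailOnCoordinatorFatalError` comes from the comment. Its `Optional<Throwable>` parameter, the `Request` type and `CommitManagerSketch` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified outer manager.
class CommitManagerSketch {
    // Hypothetical request type: only its future matters here.
    static class Request {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    // Inner class that owns the unsent requests and knows how to fail them.
    static class PendingRequests {
        final List<Request> unsentOffsetCommits = new ArrayList<>();
        final List<Request> unsentOffsetFetches = new ArrayList<>();

        void maybeFailOnCoordinatorFatalError(Optional<Throwable> fatalError) {
            fatalError.ifPresent(error -> {
                unsentOffsetCommits.forEach(r -> r.future.completeExceptionally(error));
                unsentOffsetFetches.forEach(r -> r.future.completeExceptionally(error));
            });
        }
    }

    final PendingRequests pendingRequests = new PendingRequests();

    // The outer class decides only *when* to fail requests, not *how*.
    void poll(Optional<Throwable> coordinatorFatalError) {
        pendingRequests.maybeFailOnCoordinatorFatalError(coordinatorFatalError);
    }
}
```

With this shape, `poll` shrinks to a single call. The list handling lives next to the lists it touches.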
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
- backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+ fatalError = Optional.of(groupAuthorizationException);
return;
}
log.warn("FindCoordinator request failed due to fatal exception", exception);
- backgroundEventHandler.add(new ErrorEvent(exception));
+ fatalError = Optional.of(exception);
Review Comment:
Clearing the fatal error on a successful response from the coordinator seems
_intuitively_ correct, but I could easily be wrong.
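A hedged sketch of what clearing the error on success could look like. It assumes the manager stores the error in an `Optional<Throwable>` field, as the diff does, and has a success path that records the new coordinator. `CoordinatorStateSketch`, `onSuccessfulResponse(String)` and the string coordinator id are hypothetical simplifications. The real handler's signature is not shown in this hunk.

```java
import java.util.Optional;

// Hypothetical, simplified coordinator manager tracking the most recent fatal error.
class CoordinatorStateSketch {
    private Optional<Throwable> fatalError = Optional.empty();
    private String coordinator; // stand-in for the real Node

    void onFailedResponse(Throwable exception) {
        // As in the diff: record the error instead of emitting an ErrorEvent directly.
        fatalError = Optional.of(exception);
    }

    void onSuccessfulResponse(String coordinatorId) {
        coordinator = coordinatorId;
        // The behaviour being proposed: a successful lookup clears any stale fatal error.
        fatalError = Optional.empty();
    }

    Optional<Throwable> fatalError() {
        return fatalError;
    }

    Optional<String> coordinator() {
        return Optional.ofNullable(coordinator);
    }
}
```

There is one trade-off in this sketch. If a later lookup succeeds before every manager that reads the error with `fatalError()` has polled, those managers never see the earlier failure.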
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -53,24 +51,27 @@
public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Logger log;
- private final BackgroundEventHandler backgroundEventHandler;
private final String groupId;
private final RequestState coordinatorRequestState;
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
private long totalDisconnectedMin = 0;
private Node coordinator;
+ // Hold the last fatal error received. It is exposed so that managers requiring a coordinator can access it and take
Review Comment:
Super nit picky, but can we change the term _last_ to either _latest_ or
_most recent_?