lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1846829642
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,6 +190,8 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
+ metadataError.completeExceptionally(groupAuthorizationException);
+ metadataError = metadataError.newIncompleteFuture();
Review Comment:
Just with the goal of simplifying here: is a future really needed, or would just keeping the error do? The main difference with the metadata errors is that `CommitRequestManager.poll`, which is called regularly, has the pending requests in hand, so we could fail them if there is a fatal error in the coordinator manager. Would that work?
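A minimal sketch of the "just keep the error" alternative follows. It assumes a simplified stand-in for `CoordinatorRequestManager`. `FakeCoordinatorManager`, `onFatalLookupFailure`, `onCoordinatorFound` and `fatalError()` are hypothetical names used only for illustration and are not existing Kafka APIs. Clearing the error on a later successful lookup is also an assumption about the intended lifecycle.

```java
import java.util.Optional;

public class CoordinatorErrorSketch {

    // Hypothetical, simplified stand-in for CoordinatorRequestManager: the last
    // fatal lookup error is kept as a plain field instead of a CompletableFuture.
    static class FakeCoordinatorManager {
        private Optional<String> coordinator = Optional.empty(); // simplified node id
        private Optional<Throwable> fatalError = Optional.empty();

        // Assumed hook: called when FindCoordinator fails with a fatal error
        // such as GROUP_AUTHORIZATION_FAILED.
        void onFatalLookupFailure(Throwable error) {
            coordinator = Optional.empty();
            fatalError = Optional.of(error);
        }

        // Assumed hook: a later successful lookup clears the stale error.
        void onCoordinatorFound(String node) {
            coordinator = Optional.of(node);
            fatalError = Optional.empty();
        }

        Optional<String> coordinator() { return coordinator; }

        Optional<Throwable> fatalError() { return fatalError; }
    }

    public static void main(String[] args) {
        FakeCoordinatorManager mgr = new FakeCoordinatorManager();
        mgr.onFatalLookupFailure(new RuntimeException("Not authorized to access group: my-group"));
        System.out.println(mgr.fatalError().isPresent()); // true
        mgr.onCoordinatorFound("broker-1");
        System.out.println(mgr.fatalError().isPresent()); // false
    }
}
```

Compared with rotating a `CompletableFuture` (`completeExceptionally` followed by `newIncompleteFuture`), a plain field needs no listener bookkeeping. Whoever polls simply reads the current state.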
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -425,6 +427,13 @@ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(fina
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs);
commitSyncWithRetries(requestState, result);
+ new ArrayList<>(metadataErrors).forEach(metadataError -> metadataError.whenComplete((__, error) -> {
+     if (error != null) {
+         result.completeExceptionally(error);
+         metadataErrors.remove(metadataError);
+     }
+ }));
+
Review Comment:
This is only handling the commit sync, but we do have other requests that should also fail if there is a fatal error in the coordinator. Given that all requests this manager sends end up being added to `pendingRequests`, and we attempt to send them on poll, shouldn't we just check on poll if there is no coordinator due to a fatal error, and fail all `pendingRequests`? That would ensure we behave consistently for all requests.
```
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
    if (!coordinatorRequestManager.coordinator().isPresent()) {
        if (coordinatorRequestManager.fatalError().isPresent()) {
            // Fatal error looking up coordinator => Fail all pendingRequests
        }
        return EMPTY;
    }
    ...
```
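The poll-time check could be fleshed out roughly as follows. This is a self-contained sketch with hypothetical types. `PendingRequest`, `enqueue` and the `Supplier`-based wiring are illustrative assumptions. The real `CommitRequestManager` request states and `NetworkClientDelegate.PollResult` are not modeled, and `fatalError()` is the accessor proposed above, not an existing method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical simplification of CommitRequestManager.poll.
public class PollFailPendingSketch {

    static class PendingRequest {
        final String name;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        PendingRequest(String name) { this.name = name; }
    }

    private final List<PendingRequest> pendingRequests = new ArrayList<>();
    private final Supplier<Optional<String>> coordinator;  // assumed: coordinator lookup result
    private final Supplier<Optional<Throwable>> fatalError; // assumed: fatal lookup error, if any

    PollFailPendingSketch(Supplier<Optional<String>> coordinator,
                          Supplier<Optional<Throwable>> fatalError) {
        this.coordinator = coordinator;
        this.fatalError = fatalError;
    }

    PendingRequest enqueue(String name) {
        PendingRequest request = new PendingRequest(name);
        pendingRequests.add(request);
        return request;
    }

    // Returns how many requests would be sent on this poll.
    int poll() {
        if (!coordinator.get().isPresent()) {
            Optional<Throwable> error = fatalError.get();
            if (error.isPresent()) {
                // Fatal error looking up the coordinator: fail every pending request
                // (commits, offset fetches, ...) so all callers see it consistently.
                for (PendingRequest request : pendingRequests) {
                    request.future.completeExceptionally(error.get());
                }
                pendingRequests.clear();
            }
            // Non-fatal case: keep the requests and retry on a later poll.
            return 0;
        }
        // Building and sending the requests is out of scope for this sketch.
        return pendingRequests.size();
    }

    public static void main(String[] args) {
        Throwable authError = new RuntimeException("Not authorized to access group: my-group");
        PollFailPendingSketch mgr =
            new PollFailPendingSketch(Optional::empty, () -> Optional.of(authError));
        PendingRequest commit = mgr.enqueue("commitSync");
        PendingRequest fetch = mgr.enqueue("fetchOffsets");
        mgr.poll();
        System.out.println(commit.future.isCompletedExceptionally()); // true
        System.out.println(fetch.future.isCompletedExceptionally());  // true
    }
}
```

Because the check sits in `poll`, any request type that ends up in `pendingRequests` is failed by the same code path, with no per-API listener wiring.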
I created a separate Jira, https://issues.apache.org/jira/browse/KAFKA-18034, for this, given that it's a different gap and fix. I would suggest we address it in a separate, simpler PR along these lines. What do you think?