dajac commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r668781007
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); Review comment: Not related to this line. Is it worth verifying that `groupIds` only contains the expected `groupId` here and in `buildRequest`? I did it here: https://github.com/apache/kafka/pull/11016/files#diff-72f508d8e6b9b7f8fde5de8b75bedb6e7985824b71d00fb172338ec9c4782651R121. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { - final Map<TopicPartition, Errors> partitions = new HashMap<>(); - response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + final 
Map<TopicPartition, Errors> partitionResults = new HashMap<>(); + response.data().topics().forEach(topic -> + topic.partitions().forEach(partitionoffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); + if (partitionError != Errors.NONE) { + handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } + + partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); }) ); - if (!partitions.isEmpty()) - completed.put(groupId, partitions); + + completed.put(groupId, partitionResults); + } + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); } - return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, - List<CoordinatorKey> unmapped + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + case NON_EMPTY_GROUP: + log.error("Received non retriable error for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); - return true; + break; + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed 
because the coordinator" + " is still in the process of loading state. Will retry.", apiName(), groupId); + groupsToRetry.add(groupId); + break; + case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); + groupsToUnmap.add(groupId); + break; + default: + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", Review comment: `unexpectedErrorMsg` is not necessary, as it is used only once. I would also follow the same pattern that we use for the other messages. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { - final Map<TopicPartition, Errors> partitions = new HashMap<>(); - response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + final 
Map<TopicPartition, Errors> partitionResults = new HashMap<>(); + response.data().topics().forEach(topic -> + topic.partitions().forEach(partitionoffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); + if (partitionError != Errors.NONE) { + handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); Review comment: I am actually not sure about this. Looking at the code on the broker side, it seems that group errors are always returned in the top level error field. I think that we could simply return the partition errors without checking them. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { - final Map<TopicPartition, Errors> partitions = new HashMap<>(); - response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + final Map<TopicPartition, Errors> partitionResults = new 
HashMap<>(); + response.data().topics().forEach(topic -> + topic.partitions().forEach(partitionoffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); + if (partitionError != Errors.NONE) { + handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } + + partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); }) ); - if (!partitions.isEmpty()) - completed.put(groupId, partitions); + + completed.put(groupId, partitionResults); + } + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); } - return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, - List<CoordinatorKey> unmapped + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + case NON_EMPTY_GROUP: + log.error("Received non retriable error for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); - return true; + break; + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator" + + " is still in the process 
of loading state. Will retry.", apiName(), groupId); Review comment: I am not a fan of using `apiName()` here because the name `offsetDelete` does not start with a capital letter. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ########## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>(); Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { - final Map<TopicPartition, Errors> partitions = new HashMap<>(); - response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + final Map<TopicPartition, Errors> partitionResults = new HashMap<>(); + response.data().topics().forEach(topic -> + topic.partitions().forEach(partitionoffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); + if (partitionError != Errors.NONE) { + handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } + + partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); 
}) ); - if (!partitions.isEmpty()) - completed.put(groupId, partitions); + + completed.put(groupId, partitionResults); + } + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); } - return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, - List<CoordinatorKey> unmapped + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + case NON_EMPTY_GROUP: + log.error("Received non retriable error for group {} in `{}` response", groupId, + apiName(), error.exception()); Review comment: Could we try to make the error messages uniform? For instance: `OffsetDelete request for group id {} failed due to error {}.` I would also log it at debug level, and we don't need to pass the exception to the logger; it doesn't add much here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org