dajac commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r668781007



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) 
abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

Review comment:
       Not related to this line. Is it worth verifying that `groupIds` only 
contains the expected `groupId` here and in `buildRequest`? I did it here: 
https://github.com/apache/kafka/pull/11016/files#diff-72f508d8e6b9b7f8fde5de8b75bedb6e7985824b71d00fb172338ec9c4782651R121.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) 
abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic -> 
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = 
Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, 
unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), 
partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new 
HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = 
Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new 
TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, 
topicPartition, groupsToUnmap, groupsToRetry);
                     }
+
+                    partitionResults.put(new TopicPartition(topic.name(), 
partitionoffsetDeleteResponse.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in 
`DeleteConsumerGroupOffsets` response", groupId,
-                        error.exception());
+            case NON_EMPTY_GROUP:
+                log.error("Received non retriable error for group {} in `{}` 
response", groupId,
+                    apiName(), error.exception());
                 failed.put(groupId, error.exception());
-                return true;
+                break;
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`{}` request for group {} failed because the 
coordinator" +
+                    " is still in the process of loading state. Will retry.", 
apiName(), groupId);
+                groupsToRetry.add(groupId);
+                break;
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry.", 
apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
+                break;
+            default:
+                final String unexpectedErrorMsg = String.format("Received 
unexpected error for group %s in `%s` response",

Review comment:
       `unexpectedErrorMsg` is not necessary as it is used only once. I would also 
follow the same pattern that we use for other messages.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) 
abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic -> 
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = 
Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, 
unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), 
partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new 
HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = 
Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new 
TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, 
topicPartition, groupsToUnmap, groupsToRetry);

Review comment:
       I am actually not sure about this. Looking at the code on the broker 
side, it seems that group errors are always returned in the top-level error 
field. I think that we could simply return the partition errors without 
checking them.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) 
abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic -> 
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = 
Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, 
unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), 
partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new 
HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = 
Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new 
TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, 
topicPartition, groupsToUnmap, groupsToRetry);
                     }
+
+                    partitionResults.put(new TopicPartition(topic.name(), 
partitionoffsetDeleteResponse.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in 
`DeleteConsumerGroupOffsets` response", groupId,
-                        error.exception());
+            case NON_EMPTY_GROUP:
+                log.error("Received non retriable error for group {} in `{}` 
response", groupId,
+                    apiName(), error.exception());
                 failed.put(groupId, error.exception());
-                return true;
+                break;
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`{}` request for group {} failed because the 
coordinator" +
+                    " is still in the process of loading state. Will retry.", 
apiName(), groupId);

Review comment:
       I am not a fan of using `apiName()` here because the name `offsetDelete` 
does not start with a capital letter.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##########
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) 
abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic -> 
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = 
Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, 
unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), 
partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new 
HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = 
Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new 
TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, 
topicPartition, groupsToUnmap, groupsToRetry);
                     }
+
+                    partitionResults.put(new TopicPartition(topic.name(), 
partitionoffsetDeleteResponse.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in 
`DeleteConsumerGroupOffsets` response", groupId,
-                        error.exception());
+            case NON_EMPTY_GROUP:
+                log.error("Received non retriable error for group {} in `{}` 
response", groupId,
+                    apiName(), error.exception());

Review comment:
       Could we try to make the error messages uniform? For instance, 
`OffsetDelete request for group id {} failed due to error {}.` I would also 
log it at debug level, and we don't need to pass the exception to the logger. The 
exception doesn't add much here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to