dajac commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r672231624



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) 
{
-                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                TopicPartition topicPartition = new 
TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`OffsetCommit` response", groupId,
-                        error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just 
need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because 
the coordinator" +
+                    " is still in the process of loading state. Will retry.", 
groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error 
{}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error 
{}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:

Review comment:
       I think it is. It basically indicates that we could not write the group 
metadata to the log, so it concerns the group. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L448

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) 
{
-                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                TopicPartition topicPartition = new 
TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`OffsetCommit` response", groupId,
-                        error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just 
need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because 
the coordinator" +
+                    " is still in the process of loading state. Will retry.", 
groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error 
{}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error 
{}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("OffsetCommit request for group id {} failed due to 
error {}.",
+                    groupId.idValue, error);
+                partitionResults.put(topicPartition, error);

Review comment:
       Yeah, that's a good question. Prior to KIP-599, we considered them 
non-retryable errors, so I stuck with that here. I think it might be a 
good idea to consider them retryable errors, but we should do it consistently 
for all the group handlers. How about filing a Jira for this and tackling it 
separately?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -105,53 +118,96 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetCommitResponse response = (OffsetCommitResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
+        final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
 
-        Map<TopicPartition, Errors> partitions = new HashMap<>();
         for (OffsetCommitResponseTopic topic : response.data().topics()) {
             for (OffsetCommitResponsePartition partition : topic.partitions()) 
{
-                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                TopicPartition topicPartition = new 
TopicPartition(topic.name(), partition.partitionIndex());
                 Errors error = Errors.forCode(partition.errorCode());
+
                 if (error != Errors.NONE) {
-                    handleError(groupId, error, failed, unmapped);
+                    handleError(
+                        groupId,
+                        topicPartition,
+                        error,
+                        partitionResults,
+                        groupsToUnmap,
+                        groupsToRetry
+                    );
                 } else {
-                    partitions.put(tp, error);
+                    partitionResults.put(topicPartition, error);
                 }
             }
         }
-        if (failed.isEmpty() && unmapped.isEmpty())
-            completed.put(groupId, partitions);
 
-        return new ApiResult<>(completed, failed, unmapped);
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
+        } else {
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
     private void handleError(
         CoordinatorKey groupId,
+        TopicPartition topicPartition,
         Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<TopicPartition, Errors> partitionResults,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`OffsetCommit` response", groupId,
-                        error.exception());
-                failed.put(groupId, error.exception());
-                break;
+            // If the coordinator is in the middle of loading, then we just 
need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} failed because 
the coordinator" +
+                    " is still in the process of loading state. Will retry.", 
groupId.idValue);
+                groupsToRetry.add(groupId);
+                break;
+
+            // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error 
{}. Will retry", groupId, error);
-                unmapped.add(groupId);
+                log.debug("OffsetCommit request for group id {} returned error 
{}. Will retry.",
+                    groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            // Group level errors.
+            case INVALID_GROUP_ID:
+            case REBALANCE_IN_PROGRESS:
+            case INVALID_COMMIT_OFFSET_SIZE:
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("OffsetCommit request for group id {} failed due to 
error {}.",
+                    groupId.idValue, error);
+                partitionResults.put(topicPartition, error);

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-13103




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to