dajac commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r668864349



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,

Review comment:
       We could get rid of `completed` and use `Collections.singletonMap(groupId, groupOffsetsListing)`, no?
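
A hedged sketch of the suggestion, using plain `String`/`Long` stand-ins for Kafka's internal `CoordinatorKey` and `OffsetAndMetadata` types (the stand-ins and class name are illustrative, not the actual Kafka API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SingletonMapSketch {
    public static void main(String[] args) {
        // Stand-ins: String for CoordinatorKey, Long for OffsetAndMetadata.
        String groupId = "my-group";
        Map<String, Long> groupOffsetsListing = new HashMap<>();
        groupOffsetsListing.put("topic-0", 42L);

        // Current shape: build a mutable map and insert the single entry.
        Map<String, Map<String, Long>> completed = new HashMap<>();
        completed.put(groupId, groupOffsetsListing);

        // Suggested shape: the single-group result expressed directly.
        Map<String, Map<String, Long>> suggested =
            Collections.singletonMap(groupId, groupOffsetsListing);

        // Both maps are equal; singletonMap just avoids the mutable temporary.
        System.out.println(completed.equals(suggested)); // prints "true"
    }
}
```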

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -85,11 +86,12 @@ public String apiName() {
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;

Review comment:
       As we expect a specific `groupId`, I would check the provided `groupIds`.
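
A minimal sketch of the suggested check, assuming the handler is handed the set of group keys alongside the response (the helper name and `String` keys are illustrative, not the actual Kafka signatures):

```java
import java.util.Collections;
import java.util.Set;

public class GroupIdCheckSketch {
    // Illustrative helper: since the handler issues a single-group OffsetFetch,
    // the keys handed to handleResponse should be exactly the expected group.
    static void requireSingleGroup(Set<String> groupIds, String expectedGroupId) {
        Set<String> expected = Collections.singleton(expectedGroupId);
        if (!groupIds.equals(expected)) {
            throw new IllegalArgumentException(
                "Received unexpected group ids " + groupIds + ", expected " + expected);
        }
    }

    public static void main(String[] args) {
        requireSingleGroup(Collections.singleton("my-group"), "my-group"); // passes

        try {
            requireSingleGroup(Collections.singleton("other-group"), "my-group");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```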

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -85,11 +86,12 @@ public String apiName() {
         final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

Review comment:
       nit: Let's make all variables in this block final, or none of them.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
                 }
             }
             completed.put(groupId, groupOffsetsListing);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
-        Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                    apiName(), error.exception());
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", apiName(), groupId);
+                groupsToRetry.add(groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
                 break;
+
+            default:
+                final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
+                    groupId, apiName());
+                log.error(unexpectedErrorMsg, error.exception());
+                failed.put(groupId, error.exception(unexpectedErrorMsg));
+        }
+    }
+
+    private void handlePartitionError(
+        CoordinatorKey groupId,
+        TopicPartition topicPartition,
+        Errors error,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
+    ) {
+        switch (error) {
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", apiName(), groupId);
+                groupsToRetry.add(groupId);
+                break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetFetch request for group {} returned error {}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", apiName(), groupId, error);
+                groupsToUnmap.add(groupId);
+                break;
+            case UNKNOWN_TOPIC_OR_PARTITION:
+            case TOPIC_AUTHORIZATION_FAILED:
+            case UNSTABLE_OFFSET_COMMIT:
+                log.warn("`{}` request for group {} returned error {} in partition {}. Skipping return offset for it.",
+                    apiName(), groupId, error, topicPartition);
                 break;
             default:
-                log.error("Received unexpected error for group {} in `OffsetFetch` response",
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " in `OffsetFetch` response"));
+                log.error("`{}` request for group {} returned unexpected error {} in partition {}. Skipping return offset for it.",
+                    apiName(), groupId, error, topicPartition);
         }
     }
 
-}
+}

Review comment:
       nit: Could we bring this back?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -110,41 +112,97 @@ public String apiName() {
                         groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
                     }
                 } else {
-                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                    handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);

Review comment:
       v0 and v1 of the OffsetFetch response do not have the top-level error, so it is indeed correct to check the per-partition errors here. Could you add a small comment explaining this? Could you also mark KAFKA-13064 as a blocker for 3.0? This is a regression, I think.
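
One possible wording for such a comment (a suggestion only, not the author's; the call below is the line from the diff above):

```java
// OffsetFetch v0 and v1 carry no top-level error field, so group-level
// errors arrive attached to individual partitions; that is why they must
// also be handled here rather than only in handleGroupError.
handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry);
```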




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
