This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new f56b9e7  KAFKA-13063: Make DescribeConsumerGroupsHandler unmap for 
COORDINATOR_NOT_AVAILABLE error (#11022)
f56b9e7 is described below

commit f56b9e70bda5591c96aa4eb747fd6d2ffeb70953
Author: Luke Chen <show...@gmail.com>
AuthorDate: Thu Jul 15 20:25:41 2021 +0800

    KAFKA-13063: Make DescribeConsumerGroupsHandler unmap for 
COORDINATOR_NOT_AVAILABLE error (#11022)
    
    This patch improves the error handling in `DescribeConsumerGroupsHandler` 
and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the 
coordinator again.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../internals/DescribeConsumerGroupsHandler.java   | 41 ++++++++++++----------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 27 +++++++-------
 .../DescribeConsumerGroupsHandlerTest.java         |  2 +-
 3 files changed, 38 insertions(+), 32 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
index 8a94bec..10756a6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
@@ -109,16 +109,16 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        DescribeGroupsResponse response = (DescribeGroupsResponse) 
abstractResponse;
-        Map<CoordinatorKey, ConsumerGroupDescription> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DescribeGroupsResponse response = (DescribeGroupsResponse) 
abstractResponse;
+        final Map<CoordinatorKey, ConsumerGroupDescription> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
 
         for (DescribedGroup describedGroup : response.data().groups()) {
             CoordinatorKey groupIdKey = 
CoordinatorKey.byGroupId(describedGroup.groupId());
             Errors error = Errors.forCode(describedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap);
                 continue;
             }
             final String protocolType = describedGroup.protocolType();
@@ -151,38 +151,41 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
                 completed.put(groupIdKey, consumerGroupDescription);
             } else {
                 failed.put(groupIdKey, new IllegalArgumentException(
-                        String.format("GroupId %s is not a consumer group 
(%s).",
-                                groupIdKey.idValue, protocolType)));
+                    String.format("GroupId %s is not a consumer group (%s).",
+                        groupIdKey.idValue, protocolType)));
             }
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`DescribeGroups` response", groupId,
-                        error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("DescribeGroups request for group {} returned error 
{}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
+                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
             default:
-                log.error("Received unexpected error for group {} in 
`DescribeGroups` response", 
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " 
in `DescribeGroups` response"));
+                log.error("`DescribeGroups` request for group id {} failed due 
to unexpected error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception());
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index e79890f..ab11833 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2688,7 +2688,7 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            //Retriable FindCoordinatorResponse errors should be retried
+            // Retriable FindCoordinatorResponse errors should be retried
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
  Node.noNode()));
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
  Node.noNode()));
 
@@ -2707,21 +2707,12 @@ public class KafkaAdminClientTest {
                 Collections.emptySet()));
             env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(data));
 
-            data = new DescribeGroupsResponseData();
-            data.groups().add(DescribeGroupsResponse.groupMetadata(
-                GROUP_ID,
-                Errors.COORDINATOR_NOT_AVAILABLE,
-                "",
-                "",
-                "",
-                Collections.emptyList(),
-                Collections.emptySet()));
-            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(data));
-
             /*
              * We need to return two responses here, one with NOT_COORDINATOR 
error when calling describe consumer group
              * api using coordinator that has moved. This will retry whole 
operation. So we need to again respond with a
              * FindCoordinatorResponse.
+             *
+             * And the same reason for COORDINATOR_NOT_AVAILABLE error response
              */
             data = new DescribeGroupsResponseData();
             data.groups().add(DescribeGroupsResponse.groupMetadata(
@@ -2736,6 +2727,18 @@ public class KafkaAdminClientTest {
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             data = new DescribeGroupsResponseData();
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                GROUP_ID,
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                "",
+                "",
+                "",
+                Collections.emptyList(),
+                Collections.emptySet()));
+            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(data));
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            data = new DescribeGroupsResponseData();
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 
0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 
1);
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 
2);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
index fe26043..aef207a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
@@ -104,13 +104,13 @@ public class DescribeConsumerGroupsHandlerTest {
 
     @Test
     public void testUnmappedHandleResponse() {
+        assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, ""));
         assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, ""));
     }
 
     @Test
     public void testRetriableHandleResponse() {
         assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
""));
-        assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, ""));
     }
 
     @Test

Reply via email to