This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push: new f56b9e7 KAFKA-13063: Make DescribeConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11022) f56b9e7 is described below commit f56b9e70bda5591c96aa4eb747fd6d2ffeb70953 Author: Luke Chen <show...@gmail.com> AuthorDate: Thu Jul 15 20:25:41 2021 +0800 KAFKA-13063: Make DescribeConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11022) This patch improves the error handling in `DescribeConsumerGroupsHandler` and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again. Reviewers: David Jacot <dja...@confluent.io> --- .../internals/DescribeConsumerGroupsHandler.java | 41 ++++++++++++---------- .../kafka/clients/admin/KafkaAdminClientTest.java | 27 +++++++------- .../DescribeConsumerGroupsHandlerTest.java | 2 +- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 8a94bec..10756a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -109,16 +109,16 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { - DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; - Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; + final Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable>
failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); for (DescribedGroup describedGroup : response.data().groups()) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); Errors error = Errors.forCode(describedGroup.errorCode()); if (error != Errors.NONE) { - handleError(groupIdKey, error, failed, unmapped); + handleError(groupIdKey, error, failed, groupsToUnmap); continue; } final String protocolType = describedGroup.protocolType(); @@ -151,38 +151,41 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( - String.format("GroupId %s is not a consumer group (%s).", - groupIdKey.idValue, protocolType))); + String.format("GroupId %s is not a consumer group (%s).", + groupIdKey.idValue, protocolType))); } } - return new ApiResult<>(completed, failed, unmapped); + + return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } private void handleError( CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, - List<CoordinatorKey> unmapped + Set<CoordinatorKey> groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId, - error.exception()); + log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DescribeGroups request for group {} returned error {}. 
Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`DescribeGroups` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); + groupsToUnmap.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DescribeGroups` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `DescribeGroups` response")); + log.error("`DescribeGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error); + failed.put(groupId, error.exception()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index e79890f..ab11833 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2688,7 +2688,7 @@ public class KafkaAdminClientTest { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - //Retriable FindCoordinatorResponse errors should be retried + // Retriable FindCoordinatorResponse errors should be retried env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); @@ -2707,21 +2707,12 @@ public class KafkaAdminClientTest { Collections.emptySet())); env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - data = new DescribeGroupsResponseData(); - 
data.groups().add(DescribeGroupsResponse.groupMetadata( - GROUP_ID, - Errors.COORDINATOR_NOT_AVAILABLE, - "", - "", - "", - Collections.emptyList(), - Collections.emptySet())); - env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - /* * We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for COORDINATOR_NOT_AVAILABLE error response */ data = new DescribeGroupsResponseData(); data.groups().add(DescribeGroupsResponse.groupMetadata( @@ -2736,6 +2727,18 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + GROUP_ID, + Errors.COORDINATOR_NOT_AVAILABLE, + "", + "", + "", + Collections.emptyList(), + Collections.emptySet())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + data = new DescribeGroupsResponseData(); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index fe26043..aef207a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -104,13 +104,13 @@ public class 
DescribeConsumerGroupsHandlerTest { @Test public void testUnmappedHandleResponse() { + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, "")); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, "")); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test