[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r670111886 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java ## @@ -67,48 +69,88 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(t0p0, Errors.NONE); -assertCompleted(handleWithError(Errors.NONE), responseData); +assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { -assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); +assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); +assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { -assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); -assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test -public void testFailedHandleResponse() { -assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); -assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); -assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); +public void testFailedHandleResponseWithGroupError() { +assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); +assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND)); +assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID)); +assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP)); } -private OffsetDeleteResponse buildResponse(Errors error) { +@Test +public void testFailedHandleResponseWithPartitionError() { 
+assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC), +handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC)); +assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED), +handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED)); +assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION), +handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)); +} + +private OffsetDeleteResponse buildGroupErrorResponse(Errors error) { +OffsetDeleteResponse response = new OffsetDeleteResponse( +new OffsetDeleteResponseData() +.setErrorCode(error.code())); +if (error == Errors.NONE) { +response.data() +.setThrottleTimeMs(0) +.setTopics(new OffsetDeleteResponseTopicCollection(singletonList( +new OffsetDeleteResponseTopic() +.setName(t0p0.topic()) +.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( +new OffsetDeleteResponsePartition() +.setPartitionIndex(t0p0.partition()) +.setErrorCode(error.code()) +).iterator())) +).iterator())); +} +return response; +} + +private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) { OffsetDeleteResponse response = new OffsetDeleteResponse( -new OffsetDeleteResponseData() -.setThrottleTimeMs(0) -.setTopics(new OffsetDeleteResponseTopicCollection(singletonList( -new OffsetDeleteResponseTopic() -.setName("t0") -.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( -new OffsetDeleteResponsePartition() -.setPartitionIndex(0) -.setErrorCode(error.code()) - ).iterator())) - ).iterator(; +new OffsetDeleteResponseData() +.setThrottleTimeMs(0) +.setTopics(new OffsetDeleteResponseTopicCollection(singletonList( +new OffsetDeleteResponseTopic() +.setName(t0p0.topic()) +.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( +new OffsetDeleteResponsePartition() +
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r669472461 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java ## @@ -149,4 +191,21 @@ private void assertFailed( assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } -} + +private void assertPartitionFailed( +Map expectedResult, +AdminApiHandler.ApiResult> result +) { +CoordinatorKey key = CoordinatorKey.byGroupId(groupId); +assertEquals(singleton(key), result.completedKeys.keySet()); + +// verify the completed value is expected result +Collection> completeCollection = result.completedKeys.values(); +assertEquals(1, completeCollection.size()); +Map completeMap = completeCollection.iterator().next(); +assertEquals(expectedResult, completeMap); Review comment: Good suggestion! Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r669472287 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3359,23 +3360,23 @@ public void testDeleteConsumerGroupOffsets() throws Exception { prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(new OffsetDeleteResponse( -new OffsetDeleteResponseData() -.setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( -new OffsetDeleteResponseTopic() -.setName("foo") -.setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( -new OffsetDeleteResponsePartition() -.setPartitionIndex(0) -.setErrorCode(Errors.NONE.code()) -).iterator())), -new OffsetDeleteResponseTopic() -.setName("bar") -.setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( -new OffsetDeleteResponsePartition() -.setPartitionIndex(0) - .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) -).iterator())) -).collect(Collectors.toList()).iterator())) +new OffsetDeleteResponseData() +.setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( +new OffsetDeleteResponseTopic() +.setName("foo") +.setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( +new OffsetDeleteResponsePartition() +.setPartitionIndex(0) +.setErrorCode(Errors.NONE.code()) +).iterator())), +new OffsetDeleteResponseTopic() +.setName("bar") +.setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( +new OffsetDeleteResponsePartition() +.setPartitionIndex(0) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) +).iterator())) +).collect(Collectors.toList()).iterator())) Review comment: Sorry, I accidentally did it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r669471854 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -97,54 +108,79 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> { Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); -} + +partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); }) ); -if (!partitions.isEmpty()) -completed.put(groupId, partitions); + +completed.put(groupId, partitionResults); +} + +if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { +return new ApiResult<>( +completed, +failed, +Collections.emptyList() +); +} else { +// retry the request, so don't send completed/failed results back +return new ApiResult<>( +Collections.emptyMap(), +Collections.emptyMap(), +new ArrayList<>(groupsToUnmap) +); } -return new ApiResult<>(completed, failed, unmapped); } -private 
boolean handleError( +private void handleGroupError( CoordinatorKey groupId, Errors error, Map failed, -List unmapped +Set groupsToUnmap, +Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: -log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, -error.exception()); +case NON_EMPTY_GROUP: +log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error); failed.put(groupId, error.exception()); -return true; +break; case COORDINATOR_LOAD_IN_PROGRESS: +// If the coordinator is in the middle of loading, then we just need to retry +log.debug("`OffsetDelete` request for group {} failed because the coordinator" + Review comment: Updated. I'll also update other PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r669471550 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -97,54 +108,79 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> { Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); -} + +partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); }) ); -if (!partitions.isEmpty()) -completed.put(groupId, partitions); + +completed.put(groupId, partitionResults); +} + +if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { +return new ApiResult<>( +completed, +failed, +Collections.emptyList() +); +} else { +// retry the request, so don't send completed/failed results back +return new ApiResult<>( +Collections.emptyMap(), +Collections.emptyMap(), +new ArrayList<>(groupsToUnmap) +); } -return new ApiResult<>(completed, failed, unmapped); } -private 
boolean handleError( +private void handleGroupError( CoordinatorKey groupId, Errors error, Map failed, -List unmapped +Set groupsToUnmap, +Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: -log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, -error.exception()); +case NON_EMPTY_GROUP: +log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error); Review comment: Nice catch! I'll also update other PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r669471228 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -97,54 +108,79 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); Review comment: good suggestion! Updated! ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -97,54 +108,79 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> topic.partitions().forEach(partition 
-> { Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); -} + +partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); }) ); -if (!partitions.isEmpty()) -completed.put(groupId, partitions); + +completed.put(groupId, partitionResults); Review comment: Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r668787834 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> -topic.partitions().forEach(partition -> { -Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> +topic.partitions().forEach(partitionoffsetDeleteResponse -> { +Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); +TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); +if (partitionError != Errors.NONE) { +handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); Review comment: Yes, I originally did it the way you suggested, but a test failed because of that change: `testDeleteConsumerGroupOffsetsNumRetries` in `KafkaAdminClientTest`. It puts `NOT_COORDINATOR` in the partition error and expects a retry. That's why I changed it to this. What do you think? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r668780259 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> -topic.partitions().forEach(partition -> { -Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> +topic.partitions().forEach(partitionoffsetDeleteResponse -> { Review comment: Updated. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r667675187 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3345,12 +3345,9 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception { } @Test -public void testDeleteConsumerGroupOffsets() throws Exception { -// Happy path - +public void testDeleteConsumerGroupOffsetsResponseIncludeCoordinatorErrorAndNoneError() throws Exception { Review comment: Add a test in which the partition responses include a coordinator error alongside `NONE` errors. We should retry in that case, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r667673170 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> -topic.partitions().forEach(partition -> { -Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> +topic.partitions().forEach(partitionoffsetDeleteResponse -> { +Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); +TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); +if (partitionError != Errors.NONE) { +handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } + +partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); }) ); -if (!partitions.isEmpty()) -completed.put(groupId, partitions); + +completed.put(groupId, partitionResults); +} + +if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { +return new ApiResult<>( +completed, +failed, +Collections.emptyList() +); +} 
else { +// retry the request, so don't send completed/failed results back Review comment: As in https://github.com/apache/kafka/pull/11016, we don't return any `completed` or `failed` results if we need to retry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r667672477 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java ## @@ -100,51 +100,115 @@ public String apiName() { final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { -final Map partitions = new HashMap<>(); -response.data().topics().forEach(topic -> -topic.partitions().forEach(partition -> { -Errors partitionError = Errors.forCode(partition.errorCode()); -if (!handleError(groupId, partitionError, failed, unmapped)) { -partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); +final Map partitionResults = new HashMap<>(); +response.data().topics().forEach(topic -> +topic.partitions().forEach(partitionoffsetDeleteResponse -> { +Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); +TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); +if (partitionError != Errors.NONE) { +handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } + +partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); Review comment: put every error into partitionResults, as the log logic did -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org