[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r670111886



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
##
@@ -67,48 +69,88 @@ public void testBuildRequest() {
     @Test
     public void testSuccessfulHandleResponse() {
        Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
-        assertCompleted(handleWithError(Errors.NONE), responseData);
+        assertCompleted(handleWithGroupError(Errors.NONE), responseData);
     }
 
     @Test
     public void testUnmappedHandleResponse() {
-        assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
     }
 
     @Test
     public void testRetriableHandleResponse() {
-        assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
-        assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
+        assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
     @Test
-    public void testFailedHandleResponse() {
-        assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
-        assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
-        assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
+    public void testFailedHandleResponseWithGroupError() {
+        assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
+        assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
+        assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
+        assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP));
     }
 
-    private OffsetDeleteResponse buildResponse(Errors error) {
+    @Test
+    public void testFailedHandleResponseWithPartitionError() {
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
+            handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
+            handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
+            handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+    }
+
+    private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
+        OffsetDeleteResponse response = new OffsetDeleteResponse(
+            new OffsetDeleteResponseData()
+                .setErrorCode(error.code()));
+        if (error == Errors.NONE) {
+            response.data()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()));
+        }
+        return response;
+    }
+
+    private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
         OffsetDeleteResponse response = new OffsetDeleteResponse(
-            new OffsetDeleteResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
-                    new OffsetDeleteResponseTopic()
-                        .setName("t0")
-                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
-                            new OffsetDeleteResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(error.code())
-                        ).iterator()))
-                ).iterator())));
+            new OffsetDeleteResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator())));
+        return response;
+    }

[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r669472461



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
##
@@ -149,4 +191,21 @@ private void assertFailed(
         assertEquals(singleton(key), result.failedKeys.keySet());
         assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
     }
-}
+
+    private void assertPartitionFailed(
+        Map<TopicPartition, Errors> expectedResult,
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
+    ) {
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        assertEquals(singleton(key), result.completedKeys.keySet());
+
+        // verify the completed value is the expected result
+        Collection<Map<TopicPartition, Errors>> completeCollection = result.completedKeys.values();
+        assertEquals(1, completeCollection.size());
+        Map<TopicPartition, Errors> completeMap = completeCollection.iterator().next();
+        assertEquals(expectedResult, completeMap);

Review comment:
   Good suggestion! Updated.





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r669472287



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3359,23 +3360,23 @@ public void testDeleteConsumerGroupOffsets() throws Exception {
         prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
         env.kafkaClient().prepareResponse(new OffsetDeleteResponse(
-            new OffsetDeleteResponseData()
-                .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of(
-                    new OffsetDeleteResponseTopic()
-                        .setName("foo")
-                        .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList(
-                            new OffsetDeleteResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(Errors.NONE.code())
-                        ).iterator())),
-                    new OffsetDeleteResponseTopic()
-                        .setName("bar")
-                        .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList(
-                            new OffsetDeleteResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
-                        ).iterator()))
-                ).collect(Collectors.toList()).iterator()))
+            new OffsetDeleteResponseData()
+                .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of(
+                    new OffsetDeleteResponseTopic()
+                        .setName("foo")
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                        ).iterator())),
+                    new OffsetDeleteResponseTopic()
+                        .setName("bar")
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+                        ).iterator()))
+                ).collect(Collectors.toList()).iterator()))

Review comment:
   Sorry, I changed the indentation accidentally.





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r669471854



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -97,54 +108,79 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
-                    error.exception());
+            case NON_EMPTY_GROUP:
+                log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error);
                 failed.put(groupId, error.exception());
-                return true;
+                break;
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetDelete` request for group {} failed because the coordinator" +

Review comment:
   Updated. I'll also update the other PRs.





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r669471550



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -97,54 +108,79 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
    ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
-                    error.exception());
+            case NON_EMPTY_GROUP:
+                log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error);

Review comment:
   Nice catch! I'll also update the other PRs.





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r669471228



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -97,54 +108,79 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);

Review comment:
   Good suggestion! Updated!

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -97,54 +108,79 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);

Review comment:
   Updated.





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-13 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r668787834



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry);

Review comment:
   Yes, I originally did it the way you suggested, but a test failed because of that change: `testDeleteConsumerGroupOffsetsNumRetries` in `KafkaAdminClientTest`. That test puts `NOT_COORDINATOR` in the partition-level error and expects a retry, which is why I changed it to this. What do you think?
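   For reference, a minimal sketch of the partition-level routing this implies (the `handlePartitionError` signature is taken from the diff above; the exact case split is an assumption for illustration, not the merged implementation):

    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;

    private void handlePartitionError(
        CoordinatorKey groupId,
        Errors error,
        TopicPartition topicPartition,
        Set<CoordinatorKey> groupsToUnmap,
        Set<CoordinatorKey> groupsToRetry
    ) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Coordinator is still loading: retry against the same coordinator.
                groupsToRetry.add(groupId);
                break;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Coordinator moved or is gone: unmap the group so the coordinator
                // is looked up again before the request is retried.
                groupsToUnmap.add(groupId);
                break;
            default:
                // Non-coordinator errors (e.g. GROUP_SUBSCRIBED_TO_TOPIC) stay in
                // partitionResults and are surfaced to the caller per partition.
                break;
        }
    }

   Routing partition-level `NOT_COORDINATOR` through the unmap path keeps it on the retry path, which is what `testDeleteConsumerGroupOffsetsNumRetries` expects.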





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-13 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r668780259



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
Review comment:
   Updated. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-12 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r667675187



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3345,12 +3345,9 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception {
     }
 
     @Test
-    public void testDeleteConsumerGroupOffsets() throws Exception {
-        // Happy path
-
+    public void testDeleteConsumerGroupOffsetsResponseIncludeCoordinatorErrorAndNoneError() throws Exception {

Review comment:
   Added a test where the partition-level responses include a coordinator error alongside `NONE` errors. We should retry in that case, too.
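   Roughly, the response shape such a test prepares could look like this (a hedged sketch, not the committed test; the topic name "foo" and the partition layout are made up for illustration):

    import static java.util.Arrays.asList;
    import static java.util.Collections.singletonList;
    import org.apache.kafka.common.message.OffsetDeleteResponseData;
    import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
    import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
    import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
    import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
    import org.apache.kafka.common.protocol.Errors;

    // One topic, two partitions: partition 0 returns NOT_COORDINATOR while
    // partition 1 returns NONE. The coordinator error should make the handler
    // retry the whole request instead of completing with partial results.
    OffsetDeleteResponseData data = new OffsetDeleteResponseData()
        .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
            new OffsetDeleteResponseTopic()
                .setName("foo")
                .setPartitions(new OffsetDeleteResponsePartitionCollection(asList(
                    new OffsetDeleteResponsePartition()
                        .setPartitionIndex(0)
                        .setErrorCode(Errors.NOT_COORDINATOR.code()),
                    new OffsetDeleteResponsePartition()
                        .setPartitionIndex(1)
                        .setErrorCode(Errors.NONE.code())
                ).iterator()))
        ).iterator()));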





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-12 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r667673170



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry);
                     }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            completed.put(groupId, partitionResults);
+        }
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
Review comment:
   Following https://github.com/apache/kafka/pull/11016, we don't return any `completed`/`failed` results if we need to retry.
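   Condensed, the convention from that PR as applied here (names follow the diff above):

    // Partial results must not leak back to the caller when a retry is pending:
    // either everything is returned, or only the keys to re-map are returned.
    if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
        return new ApiResult<>(completed, failed, Collections.emptyList());
    } else {
        return new ApiResult<>(Collections.emptyMap(), Collections.emptyMap(), new ArrayList<>(groupsToUnmap));
    }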





[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-11 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r667672477



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
##
@@ -100,51 +100,115 @@ public String apiName() {
         final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
         Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
         final Errors error = Errors.forCode(response.data().errorCode());
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
-                topic.partitions().forEach(partition -> {
-                    Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
+                topic.partitions().forEach(partitionoffsetDeleteResponse -> {
+                    Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode());
+                    TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex());
+                    if (partitionError != Errors.NONE) {
+                        handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry);
                     }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError);

Review comment:
   Put every partition error into `partitionResults`, as the old logic did.



