[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-16 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r671039369



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -110,40 +124,96 @@ public String apiName() {
 groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
 }
 } else {
-log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
+// In responseData V0 and V1, there's no top level error, 
we have to handle errors here
+handlePartitionError(groupId, topicPartition, error, 
groupsToUnmap, groupsToRetry);
 }
 }
 completed.put(groupId, groupOffsetsListing);
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+return new ApiResult<>(
+completed,
+failed,
+Collections.emptyList()
+);
+} else {
+// retry the request, so don't send completed/failed results back
+return new ApiResult<>(
+Collections.emptyMap(),
+Collections.emptyMap(),
+new ArrayList<>(groupsToUnmap)
+);
+}
 }
 
-private void handleError(
+private void handleGroupError(
 CoordinatorKey groupId,
 Errors error,
-Map failed,
-List unmapped
+Map failed,
+Set groupsToUnmap,
+Set groupsToRetry
 ) {
 switch (error) {
 case GROUP_AUTHORIZATION_FAILED:
-log.error("Received authorization failure for group {} in 
`OffsetFetch` response", groupId,
-error.exception());
+log.debug("`OffsetFetch` request for group id {} failed due to 
error {}", groupId.idValue, error);
 failed.put(groupId, error.exception());
 break;
+
 case COORDINATOR_LOAD_IN_PROGRESS:
+// If the coordinator is in the middle of loading, then we 
just need to retry
+log.debug("`OffsetFetch` request for group id {} failed 
because the coordinator " +
+"is still in the process of loading state. Will retry", 
groupId.idValue);
+groupsToRetry.add(groupId);
+break;
 case COORDINATOR_NOT_AVAILABLE:
+case NOT_COORDINATOR:
+// If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+// the key so that we retry the `FindCoordinator` request
+log.debug("`OffsetFetch` request for group id {} returned 
error {}. " +
+"Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+groupsToUnmap.add(groupId);
+break;
+
+default:
+final String unexpectedErrorMsg =
+String.format("`OffsetFetch` request for group id %s 
failed due to error %s", groupId.idValue, error);
+log.error(unexpectedErrorMsg);
+failed.put(groupId, error.exception(unexpectedErrorMsg));

Review comment:
   Updated. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-15 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670917051



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -82,14 +93,17 @@ public String apiName() {
 Set groupIds,
 AbstractResponse abstractResponse
 ) {
+validateKeys(groupIds);
+
 final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
-Map> completed 
= new HashMap<>();
-Map failed = new HashMap<>();
-List unmapped = new ArrayList<>();
+final Map> 
completed = new HashMap<>();
+final Map failed = new HashMap<>();
+final Set groupsToUnmap = new HashSet<>();
+final Set groupsToRetry = new HashSet<>();
 
 Errors responseError = response.groupLevelError(groupId.idValue);

Review comment:
   @dajac , I also confirmed you are right. No need to sorry, I also missed 
that. I removed the `handlePartitionError`, and just return ApiResult just like 
other PRs did. Thank you. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-15 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670917051



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -82,14 +93,17 @@ public String apiName() {
 Set groupIds,
 AbstractResponse abstractResponse
 ) {
+validateKeys(groupIds);
+
 final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
-Map> completed 
= new HashMap<>();
-Map failed = new HashMap<>();
-List unmapped = new ArrayList<>();
+final Map> 
completed = new HashMap<>();
+final Map failed = new HashMap<>();
+final Set groupsToUnmap = new HashSet<>();
+final Set groupsToRetry = new HashSet<>();
 
 Errors responseError = response.groupLevelError(groupId.idValue);

Review comment:
   @dajac , I also confirmed that. No need to sorry, I also missed that. I 
removed the `handlePartitionError`, and just return ApiResult just like other 
PRs did. Thank you. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-15 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670488214



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -110,40 +124,96 @@ public String apiName() {
 groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
 }
 } else {
-log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
+// In responseData V0 and V1, there's no top level error, 
we have to handle errors here
+handlePartitionError(groupId, topicPartition, error, 
groupsToUnmap, groupsToRetry);
 }
 }
 completed.put(groupId, groupOffsetsListing);
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+return new ApiResult<>(
+completed,
+failed,
+Collections.emptyList()
+);
+} else {
+// retry the request, so don't send completed/failed results back
+return new ApiResult<>(
+Collections.emptyMap(),
+Collections.emptyMap(),
+new ArrayList<>(groupsToUnmap)
+);
+}
 }
 
-private void handleError(
+private void handleGroupError(
 CoordinatorKey groupId,
 Errors error,
-Map failed,
-List unmapped
+Map failed,
+Set groupsToUnmap,
+Set groupsToRetry
 ) {
 switch (error) {
 case GROUP_AUTHORIZATION_FAILED:
-log.error("Received authorization failure for group {} in 
`OffsetFetch` response", groupId,
-error.exception());
+log.debug("`OffsetFetch` request for group id {} failed due to 
error {}", groupId.idValue, error);
 failed.put(groupId, error.exception());
 break;
+
 case COORDINATOR_LOAD_IN_PROGRESS:
+// If the coordinator is in the middle of loading, then we 
just need to retry
+log.debug("`OffsetFetch` request for group id {} failed 
because the coordinator " +
+"is still in the process of loading state. Will retry", 
groupId.idValue);
+groupsToRetry.add(groupId);
+break;
 case COORDINATOR_NOT_AVAILABLE:
+case NOT_COORDINATOR:
+// If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+// the key so that we retry the `FindCoordinator` request
+log.debug("`OffsetFetch` request for group id {} returned 
error {}. " +
+"Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+groupsToUnmap.add(groupId);
+break;
+
+default:
+final String unexpectedErrorMsg =
+String.format("`OffsetFetch` request for group id %s 
failed due to error %s", groupId.idValue, error);
+log.error(unexpectedErrorMsg);
+failed.put(groupId, error.exception(unexpectedErrorMsg));
+}
+}
+
+private void handlePartitionError(
+CoordinatorKey groupId,
+TopicPartition topicPartition,
+Errors error,
+Set groupsToUnmap,
+Set groupsToRetry
+) {
+switch (error) {
+case COORDINATOR_LOAD_IN_PROGRESS:
+// If the coordinator is in the middle of loading, then we 
just need to retry
+log.debug("`{}` request for group {} failed because the 
coordinator " +

Review comment:
   Oh, sorry, I forgot the partitionError section. Will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-15 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r670487440



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -110,41 +112,97 @@ public String apiName() {
 groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
 }
 } else {
-log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
+handlePartitionError(groupId, topicPartition, error, 
groupsToUnmap, groupsToRetry);
 }
 }
 completed.put(groupId, groupOffsetsListing);
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+return new ApiResult<>(
+completed,

Review comment:
   Good suggestion! Will do it tomorrow (my time). Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669628826



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -110,41 +112,97 @@ public String apiName() {
 groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
 }
 } else {
-log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
+handlePartitionError(groupId, topicPartition, error, 
groupsToUnmap, groupsToRetry);

Review comment:
   Sorry, it should be v0-v1, I'll update it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669554178



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -85,11 +86,12 @@ public String apiName() {
 final OffsetFetchResponse response = (OffsetFetchResponse) 
abstractResponse;
 Map> completed 
= new HashMap<>();
 Map failed = new HashMap<>();
-List unmapped = new ArrayList<>();
+final Set groupsToUnmap = new HashSet<>();

Review comment:
   Updated! I also updated in all other PRs. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669553529



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -110,41 +112,97 @@ public String apiName() {
 groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
 }
 } else {
-log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
+handlePartitionError(groupId, topicPartition, error, 
groupsToUnmap, groupsToRetry);
 }
 }
 completed.put(groupId, groupOffsetsListing);
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+return new ApiResult<>(
+completed,

Review comment:
   No, we can't do that because the `completed` here could be `empty map`. 
If we put `Collections.singletonMap(groupId, groupOffsetsListing)`, it'll 
always not empty. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11026: KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11026:
URL: https://github.com/apache/kafka/pull/11026#discussion_r669551754



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -110,41 +112,97 @@ public String apiName() {
 groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, leaderEpoch, metadata));
 }
 } else {
-log.warn("Skipping return offset for {} due to error {}.", 
topicPartition, error);
+handlePartitionError(groupId, topicPartition, error, 
groupsToUnmap, groupsToRetry);

Review comment:
   Marked KAFKA-13064 as blocker, and add comments:
   `// In responseData V0-V7, there's no group level error, we have to handle 
partition errors here`
   Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org