skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664689037



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() {
 
         @Override
         public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-            if (response.hasError()) {
-                Errors error = response.error();
-                log.debug("Offset fetch failed: {}", error.message());
+            Errors responseError = response.groupLevelError(rebalanceConfig.groupId);
+            if (responseError != Errors.NONE) {
+                log.debug("Offset fetch failed: {}", responseError.message());
 
-                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
-                    future.raise(error);
-                } else if (error == Errors.NOT_COORDINATOR) {
+                    future.raise(responseError);
+                } else if (responseError == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    markCoordinatorUnknown(error);
-                    future.raise(error);
-                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    markCoordinatorUnknown(responseError);
+                    future.raise(responseError);
+                } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else {
-                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
+                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
                 }
                 return;
             }
 
             Set<String> unauthorizedTopics = null;
-            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =

Review comment:
       What should we rename this to? Technically this is the `responseData`, 
since on the next line we extract the actual `offsets` map, which is a 
`Map<TopicPartition, OffsetAndMetadata>`.
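
To make the naming concrete, here is a rough sketch of the two-step shape I have in mind: a raw `responseData` map first, then the derived `offsets` map. The class `OffsetFetchNamingSketch`, the helper `toOffsets`, and the three record types are hypothetical stand-ins so the snippet compiles on its own; they are not the real Kafka classes, and the real handler also deals with per-partition errors and missing offsets, which this leaves out.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetFetchNamingSketch {
    // Hypothetical stand-ins for Kafka's TopicPartition,
    // OffsetFetchResponse.PartitionData and OffsetAndMetadata.
    record TopicPartition(String topic, int partition) {}
    record PartitionData(long offset, String metadata) {}
    record OffsetAndMetadata(long offset, String metadata) {}

    // Converts the raw per-partition response data into the offsets map.
    // Error handling is intentionally omitted in this sketch.
    static Map<TopicPartition, OffsetAndMetadata> toOffsets(
            Map<TopicPartition, PartitionData> responseData) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
        for (Map.Entry<TopicPartition, PartitionData> entry : responseData.entrySet()) {
            PartitionData partitionData = entry.getValue();
            offsets.put(entry.getKey(),
                new OffsetAndMetadata(partitionData.offset(), partitionData.metadata()));
        }
        return offsets;
    }

    public static void main(String[] args) {
        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
        responseData.put(new TopicPartition("orders", 0), new PartitionData(42L, ""));

        Map<TopicPartition, OffsetAndMetadata> offsets = toOffsets(responseData);
        System.out.println(offsets.get(new TopicPartition("orders", 0)).offset());
    }
}
```

With the two names kept distinct like this, `responseData` always means what came back from the broker and `offsets` always means what we hand to the future.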




