guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1137588681


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
             this.timer.update(currentTimeMs);
         }
     }
+
+    private class FetchCommittedOffsetResponseHandler {

Review Comment:
   I see that we retry only on `COORDINATOR_LOAD_IN_PROGRESS`,
`NOT_COORDINATOR`, and a non-empty `unstableTxnOffsetTopicPartitions`. Just
confirming with you: are these the only possible retriable errors for this
response (I saw in `OffsetFetchResponse` that there are others)? What about
`COORDINATOR_NOT_AVAILABLE`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
             this.timer.update(currentTimeMs);
         }
     }
+
+    private class FetchCommittedOffsetResponseHandler {
+        private final UnsentOffsetFetchRequestState request;
+
+        private FetchCommittedOffsetResponseHandler(final 
UnsentOffsetFetchRequestState request) {
+            this.request = request;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+
+            onSuccess(currentTimeMs, response);
+        }
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                // TODO: I'm not sure if we should retry here.  Sounds like we 
should propagate the error to let the
+                //  user to fix the permission
+                
request.future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                request.future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset response: " + 
responseError.message()));
+            }
+            return;
+        }
+
+        private void retry(final long currentTimeMs) {
+            this.request.onFailedAttempt(currentTimeMs);
+            unsentOffsetFetchRequests.enqueue(this.request);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        request.future.completeExceptionally(new 
KafkaException("Topic or Partition " + tp + " does not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        System.out.println("asdkljlsadjflksajfdlk");
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        request.future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                request.future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                System.out.println("oshdofjakl");
+                // just retry
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);

Review Comment:
   Unlike `COORDINATOR_LOAD_IN_PROGRESS`, which is a request-level global error
code, `UNSTABLE_OFFSET_COMMIT` is a partition-level error. Do we currently
retry all the partitions whenever any one of them has this error?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
             this.timer.update(currentTimeMs);
         }
     }
+
+    private class FetchCommittedOffsetResponseHandler {
+        private final UnsentOffsetFetchRequestState request;
+
+        private FetchCommittedOffsetResponseHandler(final 
UnsentOffsetFetchRequestState request) {
+            this.request = request;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = 
response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+
+            onSuccess(currentTimeMs, response);
+        }
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                // TODO: I'm not sure if we should retry here.  Sounds like we 
should propagate the error to let the
+                //  user to fix the permission
+                
request.future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                request.future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset response: " + 
responseError.message()));
+            }
+            return;
+        }
+
+        private void retry(final long currentTimeMs) {
+            this.request.onFailedAttempt(currentTimeMs);
+            unsentOffsetFetchRequests.enqueue(this.request);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        request.future.completeExceptionally(new 
KafkaException("Topic or Partition " + tp + " does not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        System.out.println("asdkljlsadjflksajfdlk");
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        request.future.completeExceptionally(new 
KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + 
error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no 
committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new 
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, 
partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                request.future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                System.out.println("oshdofjakl");

Review Comment:
   Remove the debug line? :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to