philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1139257566
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) { this.timer.update(currentTimeMs); } } + + private class FetchCommittedOffsetResponseHandler { + private final UnsentOffsetFetchRequestState request; + + private FetchCommittedOffsetResponseHandler(final UnsentOffsetFetchRequestState request) { + this.request = request; + } + + public void onResponse( + final long currentTimeMs, + final OffsetFetchResponse response) { + Errors responseError = response.groupLevelError(groupState.groupId); + if (responseError != Errors.NONE) { + onFailure(currentTimeMs, responseError); + return; + } + + onSuccess(currentTimeMs, response); + } + private void onFailure(final long currentTimeMs, + final Errors responseError) { + log.debug("Offset fetch failed: {}", responseError.message()); + + if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { + retry(currentTimeMs); + } else if (responseError == Errors.NOT_COORDINATOR) { + // re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); + retry(currentTimeMs); + } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { + // TODO: I'm not sure if we should retry here. 
Sounds like we should propagate the error to let the + // user to fix the permission + request.future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); + } else { + request.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); + } + return; + } + + private void retry(final long currentTimeMs) { + this.request.onFailedAttempt(currentTimeMs); + unsentOffsetFetchRequests.enqueue(this.request); + } + + private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { + Set<String> unauthorizedTopics = null; + Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = + response.partitionDataMap(groupState.groupId); + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size()); + Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>(); + for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) { + TopicPartition tp = entry.getKey(); + OffsetFetchResponse.PartitionData partitionData = entry.getValue(); + if (partitionData.hasError()) { + Errors error = partitionData.error; + log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + + if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + request.future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not " + + "exist")); + return; + } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { + if (unauthorizedTopics == null) { + unauthorizedTopics = new HashSet<>(); + } + unauthorizedTopics.add(tp.topic()); + } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) { + System.out.println("asdkljlsadjflksajfdlk"); + unstableTxnOffsetTopicPartitions.add(tp); + } else { + request.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset " + + "response for partition " + tp + ": " + error.message())); + return; + } + } else if (partitionData.offset 
>= 0) { + // record the position with the offset (-1 indicates no committed offset to fetch); + // if there's no committed offset, record as null + offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata)); + } else { + log.info("Found no committed offset for partition {}", tp); + offsets.put(tp, null); + } + } + + if (unauthorizedTopics != null) { + request.future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); + } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) { + System.out.println("oshdofjakl"); Review Comment: Oh god, my secret debug weapon has been accidentally revealed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org