lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1320338879
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java: ########## @@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions( log.error("Discarding error in ListOffsetResponse because another error is pending", error); } + + void onSuccessfulResponseForValidatingPositions( + final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions, + final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) { + List<SubscriptionState.LogTruncation> truncations = new ArrayList<>(); + if (!offsetsResult.partitionsToRetry().isEmpty()) { + subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(), + time.milliseconds() + retryBackoffMs); + metadata.requestUpdate(false); + } + + // For each OffsetsForLeader response, check if the end-offset is lower than our current offset + // for the partition. If so, it means we have experienced log truncation and need to reposition + // that partition's offset. + // In addition, check whether the returned offset and epoch are valid. If not, then we should reset + // its offset if reset policy is configured, or throw out of range exception. + offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> { + SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition); + Optional<SubscriptionState.LogTruncation> truncationOpt = + subscriptionState.maybeCompleteValidation(topicPartition, requestPosition, + respEndOffset); + truncationOpt.ifPresent(truncations::add); + }); + + if (!truncations.isEmpty()) { + maybeSetOffsetForLeaderException(buildLogTruncationException(truncations)); + } + } + + void onFailedResponseForValidatingPositions(final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions, + final RuntimeException error) { + subscriptionState.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs); + metadata.requestUpdate(false); + + if (!(error instanceof RetriableException)) { + maybeSetOffsetForLeaderException(error); + } + } + + private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) { + Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>(); + Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>(); + for (SubscriptionState.LogTruncation truncation : truncations) { + truncation.divergentOffsetOpt.ifPresent(divergentOffset -> + divergentOffsets.put(truncation.topicPartition, divergentOffset)); + truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset); + } + return new LogTruncationException("Detected truncated partitions: " + truncations, Review Comment: Sure, done. I just added a custom message internally to the LogTruncationException that will end up in the toString with the base implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org