lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1320309330
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }
+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+        // for the partition. If so, it means we have experienced log truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));

Review Comment:
Agreed. This code handles exceptions from validating positions, so I renamed the variables and functions accordingly. I did the same for the reset path to keep the two consistent, and added comments clarifying what could lead to each exception.

-- 
This is an automated message from the Apache Git Service.