lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1320309330


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another error is pending", error);
     }
 
+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+        // for the partition. If so, it means we have experienced log truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));

Review Comment:
   Agreed. This path handles exceptions raised while validating positions, so 
I renamed the variables and functions accordingly. I did the same for the 
reset path to keep the two consistent, and added comments clarifying what 
can lead to each exception.
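
For context, the truncation check described in the diff comments above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `TruncationCheckSketch` and `findTruncations` are hypothetical names, partitions are plain strings, and the epoch validation and reset-policy handling mentioned in the comments are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in; this is NOT Kafka's SubscriptionState.
class TruncationCheckSketch {

    // Returns the partitions whose broker-reported end offset is lower than
    // the position the consumer currently holds, i.e. the log was truncated.
    static List<String> findTruncations(Map<String, Long> positions,
                                        Map<String, Long> endOffsets) {
        List<String> truncated = new ArrayList<>();
        endOffsets.forEach((partition, endOffset) -> {
            Long position = positions.get(partition);
            // Skip partitions we hold no position for (assumption of this sketch).
            if (position != null && endOffset < position) {
                truncated.add(partition);
            }
        });
        return truncated;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("topic-0", 100L);
        positions.put("topic-1", 50L);

        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("topic-0", 80L);  // lower than our position: truncated
        endOffsets.put("topic-1", 50L);  // equal to our position: still valid

        System.out.println(findTruncations(positions, endOffsets));
    }
}
```

In the PR itself, the collected truncations are turned into a single exception via `buildLogTruncationException(truncations)`, rather than returned as a list.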



