lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1320338879


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
             log.error("Discarding error in ListOffsetResponse because another 
error is pending", error);
     }
 
+
+    void onSuccessfulResponseForValidatingPositions(
+            final Map<TopicPartition, SubscriptionState.FetchPosition> 
fetchPositions,
+            final OffsetsForLeaderEpochUtils.OffsetForEpochResult 
offsetsResult) {
+        List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
+        if (!offsetsResult.partitionsToRetry().isEmpty()) {
+            
subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
+                    time.milliseconds() + retryBackoffMs);
+            metadata.requestUpdate(false);
+        }
+
+        // For each OffsetsForLeader response, check if the end-offset is 
lower than our current offset
+        // for the partition. If so, it means we have experienced log 
truncation and need to reposition
+        // that partition's offset.
+        // In addition, check whether the returned offset and epoch are valid. 
If not, then we should reset
+        // its offset if reset policy is configured, or throw out of range 
exception.
+        offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
+            SubscriptionState.FetchPosition requestPosition = 
fetchPositions.get(topicPartition);
+            Optional<SubscriptionState.LogTruncation> truncationOpt =
+                    subscriptionState.maybeCompleteValidation(topicPartition, 
requestPosition,
+                            respEndOffset);
+            truncationOpt.ifPresent(truncations::add);
+        });
+
+        if (!truncations.isEmpty()) {
+            
maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
+        }
+    }
+
+    void onFailedResponseForValidatingPositions(final Map<TopicPartition, 
SubscriptionState.FetchPosition> fetchPositions,
+                                                final RuntimeException error) {
+        subscriptionState.requestFailed(fetchPositions.keySet(), 
time.milliseconds() + retryBackoffMs);
+        metadata.requestUpdate(false);
+
+        if (!(error instanceof RetriableException)) {
+            maybeSetOffsetForLeaderException(error);
+        }
+    }
+
+    private LogTruncationException 
buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
+        Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new 
HashMap<>();
+        Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
+        for (SubscriptionState.LogTruncation truncation : truncations) {
+            truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
+                    divergentOffsets.put(truncation.topicPartition, 
divergentOffset));
+            truncatedFetchOffsets.put(truncation.topicPartition, 
truncation.fetchPosition.offset);
+        }
+        return new LogTruncationException("Detected truncated partitions: " + 
truncations,

Review Comment:
   Sure, done. I just added a custom message internally to the 
LogTruncationException that will end up in the toString with the base 
implementation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to