abbccdda commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436318626



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -852,14 +835,28 @@ public void onFailure(RuntimeException e) {
                     subscriptions.requestFailed(fetchPositions.keySet(), 
time.milliseconds() + retryBackoffMs);
                     metadata.requestUpdate();
 
-                    setFatalOffsetForLeaderException(e);
+                    if (!(e instanceof RetriableException)) {

Review comment:
       Why do we move this check out of `setFatalOffsetForLeaderException`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1325,23 +1323,15 @@ private CompletedFetch 
initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
-                                        TopicPartition topicPartition,
-                                        String reason) {
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition, 
TopicPartition topicPartition) {
+        String errorMessage = "Fetch position " + fetchPosition + " is out of 
range for partition " + topicPartition;
         if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("Fetch offset epoch {} is out of range for partition {}, 
resetting offset",
-                fetchPosition, topicPartition);
+            log.info("{}, resetting offset", errorMessage);
             subscriptions.requestOffsetReset(topicPartition);
         } else {
-            Map<TopicPartition, Long> offsetOutOfRangePartitions =
-                Collections.singletonMap(topicPartition, fetchPosition.offset);
-            String errorMessage = String.format("Offsets out of range " +
-                "with no configured reset policy for partitions: %s" +
-                ", for fetch offset: %d, " +
-                "root cause: %s",
-                offsetOutOfRangePartitions, fetchPosition.offset, reason);
-            log.info(errorMessage);
-            throw new OffsetOutOfRangeException(errorMessage, 
offsetOutOfRangePartitions);
+            log.info("{}, raising error to the application", errorMessage);
+            throw new OffsetOutOfRangeException(errorMessage,

Review comment:
       Should still mention `no configured reset policy` here IMHO.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void 
testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> 
fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> 
fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, 
thrown.offsetOutOfRangePartitions().get(tp0).longValue());
 
-            // If epoch offset is valid, we are testing for the log truncation 
case.
-            if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
-                assertTrue(thrown instanceof LogTruncationException);
+            if (epochEndOffset.hasUndefinedEpochOrOffset()) {
+                assertFalse(thrown.divergentOffsets().containsKey(tp0));
+            } else {
+                OffsetAndMetadata expectedDivergentOffset = new 
OffsetAndMetadata(
+                    epochEndOffset.endOffset(), 
Optional.of(epochEndOffset.leaderEpoch()), "");
+                assertEquals(expectedDivergentOffset, 
thrown.divergentOffsets().get(tp0));

Review comment:
       Similarly here: compare against the complete expected map rather than a 
single entry, to ensure no unexpected topic partitions exist.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void 
testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> 
fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> 
fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, 
thrown.offsetOutOfRangePartitions().get(tp0).longValue());

Review comment:
       We could just build the expected singleton map and compare it with 
`thrown.offsetOutOfRangePartitions()`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -1050,4 +1062,43 @@ public String toString() {
                     '}';
         }
     }
+
+    public static class LogTruncation {
+        public final TopicPartition topicPartition;
+        public final FetchPosition fetchPosition;
+        public final Optional<OffsetAndMetadata> divergentOffsetOpt;
+
+        public LogTruncation(TopicPartition topicPartition,
+                             FetchPosition fetchPosition,
+                             Optional<OffsetAndMetadata> divergentOffsetOpt) {
+            this.topicPartition = topicPartition;
+            this.fetchPosition = fetchPosition;

Review comment:
       We should require `fetchPosition` to be non-null.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
##########
@@ -572,7 +573,7 @@ public void 
testMaybeCompleteValidationAfterPositionChange() {
                 Optional.of(updateOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, updatePosition);
 
-        Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = 
state.maybeCompleteValidation(tp0, initialPosition,
+        Optional<LogTruncation> divergentOffsetMetadataOpt = 
state.maybeCompleteValidation(tp0, initialPosition,

Review comment:
       Rename `divergentOffsetMetadataOpt` to `logTruncationOpt`; same for the 
other occurrences.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -480,6 +480,17 @@ public synchronized boolean 
maybeValidatePositionForCurrentLeader(ApiVersions ap
                 log.debug("Skipping completed validation for partition {} 
since the current position {} " +
                                 "no longer matches the position {} when the 
request was sent",
                         tp, currentPosition, requestPosition);
+            } else if (epochEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
       I was wondering where the best place to put this check is, since 
previously it came before `maybeCompleteValidation`. If the partition is not 
awaiting validation, or the returned result doesn't match our current position, 
should we still trigger the undefined epoch/offset logic here?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void 
testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> 
fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> 
fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, 
thrown.offsetOutOfRangePartitions().get(tp0).longValue());
 
-            // If epoch offset is valid, we are testing for the log truncation 
case.
-            if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
-                assertTrue(thrown instanceof LogTruncationException);
+            if (epochEndOffset.hasUndefinedEpochOrOffset()) {
+                assertFalse(thrown.divergentOffsets().containsKey(tp0));

Review comment:
       Should we just assert `thrown.divergentOffsets().isEmpty()`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1325,23 +1323,15 @@ private CompletedFetch 
initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
-                                        TopicPartition topicPartition,
-                                        String reason) {
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition, 
TopicPartition topicPartition) {

Review comment:
       I feel we could still share `handleOffsetOutOfRange` between the two 
places by having it return an `Optional<LogTruncation>` and letting the caller 
decide when to throw.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to