abbccdda commented on a change in pull request #8822: URL: https://github.com/apache/kafka/pull/8822#discussion_r436318626
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -852,14 +835,28 @@ public void onFailure(RuntimeException e) { subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs); metadata.requestUpdate(); - setFatalOffsetForLeaderException(e); + if (!(e instanceof RetriableException)) { Review comment: Why do we move this check out of `setFatalOffsetForLeaderException`? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -1325,23 +1323,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc return completedFetch; } - private void handleOffsetOutOfRange(FetchPosition fetchPosition, - TopicPartition topicPartition, - String reason) { + private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) { + String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition; if (subscriptions.hasDefaultOffsetResetPolicy()) { - log.info("Fetch offset epoch {} is out of range for partition {}, resetting offset", - fetchPosition, topicPartition); + log.info("{}, resetting offset", errorMessage); subscriptions.requestOffsetReset(topicPartition); } else { - Map<TopicPartition, Long> offsetOutOfRangePartitions = - Collections.singletonMap(topicPartition, fetchPosition.offset); - String errorMessage = String.format("Offsets out of range " + - "with no configured reset policy for partitions: %s" + - ", for fetch offset: %d, " + - "root cause: %s", - offsetOutOfRangePartitions, fetchPosition.offset, reason); - log.info(errorMessage); - throw new OffsetOutOfRangeException(errorMessage, offsetOutOfRangePartitions); + log.info("{}, raising error to the application", errorMessage); + throw new OffsetOutOfRangeException(errorMessage, Review comment: Should still mention `no configured reset policy` here IMHO. 
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch assertEquals(initialOffset, subscriptions.position(tp0).offset); if (offsetResetStrategy == OffsetResetStrategy.NONE) { - OffsetOutOfRangeException thrown = - assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded()); + LogTruncationException thrown = + assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded()); + assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue()); - // If epoch offset is valid, we are testing for the log truncation case. - if (!epochEndOffset.hasUndefinedEpochOrOffset()) { - assertTrue(thrown instanceof LogTruncationException); + if (epochEndOffset.hasUndefinedEpochOrOffset()) { + assertFalse(thrown.divergentOffsets().containsKey(tp0)); + } else { + OffsetAndMetadata expectedDivergentOffset = new OffsetAndMetadata( + epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()), ""); + assertEquals(expectedDivergentOffset, thrown.divergentOffsets().get(tp0)); Review comment: Similar here, compare against an actual map to ensure no unexpected topic partitions exist. 
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch assertEquals(initialOffset, subscriptions.position(tp0).offset); if (offsetResetStrategy == OffsetResetStrategy.NONE) { - OffsetOutOfRangeException thrown = - assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded()); + LogTruncationException thrown = + assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded()); + assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue()); Review comment: We could just build the actual singleton mapping to compare with `thrown.offsetOutOfRangePartitions()` ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ########## @@ -1050,4 +1062,43 @@ public String toString() { '}'; } } + + public static class LogTruncation { + public final TopicPartition topicPartition; + public final FetchPosition fetchPosition; + public final Optional<OffsetAndMetadata> divergentOffsetOpt; + + public LogTruncation(TopicPartition topicPartition, + FetchPosition fetchPosition, + Optional<OffsetAndMetadata> divergentOffsetOpt) { + this.topicPartition = topicPartition; + this.fetchPosition = fetchPosition; Review comment: Should require non-null for `fetchPosition` ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ########## @@ -572,7 +573,7 @@ public void testMaybeCompleteValidationAfterPositionChange() { Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch))); state.seekUnvalidated(tp0, updatePosition); - Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + Optional<LogTruncation> divergentOffsetMetadataOpt = 
state.maybeCompleteValidation(tp0, initialPosition, Review comment: divergentOffsetMetadataOpt -> logTruncationOpt, same for the rest. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ########## @@ -480,6 +480,17 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap log.debug("Skipping completed validation for partition {} since the current position {} " + "no longer matches the position {} when the request was sent", tp, currentPosition, requestPosition); + } else if (epochEndOffset.hasUndefinedEpochOrOffset()) { Review comment: I was wondering where the best place to put the check is, since previously it was before `maybeCompleteValidation`. If the partition is not awaiting validation or the returned result doesn't match our current position, should we still trigger the undefined epoch offset logic here? ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch assertEquals(initialOffset, subscriptions.position(tp0).offset); if (offsetResetStrategy == OffsetResetStrategy.NONE) { - OffsetOutOfRangeException thrown = - assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded()); + LogTruncationException thrown = + assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded()); + assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue()); - // If epoch offset is valid, we are testing for the log truncation case. - if (!epochEndOffset.hasUndefinedEpochOrOffset()) { - assertTrue(thrown instanceof LogTruncationException); + if (epochEndOffset.hasUndefinedEpochOrOffset()) { + assertFalse(thrown.divergentOffsets().containsKey(tp0)); Review comment: Should we just assert `thrown.divergentOffsets().isEmpty()`? 
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -1325,23 +1323,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc return completedFetch; } - private void handleOffsetOutOfRange(FetchPosition fetchPosition, - TopicPartition topicPartition, - String reason) { + private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) { Review comment: I feel we could still share `handleOffsetOutOfRange` in two places by letting it return a struct of `Optional<LogTruncation>` and letting the caller decide when to throw it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org