junrao commented on code in PR #13898: URL: https://github.com/apache/kafka/pull/13898#discussion_r1245887623
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -117,11 +104,6 @@ else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
      * and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
      */
     public void resetPositionsIfNeeded() {
-        // Raise exception from previous offset fetch if there is one
-        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems to be a change in existing logic and not just a refactoring. Is this change expected?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() {
     }
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems to be a change in existing logic and not just a refactoring. Is this change expected?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -261,6 +276,73 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
         }
     }
 
+    OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {

Review Comment:
   Should this be static? Ditto for a few other methods moved into this class.
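
For context on the first two comments, here is a minimal sketch of the read-and-clear pattern that the moved lines rely on. It is not the Kafka code: the class and method names (`CachedExceptionSketch`, `recordFailure`, `raiseCachedExceptionIfAny`) are hypothetical stand-ins, and only `AtomicReference.getAndSet` is a real API.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in class; not Kafka's OffsetFetcher or OffsetFetcherUtils.
class CachedExceptionSketch {
    // Holds a failure from an earlier asynchronous request until a caller picks it up.
    private final AtomicReference<RuntimeException> cachedException = new AtomicReference<>();

    // Assumed to be called by an async response handler when a request fails.
    void recordFailure(RuntimeException e) {
        cachedException.set(e);
    }

    // Atomically reads and clears the cached exception, then rethrows it if one was present.
    void raiseCachedExceptionIfAny() {
        RuntimeException exception = cachedException.getAndSet(null);
        if (exception != null)
            throw exception;
    }
}
```

Because `getAndSet(null)` clears the slot, the exception surfaces only at the first call site that performs the check. Moving the check from one method to another can therefore change which caller sees the failure and when, which may be the behavior change the comments are asking about.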