junrao commented on code in PR #13898:
URL: https://github.com/apache/kafka/pull/13898#discussion_r1245887623


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -117,11 +104,6 @@ else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
      * and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
      */
     public void resetPositionsIfNeeded() {
-        // Raise exception from previous offset fetch if there is one
-        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems a change in existing logic and not just a refactoring. Is this 
change expected?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() {
     }
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems a change in existing logic and not just a refactoring. Is this 
change expected?
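
For context, a minimal sketch of the raise-once pattern under discussion. The names `CachedErrorSketch`, `cacheError`, and `throwIfCached` are hypothetical stand-ins, not the actual Kafka classes. Because `getAndSet(null)` clears the cached exception as it reads it, whichever method performs the check is the one that surfaces the error, so moving the check may change which caller sees it:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a component that caches an error from an
// earlier asynchronous request and raises it on a later call.
class CachedErrorSketch {
    private final AtomicReference<RuntimeException> cached = new AtomicReference<>();

    // Called when a background request fails.
    void cacheError(RuntimeException e) {
        cached.set(e);
    }

    // Atomically reads and clears the cached error; throws it if present.
    // A second call without a new error is a no-op.
    void throwIfCached() {
        RuntimeException exception = cached.getAndSet(null);
        if (exception != null)
            throw exception;
    }

    public static void main(String[] args) {
        CachedErrorSketch sketch = new CachedErrorSketch();
        sketch.cacheError(new IllegalStateException("previous fetch failed"));
        try {
            sketch.throwIfCached();
        } catch (IllegalStateException e) {
            System.out.println("first check raised: " + e.getMessage());
        }
        // The cache was cleared by the first check, so this does not throw.
        sketch.throwIfCached();
        System.out.println("second check raised nothing");
    }
}
```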



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -261,6 +276,73 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
         }
     }
 
+    OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {

Review Comment:
   Should this be static? Ditto for a few other methods moved into this class.
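
As an illustration of the suggestion, a self-contained sketch of what a static version might look like. It assumes the method only maps the two sentinel timestamps and reads no instance state. The enum and constants below are local stand-ins for Kafka's `OffsetResetStrategy` and `ListOffsetsRequest` values, and the method body is an assumption rather than the code in the PR:

```java
// Hypothetical stand-ins; not the actual Kafka classes.
class ResetStrategySketch {
    enum OffsetResetStrategy { EARLIEST, LATEST }

    // Stand-ins for ListOffsetsRequest.EARLIEST_TIMESTAMP / LATEST_TIMESTAMP.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // No instance fields are read, so the helper can be declared static.
    static OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
        if (timestamp == EARLIEST_TIMESTAMP)
            return OffsetResetStrategy.EARLIEST;
        else if (timestamp == LATEST_TIMESTAMP)
            return OffsetResetStrategy.LATEST;
        else
            return null; // any other timestamp is not a reset sentinel
    }

    public static void main(String[] args) {
        System.out.println(timestampToOffsetResetStrategy(EARLIEST_TIMESTAMP));
        System.out.println(timestampToOffsetResetStrategy(LATEST_TIMESTAMP));
        System.out.println(timestampToOffsetResetStrategy(1234L));
    }
}
```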



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
