kamalcph commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1397045499


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -625,9 +626,10 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
                 // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
                 // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
                 // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
-                copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+                copiedOffsetOption = Optional.of(findHighestRemoteOffset(topicIdPartition, log));
                 logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, " +
                                 "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
+                copiedOffsetOption.ifPresent(epochAndOffset -> log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset()));

Review Comment:
   After a broker restart, if there are no more segments to upload, the 
`copiedOffset` might be stale. It would be good to update 
`highestOffsetInRemoteStorage` in the `UnifiedLog` as soon as we compute it, 
rather than waiting for the next segment copy.
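
To illustrate the suggestion, here is a minimal, self-contained sketch of the pattern in the added line: publish the computed offset to the log immediately, so the log's view is current even if no further segment is ever copied. `FakeLog` and `EpochAndOffset` are hypothetical stand-ins written for this example, not the real Kafka classes, and the real `updateHighestOffsetInRemoteStorage` may apply additional checks that are not modeled here.

```java
import java.util.Optional;

class CopiedOffsetSketch {

    // Hypothetical stand-in for the (epoch, offset) pair that
    // findHighestRemoteOffset returns in the diff above.
    static final class EpochAndOffset {
        private final int epoch;
        private final long offset;

        EpochAndOffset(int epoch, long offset) {
            this.epoch = epoch;
            this.offset = offset;
        }

        int epoch() { return epoch; }
        long offset() { return offset; }
    }

    // Hypothetical stand-in for UnifiedLog. It only tracks the single field
    // discussed in the review; -1 means "nothing known in remote storage yet".
    static final class FakeLog {
        private long highestOffsetInRemoteStorage = -1L;

        void updateHighestOffsetInRemoteStorage(long offset) {
            // Assumption: a plain assignment is enough for this illustration.
            this.highestOffsetInRemoteStorage = offset;
        }

        long highestOffsetInRemoteStorage() {
            return highestOffsetInRemoteStorage;
        }
    }

    // Mirrors the shape of the changed lines: wrap the computed value in an
    // Optional and push its offset to the log right away.
    static Optional<EpochAndOffset> onBecomingLeader(FakeLog log, EpochAndOffset found) {
        Optional<EpochAndOffset> copiedOffsetOption = Optional.of(found);
        copiedOffsetOption.ifPresent(epochAndOffset ->
                log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset()));
        return copiedOffsetOption;
    }

    public static void main(String[] args) {
        FakeLog log = new FakeLog();
        // Simulates a restart: the log starts with no knowledge of remote data.
        onBecomingLeader(log, new EpochAndOffset(3, 1499L));
        System.out.println(log.highestOffsetInRemoteStorage());
    }
}
```

Without the `ifPresent` call, the stand-in log in this sketch would keep reporting `-1` after the restart until another segment was copied, which is the stale state the comment describes.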


