kamalcph commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1397045499
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -625,9 +626,10 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader // epoch cache then it starts copying the segments from the earliest epoch entry's offset. - copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log)); + copiedOffsetOption = Optional.of(findHighestRemoteOffset(topicIdPartition, log)); logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, " + "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch); + copiedOffsetOption.ifPresent(epochAndOffset -> log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset())); Review Comment: After broker restart, if there are no more segments to upload, then the `copiedOffset` might be stale. It's good to update the `highestOffsetInRemoteStorage` in the UnifiedLog once we compute it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org