satishd commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse return nextBatch; } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException { + OffsetAndEpoch offsetAndEpoch = null; + Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache(); + if (leaderEpochCacheOpt.isDefined()) { + LeaderEpochFileCache cache = leaderEpochCacheOpt.get(); + Optional<EpochEntry> maybeEpochEntry = cache.latestEntry(); + while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) { + int epoch = maybeEpochEntry.get().epoch; + Optional<Long> highestRemoteOffsetOpt = + remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch); + if (highestRemoteOffsetOpt.isPresent()) { + Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset()); + int requestedEpoch = entry.getKey(); + long endOffset = entry.getValue(); + long highestRemoteOffset = highestRemoteOffsetOpt.get(); + // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache Review Comment: `epoch` and `requestedEpoch` may not always be the same: if the leader-epoch-cache is truncated (for example, because of a log truncation) after `latestEntry` is accessed, the two values can differ. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org