satishd commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse return nextBatch; } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException { + OffsetAndEpoch offsetAndEpoch = null; + Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache(); + if (leaderEpochCacheOpt.isDefined()) { + LeaderEpochFileCache cache = leaderEpochCacheOpt.get(); + Optional<EpochEntry> maybeEpochEntry = cache.latestEntry(); + while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) { + int epoch = maybeEpochEntry.get().epoch; + Optional<Long> highestRemoteOffsetOpt = + remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch); + if (highestRemoteOffsetOpt.isPresent()) { + Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset()); + int requestedEpoch = entry.getKey(); + long endOffset = entry.getValue(); + long highestRemoteOffset = highestRemoteOffsetOpt.get(); + // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache Review Comment: It may not always be the same if truncation occurs in leader-epoch-cache because of log truncation for some reason. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse return nextBatch; } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException { + OffsetAndEpoch offsetAndEpoch = null; + Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache(); + if (leaderEpochCacheOpt.isDefined()) { + LeaderEpochFileCache cache = leaderEpochCacheOpt.get(); + Optional<EpochEntry> maybeEpochEntry = cache.latestEntry(); + while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) { + int epoch = maybeEpochEntry.get().epoch; + Optional<Long> highestRemoteOffsetOpt = + remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch); + if (highestRemoteOffsetOpt.isPresent()) { + Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset()); + int requestedEpoch = entry.getKey(); + long endOffset = entry.getValue(); + long highestRemoteOffset = highestRemoteOffsetOpt.get(); + // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache + // in descending order. + if (endOffset <= highestRemoteOffset) { + LOGGER.warn("The end-offset for epoch {}: ({}, {}) is less than or equal to the " + Review Comment: I do not think it is a warn message here as unclean leader election can happen based on the topic configuration. We can leave it as INFO level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org