satishd commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream 
remoteLogInputStream, long offse
         return nextBatch;
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog 
log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, 
UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = 
log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, 
log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since 
we are traversing the leader-epoch-cache

Review Comment:
   It may not always be the same if truncation occurs in leader-epoch-cache 
because of log truncation for some reason after the latestEntry is accessed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to