satishd commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache

Review Comment:
   `epoch` and `requestedEpoch` may not always be the same: the
   leader-epoch-cache can itself be truncated when the log is truncated.
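
To illustrate the concern, here is a minimal sketch using a hypothetical, simplified stand-in for the cache (`EpochCacheSketch`, a `TreeMap` from epoch to start offset). It is not Kafka's `LeaderEpochFileCache`; the class, its methods, and the `-1` "undefined" marker are assumptions made for illustration only. It shows one way the epoch returned by the lookup could differ from the epoch that was read earlier, if entries are dropped in between.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a leader-epoch cache: epoch -> start offset.
// NOT Kafka's LeaderEpochFileCache; names and behaviour are assumptions.
final class EpochCacheSketch {
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_OFFSET = -1L;

    private final TreeMap<Integer, Long> startOffsets = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        startOffsets.put(epoch, startOffset);
    }

    // Drops every epoch that starts at or after endOffset,
    // modelling the cache being truncated along with the log.
    void truncateFromEnd(long endOffset) {
        startOffsets.values().removeIf(start -> start >= endOffset);
    }

    // Returns (foundEpoch, endOffset); foundEpoch may differ from requestedEpoch.
    Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffset) {
        if (startOffsets.isEmpty()) {
            return entry(UNDEFINED_EPOCH, UNDEFINED_OFFSET);
        }
        if (startOffsets.lastKey() == requestedEpoch) {
            // The latest epoch ends at the log end offset.
            return entry(requestedEpoch, logEndOffset);
        }
        Map.Entry<Integer, Long> higher = startOffsets.higherEntry(requestedEpoch);
        if (higher == null) {
            // The requested epoch is newer than anything the cache still knows.
            return entry(UNDEFINED_EPOCH, UNDEFINED_OFFSET);
        }
        Map.Entry<Integer, Long> floor = startOffsets.floorEntry(requestedEpoch);
        int foundEpoch = (floor == null) ? requestedEpoch : floor.getKey();
        // An epoch ends where the next higher epoch starts.
        return entry(foundEpoch, higher.getValue());
    }

    private static Map.Entry<Integer, Long> entry(int epoch, long offset) {
        return new AbstractMap.SimpleImmutableEntry<>(epoch, offset);
    }
}
```

In this model, if a caller reads epoch 5 from the cache and the cache is then truncated below that epoch's start offset, `endOffsetFor(5, ...)` no longer returns epoch 5, so an explicit comparison of the two epochs would catch the mismatch rather than assuming equality.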



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache
+                    // in descending order.
+                    if (endOffset <= highestRemoteOffset) {
+                        LOGGER.warn("The end-offset for epoch {}: ({}, {}) is less than or equal to the " +

Review Comment:
   I don't think this should be logged at WARN level, since unclean leader
   election can happen depending on the topic configuration. INFO level is
   enough here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.