kamalcph commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1588784969


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                            "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.

Review Comment:
   Yes, you're right. `findHighestRemoteOffset` will return the `copiedOffset` for the new leader.

   The case mentioned can happen because each broker rolls its log segments independently of the other brokers in the cluster, so segment boundaries can differ between replicas.

   Broker-0 can have local-log segments like:
      segment0 = 0-50
      segment1 = 51-100
      active-segment = 101-120

   and it uploaded both segment0 and segment1 to remote storage.

   Broker-1 can have local-log segments like:
      segment0 = 0-80
      active-segment = 81-120

   When Broker-1 becomes the leader, it finds the highest remote offset to be 100, so it has to upload its current active segment (81-120) once that segment gets rotated. That segment overlaps the already-uploaded segment1 (51-100) on offsets 81-100. Thereby, we will have overlapping remote log segments.
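
   The scenario above can be sketched in a few lines of Java. This is only an illustration of the offset arithmetic, not code from `RemoteLogManager`: the `OverlapSketch` class, its `Segment` type, and the helper methods are all hypothetical, and the rule used to pick upload candidates (any local segment whose end offset is above the copied offset) is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverlapSketch {
    /** Inclusive offset range of a log segment (hypothetical helper type). */
    static final class Segment {
        final long start;
        final long end;
        Segment(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return start + "-" + end; }
    }

    /** Highest end offset among the remote segments, or -1 if there are none. */
    static long highestRemoteOffset(List<Segment> remote) {
        long highest = -1L;
        for (Segment s : remote) highest = Math.max(highest, s.end);
        return highest;
    }

    /** Assumption: a local segment is a candidate if it holds any offset above copiedOffset. */
    static List<Segment> uploadCandidates(List<Segment> local, long copiedOffset) {
        List<Segment> out = new ArrayList<>();
        for (Segment s : local) if (s.end > copiedOffset) out.add(s);
        return out;
    }

    /** Overlap between two inclusive ranges, or null when they are disjoint. */
    static Segment overlap(Segment a, Segment b) {
        long lo = Math.max(a.start, b.start);
        long hi = Math.min(a.end, b.end);
        return lo <= hi ? new Segment(lo, hi) : null;
    }

    public static void main(String[] args) {
        // Segments Broker-0 uploaded while it was the leader.
        List<Segment> remote = Arrays.asList(new Segment(0, 50), new Segment(51, 100));
        // Broker-1's local log, with the former active segment (81-120) now rotated.
        List<Segment> broker1Local = Arrays.asList(new Segment(0, 80), new Segment(81, 120));

        long copiedOffset = highestRemoteOffset(remote);
        for (Segment candidate : uploadCandidates(broker1Local, copiedOffset)) {
            for (Segment r : remote) {
                Segment o = overlap(candidate, r);
                if (o != null) {
                    System.out.println(candidate + " overlaps " + r + " at " + o);
                }
            }
        }
    }
}
```

   With the offsets from the example, the only candidate is 81-120, and it overlaps the remote segment 51-100 on offsets 81-100.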



