kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268399339


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##########
@@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
     void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
                                                    Long leaderEpochEndOffset) {
+        // If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+        // Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+        // segment for the same leader-epoch which is a super-set of previously uploaded segments.
+        // (eg)
+        // case-1: Duplicate segment
+        //      L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+        //      We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+        // case-2: Overlapping segments
+        //      L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200,
+        //      and so on. When the consumer requests the segment with offset 95, it should get the segment S1 and not S0.
+        Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+        while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   @satishd 
   The logic to find the `copiedOffset` from remoteStorage doesn't take into account the current leader-epoch checkpoint file. In the above test, when B2 becomes leader, its leader-epoch-checkpoint file will look like:
   
   (The case mentioned in the test can happen when acks is set to 1)
   
   ```
   0
   2
   0 0
   1 151
   ```
   
   In B2's checkpoint above, the first two lines are the version and the entry count, and each remaining line is an `epoch startOffset` pair: epoch 0 starts at offset 0 and epoch 1 starts at offset 151.
   
   The logic to find the copied offset traverses the checkpoint file from the latest epoch. So, when B2 tries to find the copied offset:
   
   For epoch(1), there won't be any uploaded segments, so it returns empty.
   For epoch(0), the highest copied offset will be 200.
   
   So, B2 will skip the segment S2 (101-190), which means there is data loss from [151-190].
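   
   For illustration, here is a minimal sketch of that lookup with the values from this scenario (the class, map, and method names are hypothetical stand-ins, not the actual RemoteLogManager code):
   
   ```
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   
   class CopiedOffsetSketch {
       // Hypothetical stand-in for "highest offset copied to remote storage per epoch":
       // epoch 1 has no uploaded segments yet, and epoch 0 was copied up to offset 200
       // by the previous leader.
       static final Map<Integer, Long> HIGHEST_REMOTE_OFFSET_FOR_EPOCH = Map.of(0, 200L);
   
       // Current behaviour as described above: walk the epoch lineage from the latest
       // epoch downwards and return the first highest remote offset found, without
       // consulting the checkpoint's epoch boundaries.
       static Optional<Long> findCopiedOffset(List<Integer> epochsFromLatest) {
           for (int epoch : epochsFromLatest) {
               Long offset = HIGHEST_REMOTE_OFFSET_FOR_EPOCH.get(epoch);
               if (offset != null) {
                   return Optional.of(offset);
               }
           }
           return Optional.empty();
       }
   
       public static void main(String[] args) {
           // B2's lineage from its checkpoint: epoch 1 (starting at 151), then epoch 0.
           System.out.println(findCopiedOffset(List.of(1, 0))); // Optional[200]
           // With a copied offset of 200, B2 never uploads S2 (101-190): loss of [151-190].
       }
   }
   ```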
   
   @Nickstery This can be fixed if we update the logic to find the `copiedOffset`:
   ```
   find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
   ```
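   
   A sketch of that adjustment, again with hypothetical names (the real inputs would come from the leader-epoch checkpoint and the remote log metadata respectively):
   
   ```
   import java.util.OptionalLong;
   
   class CopiedOffsetFixSketch {
       // Proposed fix: cap the highest remote offset reported for an epoch by that epoch's
       // end offset in the current leader's checkpoint, so segments uploaded under a stale,
       // longer lineage are not treated as already copied.
       static OptionalLong findHighestRemoteOffset(OptionalLong endOffsetForEpochInCheckpoint,
                                                   OptionalLong highestRemoteOffsetForEpoch) {
           if (endOffsetForEpochInCheckpoint.isEmpty() || highestRemoteOffsetForEpoch.isEmpty()) {
               return OptionalLong.empty();
           }
           return OptionalLong.of(Math.min(endOffsetForEpochInCheckpoint.getAsLong(),
                                           highestRemoteOffsetForEpoch.getAsLong()));
       }
   
       public static void main(String[] args) {
           // In this scenario, epoch 0 ends at offset 151 in B2's checkpoint (where epoch 1
           // begins) while remote storage reports offset 200 for epoch 0.
           OptionalLong capped = findHighestRemoteOffset(OptionalLong.of(151L), OptionalLong.of(200L));
           System.out.println(capped); // OptionalLong[151] instead of 200, so S2 is not skipped
       }
   }
   ```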


