Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1266864272


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##########
@@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
     void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
                                                    Long leaderEpochEndOffset) {
+        // If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+        // Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+        // segment for the same leader-epoch which is a super-set of previously uploaded segments.
+        // (eg)
+        // case-1: Duplicate segment
+        //      L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+        //      We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+        // case-2: Overlapping segments
+        //      L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200,
+        //      and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0.
+        Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+        while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Hi, I have two questions here:
   1) In the while loop we check `highestLogOffset <= leaderEpochEndOffset`, but neither value changes inside the loop. Do we really need it inside the while, or can we wrap the while in that if-check?
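   To illustrate the point: since neither operand of that comparison is modified by the loop body, the condition is loop-invariant and evaluating it once outside the loop is equivalent. A minimal sketch, using a simplified stand-in for `offsetToId` (plain `String` segment ids instead of `RemoteLogSegmentId`, and a hypothetical `markUnreferenced` helper, not the real RLMM code):

```java
import java.util.Map;
import java.util.TreeMap;

public class HoistedCheckSketch {
    // Simplified stand-in for offsetToId: segment start offset -> segment id.
    // Returns the entries that would be marked unreferenced, removing them
    // from the map, mirroring the shape of the PR's loop.
    static TreeMap<Long, String> markUnreferenced(TreeMap<Long, String> offsetToId,
                                                  long startOffset,
                                                  long highestLogOffset,
                                                  long leaderEpochEndOffset) {
        TreeMap<Long, String> unreferenced = new TreeMap<>();
        // The loop-invariant check is hoisted out of the while loop: neither
        // operand changes in the body, so checking it once is equivalent.
        if (highestLogOffset <= leaderEpochEndOffset) {
            Map.Entry<Long, String> lastEntry = offsetToId.lastEntry();
            while (lastEntry != null && lastEntry.getKey() >= startOffset) {
                unreferenced.put(lastEntry.getKey(), lastEntry.getValue());
                offsetToId.remove(lastEntry.getKey());
                lastEntry = offsetToId.lastEntry();
            }
        }
        return unreferenced;
    }

    public static void main(String[] args) {
        // case-1 from the comment above: old leader's S0 (0-100) is superseded
        // by a new upload starting at offset 0.
        TreeMap<Long, String> offsetToId = new TreeMap<>();
        offsetToId.put(0L, "S0");
        TreeMap<Long, String> dropped = markUnreferenced(offsetToId, 0L, 100L, 200L);
        System.out.println("unreferenced = " + dropped.values()); // prints "unreferenced = [S0]"
    }
}
```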
   2) Why do we care about `highestLogOffset`? 
   Input: we have 1 topic with RF=2 and min.insync.replicas=1, and two brokers in the cluster.
   Imagine a situation where Broker-1 and Broker-2 each have a segment with 10 synced messages, so they both have the segment [startOffset=1, endOffset=11]. A new segment is rolled; Broker-1 gets 100 messages, Broker-2 replicates 80 of them and then goes offline [a non-tiered topic existed on the cluster and occupied 100% of storage]. Broker-1 sends its segment to remote storage, so at that point remote storage has 2 segments: [startOffset=1, endOffset=11] and [startOffset=12, endOffset=112]. Broker-1 then dies with the same data-disk issue, and Broker-2 is restored some time later, before Broker-1 revives. Since it had 80 messages and retention by segment size was not reached, the producer produces a few messages bigger than usual, and a segment is rolled with 99 messages [startOffset=12, endOffset=101].
   
   What does Kafka do in that case? When Broker-1 comes back up, it detects that a leader-epoch change took place, removes messages past the latest synced message, and replicates the missing messages from Broker-2. That means the previously unreplicated messages on Broker-1 are gone, but in tiered storage they will remain as a duplicate segment. Are we ok with that?
   
   If not, we should remove the check for `highestLogOffset <= leaderEpochEndOffset`, or extend the logic to also check the leaderEpoch number.
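   To make the suggestion concrete: an epoch-aware rule could treat an overlapping remote segment as superseded only when the newer upload carries a higher leader epoch, so a shorter segment from a newer leader (the Broker-2 scenario above) still wins over a longer one from the deposed leader. This is purely a hypothetical sketch; the `Segment` record and `supersedes` helper are illustrative names, not the PR's API:

```java
public class EpochAwareCheckSketch {
    // Illustrative model of a remote segment: offset range plus the leader
    // epoch it was written under (not the real RemoteLogSegmentMetadata type).
    record Segment(long startOffset, long endOffset, int leaderEpoch) {}

    // newSeg supersedes oldSeg only if their offset ranges overlap AND newSeg
    // was written under a strictly newer leader epoch. Comparing epochs rather
    // than end offsets lets a shorter segment from a newer leader win.
    static boolean supersedes(Segment newSeg, Segment oldSeg) {
        boolean overlaps = newSeg.startOffset() <= oldSeg.endOffset()
                && oldSeg.startOffset() <= newSeg.endOffset();
        return overlaps && newSeg.leaderEpoch() > oldSeg.leaderEpoch();
    }

    public static void main(String[] args) {
        // The scenario above: Broker-1 uploaded [12, 112] under epoch 1, then
        // Broker-2 (new leader) rolls [12, 101] under epoch 2.
        Segment fromOldLeader = new Segment(12, 112, 1);
        Segment fromNewLeader = new Segment(12, 101, 2);
        System.out.println(supersedes(fromNewLeader, fromOldLeader)); // prints "true"
    }
}
```

   With an end-offset-only rule, [12, 101] could never displace [12, 112]; the epoch comparison is what resolves the divergence.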
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
