Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267954353
##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##########
@@ -100,17 +100,29 @@ void
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
void handleSegmentWithCopySegmentFinishedState(Long startOffset,
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+ // If there are duplicate segments uploaded due to leader-election,
then mark them as unreferenced.
+ // Duplicate segments can be uploaded when the previous leader had
tier-lags and the next leader uploads the
+ // segment for the same leader-epoch which is a super-set of
previously uploaded segments.
+ // (eg)
+ // case-1: Duplicate segment
+ // L0 uploaded segment S0 with offsets 0-100 and L1 uploaded
segment S1 with offsets 0-200.
+ // We will mark the segment S0 as duplicate and add it to
unreferencedSegmentIds.
+ // case-2: Overlapping segments
+ // L0 uploaded segment S0 with offsets 10-90 and L1 uploaded
segment S1 with offsets 5-100, S2-101-200,
+ // and so on. When the consumer request for segment with offset
95, it should get the segment S1 and not S0.
+ Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+ while (lastEntry != null && lastEntry.getKey() >= startOffset &&
highestLogOffset <= leaderEpochEndOffset) {
Review Comment:
Test example
```
@Test
void
handleSegmentWithCopySegmentFinishedStateForDuplicateSegmentsWithSecondSegmentEarlierEndOffset()
{
// Broker-1 is a Leader for Partition-0, Broker-2 is follower
RemoteLogSegmentId segmentId1BRoker1 = new RemoteLogSegmentId(tpId,
Uuid.randomUuid());
RemoteLogSegmentId segmentId1Broker2 = new RemoteLogSegmentId(tpId,
Uuid.randomUuid());
//Leader and follower are in sync
epochState.handleSegmentWithCopySegmentFinishedState(10L,
segmentId1BRoker1, 100L);
epochState.handleSegmentWithCopySegmentFinishedState(10L,
segmentId1Broker2, 100L);
//Broker-1 received some messages and part of them were replicated
by follower
// Broker-2 was not able to replicate all the messages because
Leader is died.
//But were able to upload segment before death [101, 200]
RemoteLogSegmentId segmentId2Broker1 = new RemoteLogSegmentId(tpId,
Uuid.randomUuid());
epochState.handleSegmentWithCopySegmentFinishedState(101L,
segmentId2Broker1, 200L);
// Broker-2 still in-sync according to `replica.lag.time.max.ms`
// Last offset in sync is 150L
// Broker-2 gets the leadership and fills the segment 101L-190L and
uploads to Tiered storage
RemoteLogSegmentId segmentId2Broker2 = new RemoteLogSegmentId(tpId,
Uuid.randomUuid());
epochState.handleSegmentWithCopySegmentFinishedState(101L,
segmentId2Broker2, 190L);
//Traffic stops
//Since LeaderEpoch was changed, I expect that more fresh data will
be on the TS
assertEquals(190L, epochState.highestLogOffset());
//segment 2 uploaded by Broker-1 is expected to be unreferenced
assertTrue(epochState.unreferencedSegmentIds().contains(segmentId2Broker1));
assertEquals(1, epochState.unreferencedSegmentIds().size());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]