showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1587041449

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {

Review Comment:
   I'm not sure whether the check `segmentLastEpoch < leaderEpochs.firstKey()` makes sense. Suppose:

   leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
   segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}

   Now `delete_records` is called and the log start offset is incremented to 100, so the new leader-epoch-file-cache will be: {(9, 100)}

   When segment1 reaches this check, it fails validation because segmentLastEpoch (7) is less than leaderEpochs.firstKey() (9). But we still want to delete this segment, right?

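To make the scenario above concrete, here is a minimal, self-contained sketch that isolates only the out-of-bound condition from the hunk. The class and helper names (`EpochBoundCheckSketch`, `lastEpochOutOfBounds`) are hypothetical and are not part of `RemoteLogManager`; the maps are built by hand from the numbers in the comment, and the rest of the real method's validation is not reproduced.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class EpochBoundCheckSketch {

    // Hypothetical helper: mirrors only the out-of-bound condition from the diff.
    // Returns true when the segment would be rejected by that condition.
    static boolean lastEpochOutOfBounds(NavigableMap<Integer, Long> segmentLeaderEpochs,
                                        NavigableMap<Integer, Long> leaderEpochs) {
        Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
        return segmentLastEpoch < leaderEpochs.firstKey()
                || segmentLastEpoch > leaderEpochs.lastKey();
    }

    public static void main(String[] args) {
        // segment1: epochs = {(5, 10), (7, 15)}
        NavigableMap<Integer, Long> segment1 = new TreeMap<>();
        segment1.put(5, 10L);
        segment1.put(7, 15L);

        // leader-epoch-file-cache before delete_records: {(5, 10), (7, 15), (9, 100)}
        NavigableMap<Integer, Long> before = new TreeMap<>();
        before.put(5, 10L);
        before.put(7, 15L);
        before.put(9, 100L);

        // leader-epoch-file-cache after the log start offset moves to 100: {(9, 100)}
        NavigableMap<Integer, Long> after = new TreeMap<>();
        after.put(9, 100L);

        System.out.println("before delete_records, out of bounds: "
                + lastEpochOutOfBounds(segment1, before));
        System.out.println("after delete_records, out of bounds: "
                + lastEpochOutOfBounds(segment1, after));
    }
}
```

With these inputs the condition is false before `delete_records` and true afterwards, which is the situation the comment is asking about: segment1 is rejected by the check even though it lies entirely below the new log start offset.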
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                            "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.
+        // When the segment1 gets deleted, then the log-start-offset = 51 and leader-epoch-file-cache gets updated to: {(7, 51), (9, 100)}.
+        // While validating the segment2, we should ensure the overlapping remote log segments case.
+        Integer segmentFirstEpoch = segmentLeaderEpochs.ceilingKey(leaderEpochs.firstKey());
+        if (segmentFirstEpoch == null || !leaderEpochs.containsKey(segmentFirstEpoch)) {

Review Comment:
   Same here: if the case above is valid, this check also prevents the segment from being deleted.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                            "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.

Review Comment:
   Could you help me understand in which case we would upload two segments with overlapping offsets? I thought `findHighestRemoteOffset` should help the new leader get the `copiedOffset`, no?
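
Returning to the `ceilingKey` check from the second comment, the following sketch evaluates that condition for both scenarios mentioned in this review: the overlapping-segment case from the code comment and the `delete_records` case from the first comment. The class and helper names (`FirstEpochCheckSketch`, `firstEpochNotInLineage`) are hypothetical, the maps are hand-built from the numbers quoted above, and only this one condition is modelled, not the whole method.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class FirstEpochCheckSketch {

    // Hypothetical helper: mirrors only the ceilingKey-based condition from the diff.
    // Returns true when the segment would be rejected by that condition.
    static boolean firstEpochNotInLineage(NavigableMap<Integer, Long> segmentLeaderEpochs,
                                          NavigableMap<Integer, Long> leaderEpochs) {
        // Smallest segment epoch that is >= the earliest epoch the leader still tracks.
        Integer segmentFirstEpoch = segmentLeaderEpochs.ceilingKey(leaderEpochs.firstKey());
        return segmentFirstEpoch == null || !leaderEpochs.containsKey(segmentFirstEpoch);
    }

    public static void main(String[] args) {
        // Overlap case: segment2 epochs = {(5, 14), (7, 15), (9, 100)},
        // leader-epoch-file-cache after segment1 is deleted = {(7, 51), (9, 100)}
        NavigableMap<Integer, Long> segment2 = new TreeMap<>();
        segment2.put(5, 14L);
        segment2.put(7, 15L);
        segment2.put(9, 100L);
        NavigableMap<Integer, Long> cacheAfterSegment1Deleted = new TreeMap<>();
        cacheAfterSegment1Deleted.put(7, 51L);
        cacheAfterSegment1Deleted.put(9, 100L);

        // delete_records case: segment1 epochs = {(5, 10), (7, 15)},
        // leader-epoch-file-cache = {(9, 100)}
        NavigableMap<Integer, Long> segment1 = new TreeMap<>();
        segment1.put(5, 10L);
        segment1.put(7, 15L);
        NavigableMap<Integer, Long> cacheAfterDeleteRecords = new TreeMap<>();
        cacheAfterDeleteRecords.put(9, 100L);

        System.out.println("overlap case rejected: "
                + firstEpochNotInLineage(segment2, cacheAfterSegment1Deleted));
        System.out.println("delete_records case rejected: "
                + firstEpochNotInLineage(segment1, cacheAfterDeleteRecords));
    }
}
```

In the overlap case `ceilingKey(7)` resolves to epoch 7, which the leader cache contains, so the segment is accepted. In the `delete_records` case `ceilingKey(9)` is `null` because segment1 has no epoch at or above 9, so the segment is rejected, matching the concern raised in the second comment.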