kamalcph commented on code in PR #14330: URL: https://github.com/apache/kafka/pull/14330#discussion_r1316896078
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -954,24 +953,29 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); - boolean isSegmentDeleted = true; - while (isSegmentDeleted && epochIterator.hasNext()) { + boolean canProcess = true; + while (canProcess && epochIterator.hasNext()) { Integer epoch = epochIterator.next(); Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); - while (isSegmentDeleted && segmentsIterator.hasNext()) { + while (canProcess && segmentsIterator.hasNext()) { if (isCancelled() || !isLeader()) { logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); return; } RemoteLogSegmentMetadata metadata = segmentsIterator.next(); // check whether the segment contains the required epoch range with in the current leader epoch lineage. - if (isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets)) { + boolean isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets); + boolean isSegmentDeleted = false; + if (isValidSegment) { isSegmentDeleted = remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || - remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || - remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset); + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata); + } + if (!isSegmentDeleted) { + isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset); Review Comment: When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated as per the log-start-ofset. Until the rlm-cleaner-thread runs in the next iteration, the remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch' verifies whether the epoch's present in the segment lies in the checkpoint file. It will always return false since the checkpoint file was already truncated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org