kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1316896078


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -954,24 +953,29 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
 
             RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
             Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean isSegmentDeleted = true;
-            while (isSegmentDeleted && epochIterator.hasNext()) {
+            boolean canProcess = true;
+            while (canProcess && epochIterator.hasNext()) {
                 Integer epoch = epochIterator.next();
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                while (canProcess && segmentsIterator.hasNext()) {
                     if (isCancelled() || !isLeader()) {
                         logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = segmentsIterator.next();
 
                     // check whether the segment contains the required epoch range with in the current leader epoch lineage.
-                    if (isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets)) {
+                    boolean isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
+                    boolean isSegmentDeleted = false;
+                    if (isValidSegment) {
                         isSegmentDeleted =
                                 remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
+                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                    }
+                    if (!isSegmentDeleted) {
+                        isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);

Review Comment:
   When the user moves the log-start-offset, the leader-epoch-checkpoint file 
is truncated up to the new log-start-offset. The remote log segments below that 
offset are not removed until the rlm-cleaner-thread runs its next iteration.
   
   `isRemoteSegmentWithinLeaderEpochs` verifies whether the epochs present in 
the segment lie within the checkpoint file. For those segments it will always 
return false, since the checkpoint file was already truncated. That is why the 
log-start-offset breach check is evaluated outside the `isValidSegment` branch.
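
   To illustrate the point above, here is a minimal, self-contained sketch. It 
is not the `RemoteLogManager` code: `Segment`, `truncateFromStart`, 
`isWithinLeaderEpochs`, `oldDecision` and `newDecision` are hypothetical 
stand-ins, the lineage check is reduced to "every epoch of the segment is still 
in the checkpoint", and the breach check is assumed to be `endOffset < 
logStartOffset`.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Simplified model only; all names here are hypothetical, not Kafka APIs.
final class LogStartOffsetCleanupSketch {

    // Stand-in for remote segment metadata: offset range plus the epochs it contains.
    static final class Segment {
        final long startOffset;
        final long endOffset;
        final Set<Integer> epochs;

        Segment(long startOffset, long endOffset, Set<Integer> epochs) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.epochs = epochs;
        }
    }

    // Assumed truncation rule: drop epochs that start at or below logStartOffset,
    // but keep the latest of them, re-based to start at logStartOffset.
    static NavigableMap<Integer, Long> truncateFromStart(NavigableMap<Integer, Long> checkpoint,
                                                         long logStartOffset) {
        NavigableMap<Integer, Long> result = new TreeMap<>();
        Map.Entry<Integer, Long> lastAtOrBelow = null;
        for (Map.Entry<Integer, Long> entry : checkpoint.entrySet()) {
            if (entry.getValue() <= logStartOffset) {
                lastAtOrBelow = entry;
            } else {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        if (lastAtOrBelow != null) {
            result.put(lastAtOrBelow.getKey(), logStartOffset);
        }
        return result;
    }

    // Reduced lineage check: all epochs of the segment must still be in the checkpoint.
    static boolean isWithinLeaderEpochs(Segment segment, NavigableMap<Integer, Long> checkpoint) {
        return checkpoint.keySet().containsAll(segment.epochs);
    }

    // Assumed breach rule: the whole segment lies below the log-start-offset.
    static boolean isLogStartOffsetBreached(Segment segment, long logStartOffset) {
        return segment.endOffset < logStartOffset;
    }

    // Shape of the condition before the change: every deletion is gated on validity.
    static boolean oldDecision(boolean valid, boolean retentionBreached, boolean startOffsetBreached) {
        return valid && (retentionBreached || startOffsetBreached);
    }

    // Shape of the condition after the change: the start-offset check runs independently.
    static boolean newDecision(boolean valid, boolean retentionBreached, boolean startOffsetBreached) {
        boolean deleted = valid && retentionBreached;
        if (!deleted) {
            deleted = startOffsetBreached;
        }
        return deleted;
    }
}
```

   With a checkpoint of epoch 0 at offset 0, epoch 1 at offset 100 and epoch 2 
at offset 200, moving the log-start-offset to 150 leaves only epochs 1 and 2 in 
this model. A segment covering offsets 0 to 99 in epoch 0 then fails the lineage 
check, so the old condition never deletes it, while the new one does.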


