showuon commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1317166423


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -954,24 +953,29 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
 
             RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
             Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean isSegmentDeleted = true;
-            while (isSegmentDeleted && epochIterator.hasNext()) {
+            boolean canProcess = true;
+            while (canProcess && epochIterator.hasNext()) {
                 Integer epoch = epochIterator.next();
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                while (canProcess && segmentsIterator.hasNext()) {
                     if (isCancelled() || !isLeader()) {
                         logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = segmentsIterator.next();
 
                     // check whether the segment contains the required epoch range with in the current leader epoch lineage.
-                    if (isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets)) {
+                    boolean isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
+                    boolean isSegmentDeleted = false;
+                    if (isValidSegment) {
                         isSegmentDeleted =
                                 remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
+                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                    }
+                    if (!isSegmentDeleted) {
+                        isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);

Review Comment:
   Could we run `deleteLogStartOffsetBreachedSegments` first, before the 
time-breach and size-breach checks? For example:
   
   ```
   isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
           metadata, logStartOffset, epochWithOffsets);
   
   boolean isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
   
   // Run the time/size checks only if the segment was not already deleted above,
   // so the earlier result is not overwritten.
   if (!isSegmentDeleted && isValidSegment) {
       isSegmentDeleted = remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata)
               || remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
   }
   ```
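
To make the proposed ordering concrete, here is a minimal, self-contained sketch of the decision flow. It is not the `RemoteLogManager` code: the class `CleanupOrderSketch`, the method `process`, and the boolean inputs are hypothetical stand-ins, and the log-start-offset test is simplified to "segment end offset is below the log start offset", which is an assumption made for illustration only.

```java
// Hypothetical sketch of the suggested check ordering; none of these names
// exist in Kafka. The real handler methods also perform the deletion.
class CleanupOrderSketch {

    /**
     * Decides what happens to one remote segment.
     *
     * @param segmentEndOffset   last offset in the segment (assumed input)
     * @param logStartOffset     current log start offset
     * @param withinLeaderEpochs stand-in for isRemoteSegmentWithinLeaderEpochs(...)
     * @param retentionBreached  stand-in for "time or size retention breached"
     * @return the reason the segment is deleted, or "kept"
     */
    static String process(long segmentEndOffset,
                          long logStartOffset,
                          boolean withinLeaderEpochs,
                          boolean retentionBreached) {
        // Step 1: log-start-offset check runs first, for every segment,
        // including segments outside the current leader epoch lineage.
        // Simplified condition (assumption): the whole segment is below logStartOffset.
        boolean isSegmentDeleted = segmentEndOffset < logStartOffset;
        if (isSegmentDeleted) {
            return "log-start-offset";
        }

        // Step 2: time/size retention applies only to segments that are
        // valid in the leader epoch lineage and were not deleted in step 1.
        if (withinLeaderEpochs && retentionBreached) {
            return "retention";
        }
        return "kept";
    }

    public static void main(String[] args) {
        System.out.println(process(99L, 100L, false, false));
        System.out.println(process(150L, 100L, true, true));
        System.out.println(process(150L, 100L, false, true));
    }
}
```

Under these assumptions, a segment below the log start offset is removed even when it is outside the leader epoch lineage, while the time/size checks stay restricted to valid segments and never run on a segment that was already deleted.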


