satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1296732476


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -696,11 +704,327 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    logger.info("Deleting remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    logger.info("Deleted remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            // Good to have an API from RLMM to get all the remote leader 
epochs of all the segments of a partition
+            // instead of going through all the segments and building it here.
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            NavigableMap<Integer, Long> epochWithOffsets = 
leaderEpochCache.epochWithOffsets();
+            Optional<EpochEntry> earliestEpochEntryOptional = 
leaderEpochCache.earliestEntry();
+
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), log.logEndOffset(), 
epochWithOffsets);
+            Optional<RetentionTimeData> retentionTimeData = 
buildRetentionTimeData(log.config().retentionMs);
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+            Iterator<Integer> epochIterator = 
epochWithOffsets.navigableKeySet().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochIterator.hasNext()) {
+                Integer epoch = epochIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        logger.info("Returning from remote log segments 
cleanup for the remaining segments as the task state is changed.");
+                        return;
+                    }
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+
+                    // check whether the segment contains the required epoch 
range with in the current leader epoch lineage.
+                    if (isRemoteSegmentWithinLeaderEpochs(metadata, 
log.logEndOffset(), epochWithOffsets)) {
+                        isSegmentDeleted =
+                                
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, 
log.logStartOffset());
+                    }
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are 
less than the earliest-epoch known

Review Comment:
   In case of unclean leader election, there is already a durability loss when 
a non in-sync replica needs to be chosen as a leader and given preference to 
availability. The approach taken in this PR uses the current tradeoff of 
durability loss and avoids remote log segment leaks. This is slightly different 
from local log cleanup which we can clarify in the release notes. 
   
   Retention/cleanup logic spread across multiple layers(outside of Kafka) 
poses significant risks and could lead to more extensive problems. So, it is 
better that to be handled by Kafka's retention mechanism.
   
   We will discuss further on finalizing the approach before we make this 
feature production ready. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to