showuon commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1180121259


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -581,11 +588,18 @@ public void run() {
                 if (isLeader()) {
                     // Copy log segments to remote storage
                     copyLogSegmentsToRemote();
+                    // Cleanup/delete expired remote log segments
+                    cleanupExpiredRemoteLogSegments();
+                } else {
+                    Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
+                    if (unifiedLogOptional.isPresent()) {
+                        long offset = findHighestRemoteOffset(topicIdPartition);
+                        unifiedLogOptional.get().updateHighestOffsetInRemoteStorage(offset);

Review Comment:
   We might need to add a log message here describing why we update the highest offset in remote storage for followers. I think that's for the fetch-from-follower-replica feature, right?
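   Something like the following (just a sketch; the exact wording and log level are up to you, and the debug line is hypothetical) would make the follower path self-documenting:
   ```java
   } else {
       Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
       if (unifiedLogOptional.isPresent()) {
           long offset = findHighestRemoteOffset(topicIdPartition);
           // Keep the follower's highest remote offset in sync; presumably needed for the
           // fetch-from-follower feature.
           logger.debug("Updating highest remote offset for follower {} to {}", topicIdPartition, offset);
           unifiedLogOptional.get().updateHighestOffsetInRemoteStorage(offset);
       }
   }
   ```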



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0

Review Comment:
   nit: assume that segments contain size >= 0



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);

Review Comment:
   I agree this could be costly if the RLMM implementation doesn't cache the metadata, but I don't think it's an implementation constraint for plugin providers. They can always cache the metadata inside the plugin. I think it should be enough to call this out in the RLMM#listRemoteLogSegments javadoc.
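   For example, the javadoc addition could be as small as the note below (the wording is only a suggestion; the signature matches how the method is called in this PR):
   ```java
   /**
    * Returns metadata of all the remote log segments for the given topicIdPartition.
    * <p>
    * Note: retention/cleanup in RemoteLogManager may call this on every cleanup pass, so
    * implementations are encouraged to serve it from a local cache rather than doing a
    * remote lookup on each invocation.
    */
   Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
           throws RemoteStorageException;
   ```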



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -581,11 +588,18 @@ public void run() {
                 if (isLeader()) {
                     // Copy log segments to remote storage
                     copyLogSegmentsToRemote();
+                    // Cleanup/delete expired remote log segments
+                    cleanupExpiredRemoteLogSegments();

Review Comment:
   > Should this operation be performed in a separate thread pool which can have a defined quota? (similar to how we perform cleaning for local log using separate cleaner/background threads).
   
   Good suggestion, but that part isn't covered in the original KIP; we'd need another KIP to improve it.
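   If that follow-up happens, a rough shape could be a dedicated cleaner pool alongside the existing copy task. Purely illustrative sketch below; the class name, pool size, and interval are made up and would need to be defined in the KIP:
   ```java
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   // Hypothetical sketch: run remote-log retention on its own pool so it cannot stall copying
   // and can later be bounded by a quota, similar to the local log cleaner threads.
   class RemoteLogCleanerPool {
       private final ScheduledThreadPoolExecutor pool;

       RemoteLogCleanerPool(int numThreads) {
           this.pool = new ScheduledThreadPoolExecutor(numThreads);
       }

       void schedule(Runnable cleanupTask, long intervalMs) {
           // Each partition's cleanupExpiredRemoteLogSegments() would be wrapped in a Runnable
           // that handles its own exceptions before being scheduled here.
           pool.scheduleWithFixedDelay(cleanupTask, 0, intervalMs, TimeUnit.MILLISECONDS);
       }

       void shutdown() {
           pool.shutdownNow();
       }
   }
   ```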


