jeqo commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1247799637


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -152,19 +156,23 @@ public class RemoteLogManager implements Closeable {
      * @param time      Time instance.
      * @param clusterId The cluster id.
      * @param fetchLog  function to get UnifiedLog instance for a given topic.
+     * @param updateRemoteLogStartOffset function to update the 
log-start-offset for a given topic partition.
      */
     public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             int brokerId,
                             String logDir,
                             String clusterId,
                             Time time,
-                            Function<TopicPartition, Optional<UnifiedLog>> 
fetchLog) {
+                            Function<TopicPartition, Optional<UnifiedLog>> 
fetchLog,
+                            BiConsumer<TopicPartition, Long> 
updateRemoteLogStartOffset) {

Review Comment:
   nit: 
   ```suggestion
                               BiConsumer<TopicPartition, Long> 
updateLogStartOffsetFromRemoteTier) {
   ```



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -634,6 +642,241 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;

Review Comment:
   maybe worth adding a log message here stating why cleanup is not happening? 
or maybe just a comment explaining why this scenario may never happen given the 
low prob that recordVersion < 2 is used. 



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -581,11 +588,18 @@ public void run() {
                 if (isLeader()) {
                     // Copy log segments to remote storage
                     copyLogSegmentsToRemote();
+                    // Cleanup/delete expired remote log segments
+                    cleanupExpiredRemoteLogSegments();

Review Comment:
   @satishd , could you elaborate a bit more what do you mean by
   > Respective garbage collectors in those storages will take care of deleting 
the data asynchronously.
   
   ?
   Is this relying on some specific storage backend implementation? 



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +625,230 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {

Review Comment:
   > even if we are not the leader at this stage, we have deleted the logs in 
remote. 
   
   If I'm reading the call path correctly, this is not the case. 
`handleLogStartOffsetUpdate` function is called only at the end of 
`cleanupExpiredRemoteLogSegments` that filters out calls from followers.
   I guess we could either remote the `isLeader` validation here, or move this 
logic within the lambda itself?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -634,6 +642,241 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {

Review Comment:
   Could we add a log info here similar to copy?
   ```suggestion
                   if (predicate.test(segmentMetadata)) {
                       logger.info("Deleting remote log segment {}", 
metadata.remoteSegmentId());
   ```



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -922,6 +947,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private def maybeIncrementLocalLogStartOffset(newLogStartOffset: Long, 
reason: LogStartOffsetIncrementReason): Unit = {
+    maybeIncrementLogStartOffset(newLogStartOffset, reason, 
onlyLocalLogStartOffsetUpdate = true)
+  }

Review Comment:
   found this a bit confusing. In main operation 
`onlyLocalLogStartOffsetUpdate` is false by default, but here we are overriding 
with `onlyLocalLogStartOffsetUpdate` as true, and methods signature are mainly 
the same. Wouldn't be clearer to use the default method with 
`onlyLocalLogStartOffsetUpdate=true` instead of creating this private method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to