satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1281672320
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + return; + } + + final long retentionSize = log.config().retentionSize; + final boolean checkSizeRetention = retentionSize > -1; + + final long retentionMs = log.config().retentionMs; + final boolean checkTimestampRetention = retentionMs > -1; + + // Iterate once + // - to build the log size of segments with base-offset < local-log-start-offset + // - to collect all the epochs of remote log segments + // These values can be cached and updated in RLMTask for this topic partition without computing in each + // iteration. But the logic can become little complex and need to cover different scenarios to avoid any + // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion. + final Set<Integer> epochsSet = new HashSet<>(); + long totalSizeEarlierToLocalLogStartOffset = 0L; + while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + + if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) { + totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes(); + } + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(checkSizeRetention, retentionSize, log, totalSizeEarlierToLocalLogStartOffset); + Optional<RetentionTimeData> retentionTimeData = checkTimestampRetention + ? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs)) + : Optional.empty(); + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochEntryIterator.hasNext()) { + EpochEntry epochEntry = epochEntryIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + return; + } + + RemoteLogSegmentMetadata metadata = segmentsIterator.next(); + isSegmentDeleted = + remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset()); + } + } + + // Remove the remote log segments whose segment-leader-epochs are lesser than the earliest-epoch known Review Comment: All the size/time/startOffset handlers run based on the current leader’s leader epochs. Here, we are removing the segments which have leader epochs earlier to the lowest leader epoch on this broker(partition leader). ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + return; + } + + final long retentionSize = log.config().retentionSize; + final boolean checkSizeRetention = retentionSize > -1; + + final long retentionMs = log.config().retentionMs; + final boolean checkTimestampRetention = retentionMs > -1; + + // Iterate once + // - to build the log size of segments with base-offset < local-log-start-offset + // - to collect all the epochs of remote log segments + // These values can be cached and updated in RLMTask for this topic partition without computing in each + // iteration. But the logic can become little complex and need to cover different scenarios to avoid any + // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion. + final Set<Integer> epochsSet = new HashSet<>(); + long totalSizeEarlierToLocalLogStartOffset = 0L; + while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + + if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) { + totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes(); + } + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(checkSizeRetention, retentionSize, log, totalSizeEarlierToLocalLogStartOffset); + Optional<RetentionTimeData> retentionTimeData = checkTimestampRetention + ? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs)) + : Optional.empty(); + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochEntryIterator.hasNext()) { + EpochEntry epochEntry = epochEntryIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + return; + } + + RemoteLogSegmentMetadata metadata = segmentsIterator.next(); + isSegmentDeleted = + remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset()); + } + } + + // Remove the remote log segments whose segment-leader-epochs are lesser than the earliest-epoch known Review Comment: All the size/time/startOffset handlers run based on the current leader’s leader epochs. Here, we are removing the segments which have leader epochs earlier to the lowest leader epoch on this broker(partition leader). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org