junrao commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1293823449
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -2202,19 +2262,24 @@ case class RetentionMsBreach(log: UnifiedLog) extends SegmentDeletionReason { } } -case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason { +case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends SegmentDeletionReason { override def logReason(toDelete: List[LogSegment]): Unit = { var size = log.size toDelete.foreach { segment => size -= segment.size - log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " + + if (remoteLogEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. " + Review Comment: Should we do the same for RetentionMsBreach to log whether the retention time is for local retention or not? ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } - deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) + deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled())) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - nextSegmentOpt.exists(_.baseOffset <= logStartOffset) + nextSegmentOpt.exists(_.baseOffset <= (if(remoteLogEnabled()) localLogStartOffset() else logStartOffset)) Review Comment: Yes, it just means that the segment won't be deleted until it's uploaded to the remote store. But this is probably ok. 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -698,11 +707,329 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (remainingBreachedSize > 0) { + long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); + if (remainingBytes >= 0) { + remainingBreachedSize = remainingBytes; + return true; + } + } + + return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. 
Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. 
+ private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + logger.info("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId()); + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. 
+ remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + logger.info("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId()); + return true; + } + + return false; + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition); + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition); + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + logger.debug("No leader epoch cache available for partition: {}", topicIdPartition); + return; + } + + final Set<Integer> epochsSet = new HashSet<>(); + // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition + // instead of going through all the segments and building it here. 
+ while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + NavigableMap<Integer, Long> epochWithOffsets = leaderEpochCache.epochWithOffsets(); + Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry(); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, + log.onlyLocalLogSegmentsSize(), log.logEndOffset(), epochWithOffsets); + Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); + + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochIterator.hasNext()) { + Integer epoch = epochIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); + return; + } + RemoteLogSegmentMetadata metadata = segmentsIterator.next(); + + // check whether the segment contains the required epoch range with in the current leader epoch lineage. 
+ if (isRemoteSegmentWithinLeaderEpochs(metadata, log.logEndOffset(), epochWithOffsets)) { + isSegmentDeleted = + remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset()); + } + } + } + + // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known + // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for + // unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's + // earliest leader epoch. + if (earliestEpochEntryOptional.isPresent()) { + EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get(); + Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream().filter(x -> x < earliestEpochEntry.epoch).iterator(); + while (epochsToClean.hasNext()) { + int epoch = epochsToClean.next(); + Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + while (segmentsToBeCleaned.hasNext()) { + if (isCancelled() || !isLeader()) { + return; + } + // No need to update the log-start-offset even though the segment is deleted as these epochs/offsets are earlier to that value. + remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentsToBeCleaned.next()); + } + } + } + + // Update log start offset with the computed value after retention cleanup is done + remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset)); + } + + private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) { + return retentionMs > -1 + ? 
Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs)) + : Optional.empty(); + } + + private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize, + long onlyLocalLogSegmentsSize, + long logEndOffset, + NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException { + if (retentionSize > -1) { + long remoteLogSizeBytes = 0L; + for (Integer epoch : epochEntries.navigableKeySet()) { + // remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote + // log size may be computed for all the segments but not for segments with in the current + // partition's leader epoch lineage. Better to revisit this API. + // remoteLogSizeBytes += remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + while (segmentsIterator.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next(); + if (isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) { + remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes(); + } + } + } + + // This is the total size of segments in local log that have their base-offset > local-log-start-offset + // and size of the segments in remote storage which have their end-offset < local-log-start-offset. + long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes; + if (totalSize > retentionSize) { + long remainingBreachedSize = totalSize - retentionSize; + RetentionSizeData retentionSizeData = new RetentionSizeData(retentionSize, remainingBreachedSize); + return Optional.of(retentionSizeData); + } + } + + return Optional.empty(); + } + public String toString() { return this.getClass().toString() + "[" + topicIdPartition + "]"; } } + /** + * Returns true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition. 
+ * The constraints here are as follows: + * - The segment's first epoch's offset should be more than or equal to the respective leader epoch's offset in the partition leader epoch lineage. + * - The segment's end offset should be less than or equal to the respective leader epoch's offset in the partition leader epoch lineage. + * - The segment's epoch lineage(epoch and offset) should be same as leader epoch lineage((epoch and offset)) except + * for the first and the last epochs in the segment. + * + * @param segmentMetadata The remote segment metadata to be validated. + * @param logEndOffset The log end offset of the partition. + * @param leaderEpochs The leader epoch lineage of the partition. + * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition. + */ + // Visible for testing + public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata, + long logEndOffset, + NavigableMap<Integer, Long> leaderEpochs) { Review Comment: We want to be a bit careful about using this method. LeaderEpochCache is mostly derived from the data in the log. However, on a new leader epoch, the new leader also appends the new epoch to LeaderEpochCache before any record is appended for the epoch. This could cause a slight mismatch between the epoch chain in the remote segment and LeaderEpochCache. For example, it's possible for a LeaderEpochCache to have the (epoch, start offset) entries (10, 100), (11, 200) // no record appended for epoch 11, (12, 200), where a segment's epoch chain only has (10, 100), (12, 200). We don't want to prevent the remote segment from being deleted through the retention logic because of this slight mismatch in the leader epoch chain.
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) && + val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) + + // Check not to delete segments which are not yet copied to tiered storage + val isSegmentTieredToRemoteStorage = Review Comment: @satishd : Sorry, I didn't give the right example. This is the case. Without remote storage: retention.bytes=100MB, segment1 - 200MB. We will delete segment1 (even if it's the active segment). With remote storage: retention.bytes=100MB, local.retention.bytes=20MB, segment1 - 200MB. If segment1 is the active segment, it won't be deleted until it rolls and is uploaded to the remote store. It's a very subtle difference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org