satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107956
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +629,193 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + + class RemoteLogRetentionHandler { + + private long remainingBreachedSize = 0L; + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(long remainingBreachedSize) { + this.remainingBreachedSize = remainingBreachedSize; + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size > 0 + if (checkSizeRetention && remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + + "${log.config.retentionSize} breach. Log size after deletion will be " + + "${remainingBreachedSize + log.config.retentionSize}."); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. + return isSegmentDeleted; + } + + // There are two cases: + // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the Review Comment: Updated the comment to make it more clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org