satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1293344902
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ##########
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // Check not to delete segments which are not yet copied to tiered storage
+      val isSegmentTieredToRemoteStorage =

Review Comment:
Let me rephrase what you mentioned here.

retention.bytes = 100MB
segment1 - 90MB

When remote storage is not enabled, this segment is not deleted from the local log segments because of the retention size check: at 90MB, the log is still within the 100MB retention.bytes limit.

retention.bytes = 100MB
local.retention.bytes = 20MB
segment1 - 90MB

When remote storage is enabled and no segments have been uploaded to remote storage yet, the check introduced in this PR will not allow this segment to be deleted, because it has not yet been copied to remote storage. If it has been copied to remote storage, it is not the active segment, and there are one or more local segments after it. It is then eligible for deletion under the local retention policy, because it was already copied to remote storage earlier.

@junrao Am I missing anything here?
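To make the two scenarios in the comment concrete, a minimal Scala sketch under a simplified model may help. `Segment`, `RetentionConfig`, `TieredRetentionSketch` and the `highestCopiedOffset` parameter are hypothetical names, not Kafka's `UnifiedLog` API. The sketch assumes that size-based deletion removes the oldest non-active segments only while the remaining size stays at or above the limit, that `local.retention.bytes` is the local limit when remote storage is enabled, and that a segment counts as copied once the offset just before the next segment's base offset is at or below `highestCopiedOffset`.

```scala
// Hypothetical, simplified model for illustration; not Kafka's UnifiedLog/LogSegment classes.
final case class Segment(baseOffset: Long, sizeBytes: Long)

// Hypothetical config mirroring retention.bytes, local.retention.bytes and remote storage enablement.
final case class RetentionConfig(retentionBytes: Long, localRetentionBytes: Long, remoteStorageEnabled: Boolean)

object TieredRetentionSketch {
  val MB: Long = 1024L * 1024L

  /** Returns the segments that size-based retention would delete locally.
    * `segments` is ordered oldest first; the last element is the active segment.
    * `highestCopiedOffset` is an assumed input: the largest offset already in remote storage, or -1 if none.
    */
  def segmentsToDelete(segments: List[Segment], config: RetentionConfig, highestCopiedOffset: Long): List[Segment] = {
    // Assumption: with remote storage enabled, the local log is bounded by local.retention.bytes.
    val limit = if (config.remoteStorageEnabled) config.localRetentionBytes else config.retentionBytes

    // Pair each non-active segment with the segment after it; the next base offset is the exclusive upper bound.
    val withNext = segments.dropRight(1).zip(segments.drop(1))

    // Assumed check: with remote storage enabled, only segments whose last offset is already copied may go.
    def isCopied(next: Segment): Boolean =
      !config.remoteStorageEnabled || next.baseOffset - 1 <= highestCopiedOffset

    // Delete oldest first while the remaining size stays >= limit and the segment passes the copy check.
    def loop(remaining: List[(Segment, Segment)], excess: Long): List[Segment] = remaining match {
      case (seg, next) :: rest if excess - seg.sizeBytes >= 0 && isCopied(next) =>
        seg :: loop(rest, excess - seg.sizeBytes)
      case _ => Nil
    }

    loop(withNext, segments.map(_.sizeBytes).sum - limit)
  }
}
```

With a single 90MB segment and remote storage disabled, nothing is deleted because that segment is the active one and the log is under 100MB. With illustrative segments of 90MB, 20MB and 5MB (base offsets 0, 1000 and 2000) and `local.retention.bytes` = 20MB, the 90MB segment stays local while `highestCopiedOffset` is -1, and becomes deletable once offsets up to 999 are in remote storage.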