satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1293344902


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // Check not to delete segments which are not yet copied to tiered storage
+      val isSegmentTieredToRemoteStorage =

Review Comment:
   Let me rephrase what you mentioned here:
   
   retention.bytes = 100MB
   segment1 = 90MB
   
   When remote storage is not enabled, this segment is not deleted from the local log by the retention size check, because the log (90MB) is below retention.bytes (100MB).
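   The size check in this first case is plain arithmetic. Below is a simplified, hypothetical Scala model of size-based retention, loosely following how the oldest segments are trimmed; `SizeRetention` and `segmentsToDelete` are illustrative names, and the model ignores the high-watermark and active-segment conditions the real log also applies.

```scala
// Hypothetical, simplified model of size-based local retention. It ignores the
// high-watermark and active-segment conditions that the real log also applies.
object SizeRetention {
  val MB: Long = 1024L * 1024L

  // segmentSizes are ordered oldest first. Returns how many of the oldest
  // segments a size-based retention pass would remove.
  def segmentsToDelete(segmentSizes: List[Long], retentionBytes: Long): Int = {
    val totalBytes = segmentSizes.sum
    // A negative limit disables size retention; a log below the limit is left alone.
    if (retentionBytes < 0 || totalBytes < retentionBytes) 0
    else {
      // Remove oldest segments only while what remains is still >= retentionBytes.
      @annotation.tailrec
      def loop(remaining: List[Long], excess: Long, count: Int): Int = remaining match {
        case segBytes :: rest if excess - segBytes >= 0 => loop(rest, excess - segBytes, count + 1)
        case _                                          => count
      }
      loop(segmentSizes, totalBytes - retentionBytes, 0)
    }
  }

  def main(args: Array[String]): Unit = {
    // Case from the comment: one 90MB segment with retention.bytes = 100MB.
    println(segmentsToDelete(List(90 * MB), 100 * MB)) // 0: 90MB < 100MB, no breach
    // A 110MB log of 10MB + 10MB + 90MB segments: only the oldest 10MB goes.
    println(segmentsToDelete(List(10 * MB, 10 * MB, 90 * MB), 100 * MB)) // 1
  }
}
```

   With a single 90MB segment and a 100MB limit the check never triggers, so nothing is removed regardless of tiering.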
   
   retention.bytes = 100MB
   local.retention.bytes = 20MB
   segment1 = 90MB
   
   When remote storage is enabled and no segments have been uploaded to remote storage yet, the check introduced in this PR does not allow this segment to be deleted, because it has not been copied to remote storage.
   
   If it has been copied to remote storage, it is not the active segment, and there are one or more local segments after it. It is then eligible for deletion based on the local retention policy, since it was already copied to remote storage.
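   To make the two remote-storage cases concrete, here is a hypothetical Scala sketch of the guard, shaped after the `shouldDelete` hunk above. `SegmentView`, `highestOffsetInRemoteStorage`, and the exact condition are assumptions for illustration, not the PR's code: a segment counts as tiered only when its last offset (`upperBoundOffset - 1`) is at or below the highest offset already in remote storage.

```scala
// Hypothetical sketch of the deletion guard discussed in this thread. Names such
// as highestOffsetInRemoteStorage are illustrative, not taken from the PR.
final case class SegmentView(baseOffset: Long, upperBoundOffset: Long)

object TieredDeletionGuard {
  // True when remote storage is off, or when every offset in the segment
  // (baseOffset through upperBoundOffset - 1) is already in remote storage.
  // highestOffsetInRemoteStorage is -1 when nothing has been uploaded yet.
  def isSegmentTieredToRemoteStorage(seg: SegmentView,
                                     remoteStorageEnabled: Boolean,
                                     highestOffsetInRemoteStorage: Long): Boolean =
    !remoteStorageEnabled ||
      (seg.upperBoundOffset > 0 && seg.upperBoundOffset - 1 <= highestOffsetInRemoteStorage)

  // The high-watermark check, the tiered-storage check, and the retention
  // predicate must all agree before a local segment is deleted.
  def shouldDelete(seg: SegmentView,
                   highWatermark: Long,
                   remoteStorageEnabled: Boolean,
                   highestOffsetInRemoteStorage: Long,
                   retentionPredicate: SegmentView => Boolean): Boolean =
    highWatermark >= seg.upperBoundOffset &&
      isSegmentTieredToRemoteStorage(seg, remoteStorageEnabled, highestOffsetInRemoteStorage) &&
      retentionPredicate(seg)

  def main(args: Array[String]): Unit = {
    // segment1 holds offsets 0..999; the next local segment starts at 1000.
    val segment1 = SegmentView(0L, 1000L)
    // Assume local.retention.bytes is already breached, so retention says "delete".
    val localRetentionBreached: SegmentView => Boolean = _ => true

    println(shouldDelete(segment1, 1500L, true, -1L, localRetentionBreached))  // false: not uploaded
    println(shouldDelete(segment1, 1500L, true, 999L, localRetentionBreached)) // true: fully copied
    println(shouldDelete(segment1, 1500L, false, -1L, localRetentionBreached)) // true: guard not applied
  }
}
```

   With nothing uploaded, the guard blocks deletion even though local retention is breached; once offsets up to 999 are in remote storage, the same segment becomes eligible.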
   
   @junrao Am I missing anything here?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
