satishd commented on code in PR #14905:
URL: https://github.com/apache/kafka/pull/14905#discussion_r1417134578


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1443,11 +1443,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * @return the segments ready to be deleted
    */
   private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-    def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+    def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], 
upperBoundOffset: Long): Boolean = {
+      val allowDeletionDueToLogStartOffsetIncremented = 
nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
       // Segments are eligible for deletion when:
       //    1. they are uploaded to the remote storage
+      //    2. log-start-offset was incremented higher than the largest offset 
in the candidate segment
       if (remoteLogEnabled()) {
-        upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage
+        (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage) ||
+          allowDeletionDueToLogStartOffsetIncremented

Review Comment:
   Note: This check can not be added as part of 
`deleteLogStartOffsetBreachedSegments()` as the current  `upperBoundOffset > 0 
&& upperBoundOffset - 1 <= highestOffsetInRemoteStorage` becomes false and does 
not allow the segment to be deleted. 
   Added a note here that may help give clarity to the readers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to