satishd commented on code in PR #14905: URL: https://github.com/apache/kafka/pull/14905#discussion_r1417134578
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1443,11 +1443,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, * @return the segments ready to be deleted */ private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { - def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = { + def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = { + val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset // Segments are eligible for deletion when: // 1. they are uploaded to the remote storage + // 2. log-start-offset was incremented higher than the largest offset in the candidate segment if (remoteLogEnabled()) { - upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage) || + allowDeletionDueToLogStartOffsetIncremented Review Comment: Note: This check can not be added as part of `deleteLogStartOffsetBreachedSegments()` as the current `upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage` becomes false and does not allow the segment to be deleted. Added a note here that may help give clarity to the readers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org