junrao commented on a change in pull request #10280: URL: https://github.com/apache/kafka/pull/10280#discussion_r643445208
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
       }
     }
-    deleteOldSegments(shouldDelete, RetentionSizeBreach)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }
-    deleteOldSegments(shouldDelete, StartOffsetBreach)
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
   /**
    * The size of the log in bytes
    */
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long = localLog.logEndOffset
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment:
Right, I don't think there is a particular reason that we have to take a snapshot before adding a new empty segment.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org