kowshik commented on a change in pull request #10280: URL: https://github.com/apache/kafka/pull/10280#discussion_r645434947
########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File, } } - deleteOldSegments(shouldDelete, RetentionSizeBreach) + deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { nextSegmentOpt.exists(_.baseOffset <= logStartOffset) } - deleteOldSegments(shouldDelete, StartOffsetBreach) + deleteOldSegments(shouldDelete, StartOffsetBreach(this)) } def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) /** * The size of the log in bytes */ - def size: Long = Log.sizeInBytes(logSegments) + def size: Long = localLog.segments.sizeInBytes /** - * The offset metadata of the next message that will be appended to the log + * The offset of the next message that will be appended to the log */ - def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata + def logEndOffset: Long = localLog.logEndOffset /** - * The offset of the next message that will be appended to the log + * The offset metadata of the next message that will be appended to the log */ - def logEndOffset: Long = nextOffsetMetadata.messageOffset + def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata + + private val rollAction = RollAction( + preRollAction = (newSegment: LogSegment) => { + // Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot + // offset align with the new segment offset since this ensures we can recover the segment by beginning + // with the corresponding snapshot file and scanning the segment data. Because the segment base offset + // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), + // we manually override the state offset here prior to taking the snapshot. + producerStateManager.updateMapEndOffset(newSegment.baseOffset) + producerStateManager.takeSnapshot() + }, + postRollAction = (newSegment: LogSegment, deletedSegment: Option[LogSegment]) => { + deletedSegment.foreach(segment => deleteProducerSnapshotAsync(Seq(segment))) Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org