dengziming commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r663445933
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,28 +311,151 @@ final class KafkaMetadataLog private (
}
}
- override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
+ /**
+ * Delete snapshots that come before a given snapshot ID. This is done by advancing the log start offset to the given
+ * snapshot and cleaning old log segments.
+ *
+ * This will only happen if the following invariants all hold true:
+ *
+ * <li>The given snapshot precedes the latest snapshot</li>
+ * <li>The offset of the given snapshot is greater than the log start offset</li>
+ * <li>The log layer can advance the offset to the given snapshot</li>
+ *
+ * This method is thread-safe
+ */
+ override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
val (deleted, forgottenSnapshots) = snapshots synchronized {
latestSnapshotId().asScala match {
- case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
- startOffset < logStartSnapshotId.offset &&
- logStartSnapshotId.offset <= snapshotId.offset &&
- log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
-
- // Delete all segments that have a "last offset" less than the log start offset
- log.deleteOldSegments()
-
- // Forget snapshots less than the log start offset
- (true, forgetSnapshotsBefore(logStartSnapshotId))
+ case Some(latestSnapshotId) if
+ snapshots.contains(snapshotId) &&
+ startOffset < snapshotId.offset &&
+ snapshotId.offset <= latestSnapshotId.offset &&
+ log.maybeIncrementLogStartOffset(snapshotId.offset, SnapshotGenerated) =>
+ // Delete all segments that have a "last offset" less than the log start offset
+ log.deleteOldSegments()
+ // Remove older snapshots from the snapshots cache
+ (true, forgetSnapshotsBefore(snapshotId))
case _ =>
- (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+ (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
}
}
-
removeSnapshots(forgottenSnapshots)
deleted
}
+ /**
+ * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+ */
+ private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+ snapshots.keys.toSeq.flatMap {
+ snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes()) }
+ }
+ }
+
+ /**
+ * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+ */
+ private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+ readSnapshot(snapshotId).asScala.flatMap { reader =>
+ val batchIterator = reader.records().batchIterator()
+
+ val firstBatch = batchIterator.next()
+ val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
+ if (firstBatch.isControlBatch) {
+ val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+ Some(header.lastContainedLogTimestamp())
+ } else {
+ warn("Did not find control record at beginning of snapshot")
+ None
+ }
+ }
+ }
+
+ /**
+ * Perform cleaning of old snapshots and log segments based on size.
+ *
+ * If our configured retention size has been violated, we perform cleaning as follows:
+ *
+ * <li>Find oldest snapshot and delete it</li>
+ * <li>Advance log start offset to end of next oldest snapshot</li>
+ * <li>Delete log segments which wholly precede the new log start offset</li>
+ *
+ * This process is repeated until the retention size is no longer violated, or until only a single snapshot remains.
+ */
+ override def maybeClean(): Boolean = {
+ snapshots synchronized {
+ cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+ }
+ }
+
+ /**
+ * Iterate through the snapshots and test the given predicate to see if we should attempt to delete it. Since
+ * we have some additional invariants regarding snapshots and log segments, we cannot simply delete a snapshot in
+ * all cases.
+ *
+ * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+ */
+ private def cleanSnapshots(predicate: (OffsetAndEpoch) => Boolean): Boolean = {
+ if (snapshots.size < 2)
+ return false;
Review comment:
nit: unnecessary ";"
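i.e., the same guard without the semicolon:

```scala
if (snapshots.size < 2)
  return false
```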
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,28 +311,151 @@ final class KafkaMetadataLog private (
+ /**
+ * Perform cleaning of old snapshots and log segments based on size.
+ *
+ * If our configured retention size has been violated, we perform cleaning as follows:
+ *
+ * <li>Find oldest snapshot and delete it</li>
+ * <li>Advance log start offset to end of next oldest snapshot</li>
+ * <li>Delete log segments which wholly precede the new log start offset</li>
+ *
+ * This process is repeated until the retention size is no longer violated, or until only a single snapshot remains.
+ */
+ override def maybeClean(): Boolean = {
+ snapshots synchronized {
+ cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
Review comment:
can we use `|` to replace `||` here? With `||`, `cleanSnapshotsRetentionMs()` is never even attempted once size-based cleaning returns true.
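A minimal sketch of the non-short-circuiting version (same method names as in the hunk above), so both retention checks always run:

```scala
override def maybeClean(): Boolean = {
  snapshots synchronized {
    // `|` evaluates both operands, unlike `||`, so time-based retention
    // is still checked even when size-based cleaning deleted something.
    cleanSnapshotsRetentionSize() | cleanSnapshotsRetentionMs()
  }
}
```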
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]