jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660808793
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,32 +16,37 @@ */
 package kafka.raft

-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._

 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
-  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
+  val snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
   I don't think we should do this. Access to this data needs to be synchronized. It is also very likely that this is an implementation detail that will change in the near future.
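If the goal is to let tests observe snapshot state, a synchronized, read-only accessor would avoid handing out the mutable map. A minimal sketch of that alternative, assuming `snapshots` stays a private constructor parameter (`snapshotIds` is a hypothetical name, not part of this PR):

```scala
  // Hypothetical accessor: copy the key set while holding the same lock the
  // snapshotting and polling threads use, instead of exposing the map itself.
  private[raft] def snapshotIds(): Seq[OffsetAndEpoch] = {
    snapshots synchronized {
      snapshots.keys.toSeq
    }
  }
```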
##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -244,10 +246,10 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception {
             assertEquals(snapshotId, snapshot.snapshotId());
             snapshot.freeze();
         }
-
+
        context.log.deleteBeforeSnapshot(snapshotId);
        context.client.poll();

-        assertEquals(snapshotId.offset, context.log.startOffset());
+        //context.log.logStartOffset(snapshotId.offset);

Review comment:
   Commented out code.

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -316,6 +316,11 @@ public void flush() {
         lastFlushedOffset = endOffset().offset;
     }

+    @Override
+    public boolean maybeClean() {
+        return false;
+    }
+
Review comment:
   Are you planning to implement this and use it instead of `deleteBeforeSnapshot` below? Most of the tests in the `raft` module depend on `MockLog` having the same semantics as `KafkaMetadataLog`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }

-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread-safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }

-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes()) }
+    }
+  }

-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }

-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {
Review comment:
   `deleteSnapshot` is a noop if `nextOpt` is `None`. If we move that check here then I think we can simplify a bit of the code and, for example, change the predicate to `(OffsetAndEpoch, OffsetAndEpoch) => Boolean`.
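A rough sketch of that simplification, keeping `deleteSnapshot`'s current `Option`-taking signature (this follows the reviewer's suggestion and is not code from the PR):

```scala
  // Hoist the "is there a next snapshot?" check out of the predicate and
  // deleteSnapshot, so the predicate compares two concrete snapshot ids.
  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
    val snapshotIterator = snapshots.keys.iterator
    var snapshotOpt = Log.nextOption(snapshotIterator)
    var didClean = false
    var done = false
    while (!done && snapshotOpt.isDefined) {
      val snapshot = snapshotOpt.get
      Log.nextOption(snapshotIterator) match {
        case Some(nextSnapshot) if predicate(snapshot, nextSnapshot) =>
          if (deleteSnapshot(snapshot, Some(nextSnapshot))) {
            didClean = true
            snapshotOpt = Some(nextSnapshot)
          } else {
            done = true
          }
        case _ =>
          // No next snapshot, or the predicate chose to keep this one: stop.
          done = true
      }
    }
    didClean
  }
```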
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
 [hunk context identical to the quote above, continuing past `cleanSnapshots`]
+        if (deleteSnapshot(snapshot, nextOpt)) {
+          didClean = true
+          snapshotOpt = nextOpt
+        } else {
+          snapshotOpt = None
+        }
+      } else {
+        snapshotOpt = None
+      }
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.

Review comment:
   I see. Is this code simplified now that we include `LastContainedLogTimestamp` in the header of the snapshot?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
 [hunk context identical to the quote above, up to the commented line]
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
   PR https://github.com/apache/kafka/pull/10899 added a header that includes the `LastContainedLogTimestamp`. Should we change this method to use that instead?
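A hedged sketch of the direction both of these comments point at: derive the timestamp from the `SnapshotHeaderRecord` introduced in PR #10899 instead of scanning the first data batch. `readSnapshotHeader` is a hypothetical helper standing in for whatever decodes the header control record from the raw reader:

```scala
  // Hypothetical replacement for firstBatchMaxTimestamp: read the
  // LastContainedLogTimestamp field from the snapshot header rather than
  // the max timestamp of the first data batch.
  private def lastContainedLogTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
    readSnapshot(snapshotId).asScala.flatMap { reader =>
      readSnapshotHeader(reader).map(_.lastContainedLogTimestamp())
    }
  }
```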
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
     expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
   ): Unit = {
     expiredSnapshots.foreach { case (snapshotId, _) =>
+      info(s"Marking snapshot $snapshotId for deletion")

Review comment:
   Let's either include the topic partition or the log dir of this log.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org