cmccabe commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r662498528
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -312,26 +315,142 @@ final class KafkaMetadataLog private ( } } - override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { - val (deleted, forgottenSnapshots) = snapshots synchronized { - latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && - startOffset < logStartSnapshotId.offset && - logStartSnapshotId.offset <= snapshotId.offset && - log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + /** + * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the + * following invariants all hold true: + * + * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li> + * <li>The offset of the next snapshot is greater than the log start offset</li> + * <li>The log can be advanced to the offset of the next snapshot</li> + * + * This method is not thread safe and assumes a lock on the snapshots collection is held + */ + private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = { + if (snapshots.contains(snapshotId) && + snapshots.contains(nextSnapshotId) && + startOffset < nextSnapshotId.offset && + snapshotId.offset < nextSnapshotId.offset && + log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) { + log.deleteOldSegments() + val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] + snapshots.remove(snapshotId) match { + case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot) + case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.") + } + removeSnapshots(forgotten) + true + } else { + false + } + } - // Delete all segments that have a "last offset" less than the log start offset - log.deleteOldSegments() + /** + * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe + */ + private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = { + snapshots.keys.toSeq.flatMap { + snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())} + } + } - // Forget snapshots less than the log start offset - (true, forgetSnapshotsBefore(logStartSnapshotId)) - case _ => - (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + /** + * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records + */ + private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = { + readSnapshot(snapshotId).asScala.flatMap { reader => + val it = reader.records().batchIterator() + if (it.hasNext) { + Some(it.next.maxTimestamp()) + } else { + None } } + } - removeSnapshots(forgottenSnapshots) - deleted + /** + * Perform cleaning of old snapshots and log segments based on size. + * + * If our configured retention size has been violated, we perform cleaning as follows: + * + * <li>Find oldest snapshot and delete it</li> + * <li>Advance log start offset to end of next oldest snapshot</li> + * <li>Delete log segments which wholly precede the new log start offset</li> + * + * This process is repeated until the retention size is no longer violated, or until only + * a single snapshot remains. + */ + override def maybeClean(): Boolean = { + snapshots synchronized { + cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs() + } + } + + /** + * Iterate through the snapshots a test the given predicate to see if we should attempt to delete it. Since + * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in + * all cases. + * + * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted. + */ + private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = { + if (snapshots.size < 2) + return false; + + var didClean = false + snapshots.keys.toSeq.sliding(2).toSeq.takeWhile { + case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) => + if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) { + didClean = true + true + } else { + false + } + case _ => false // Shouldn't get here with sliding(2) + } + didClean + } + + private def cleanSnapshotsRetentionMs(): Boolean = { + if (retentionMs < 0) + return false + + // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that + // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot. + def shouldClean(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = { + val now = time.milliseconds() + firstBatchMaxTimestamp(nextSnapshotId).exists { timestamp => + if (now - timestamp > retentionMs) { + true + } else { + false + } + } + } + + cleanSnapshots(shouldClean) + } + + private def cleanSnapshotsRetentionSize(): Boolean = { Review comment: It seems like we should be looking at the size of all files in the log directory, right? If we just look at snapshot sizes, log sizes could push us over our max. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org