cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662498528



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+    if (snapshots.contains(snapshotId) &&
+        snapshots.contains(nextSnapshotId) &&
+        startOffset < nextSnapshotId.offset &&
+        snapshotId.offset < nextSnapshotId.offset &&
+        log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+      log.deleteOldSegments()
+      val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+      snapshots.remove(snapshotId) match {
+        case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+        case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+      }
+      removeSnapshots(forgotten)
+      true
+    } else {
+      false
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots a test the given predicate to see if we should attempt to delete it. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
+    if (snapshots.size < 2)
+      return false;
+
+    var didClean = false
+    snapshots.keys.toSeq.sliding(2).toSeq.takeWhile {
+      case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
+        if (predicate(snapshot, nextSnapshot) && deleteSnapshot(snapshot, nextSnapshot)) {
+          didClean = true
+          true
+        } else {
+          false
+        }
+      case _ => false // Shouldn't get here with sliding(2)
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.
+    def shouldClean(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {
+      val now = time.milliseconds()
+      firstBatchMaxTimestamp(nextSnapshotId).exists { timestamp =>
+        if (now - timestamp > retentionMs) {
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    cleanSnapshots(shouldClean)
+  }
+
+  private def cleanSnapshotsRetentionSize(): Boolean = {

Review comment:
       It seems like we should be looking at the size of all files in the log 
directory, right? If we just look at snapshot sizes, log sizes could push us 
over our max.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to