mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r664594269



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,28 +311,151 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
+  /**
+   * Delete snapshots that come before a given snapshot ID. This is done by 
advancing the log start offset to the given
+   * snapshot and cleaning old log segments.
+   *
+   * This will only happen if the following invariants all hold true:
+   *
+   * <li>The given snapshot precedes the latest snapshot</li>
+   * <li>The offset of the given snapshot is greater than the log start 
offset</li>
+   * <li>The log layer can advance the offset to the given snapshot</li>
+   *
+   * This method is thread-safe
+   */
+  override def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch): Boolean = {
     val (deleted, forgottenSnapshots) = snapshots synchronized {
       latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-
-          // Delete all segments that have a "last offset" less than the log 
start offset
-          log.deleteOldSegments()
-
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case Some(latestSnapshotId) if
+          snapshots.contains(snapshotId) &&
+          startOffset < snapshotId.offset &&
+          snapshotId.offset <= latestSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(snapshotId.offset, 
SnapshotGenerated) =>
+            // Delete all segments that have a "last offset" less than the log 
start offset
+            log.deleteOldSegments()
+            // Remove older snapshots from the snapshots cache
+            (true, forgetSnapshotsBefore(snapshotId))
         case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+            (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
       }
     }
-
     removeSnapshots(forgottenSnapshots)
     deleted
   }
 
+  /**
+   * Force all known snapshots to have an open reader so we can know their 
sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => 
(snapshotId, reader.sizeInBytes())}
+    }
+  }
+
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the 
snapshot exists and has records
+   */
+  private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] 
= {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val batchIterator = reader.records().batchIterator()
+
+      val firstBatch = batchIterator.next()
+      val records = firstBatch.streamingIterator(new 
BufferSupplier.GrowableBufferSupplier())
+      if (firstBatch.isControlBatch) {
+        val header = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next());
+        Some(header.lastContainedLogTimestamp())
+      } else {
+        warn("Did not find control record at beginning of snapshot")
+        None
+      }
+    }
+  }
+
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning 
as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, 
or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()

Review comment:
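       Good catch, @dengziming. We should always try to clean both ways 
(by retention size and by retention time) instead of short-circuiting with 
`||`. I'll rewrite this part rather than switch to `|`, since I don't like 
using the non-short-circuiting `|` on booleans as it's fairly uncommon.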




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to