mumrah commented on code in PR #15478:
URL: https://github.com/apache/kafka/pull/15478#discussion_r1523630806


##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -404,21 +414,33 @@ final class KafkaMetadataLog private (
    * all cases.
    *
    * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   * The predicate returns a Some with the reason if the snapshot should be deleted and a None if the snapshot
+   * should not be deleted
    */
-  private def cleanSnapshots(predicate: OffsetAndEpoch => Boolean): Boolean = {
-    if (snapshots.size < 2)
+  private def cleanSnapshots(predicate: OffsetAndEpoch => Option[SnapshotDeletionReason]): Boolean = {
+    if (snapshots.size < 2) {
       return false
+    }
 
     var didClean = false
     snapshots.keys.toSeq.sliding(2).foreach {
       case Seq(snapshot: OffsetAndEpoch, nextSnapshot: OffsetAndEpoch) =>
-        if (predicate(snapshot) && deleteBeforeSnapshot(nextSnapshot)) {
-          didClean = true
-        } else {
-          return didClean
+        predicate(snapshot) match {
+          case Some(reason) =>
+            if (deleteBeforeSnapshot(nextSnapshot, reason)) {

Review Comment:
   I assume `&&` short-circuits in Scala as it does in Java, in which case this change looks correct.
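
A minimal sketch of the equivalence in question, assuming the old `&&` form and the new `match` form are meant to behave the same. `delete()` and `deleteCalls` are hypothetical stand-ins for `deleteBeforeSnapshot` and are not part of the PR; the sketch only shows that the right-hand side is skipped when the left operand is false.

```scala
object ShortCircuitSketch {
  // Hypothetical stand-in for deleteBeforeSnapshot: counts how often it runs.
  var deleteCalls = 0
  def delete(): Boolean = { deleteCalls += 1; true }

  // Old shape: `&&` evaluates the right operand only if the left one is true.
  def oldShape(shouldDelete: Boolean): Boolean = shouldDelete && delete()

  // New shape: the delete runs only in the Some branch.
  def newShape(reason: Option[String]): Boolean = reason match {
    case Some(_) => delete()
    case None    => false
  }
}
```

In both shapes the delete is attempted only when the predicate asks for it, so the side effect is never triggered for a snapshot that should be kept.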



##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -677,4 +675,38 @@ object KafkaMetadataLog extends Logging {
       Snapshots.deleteIfExists(logDir, snapshotId)
     }
   }
+
+  private sealed trait SnapshotDeletionReason {
+    def reason(snapshotId: OffsetAndEpoch): String
+  }
+
+  private final case class RetentionMsBreach(now: Long, timestamp: Long, retentionMillis: Long) extends SnapshotDeletionReason {
+    override def reason(snapshotId: OffsetAndEpoch): String = {
+      s"""Marking snapshot $snapshotId for deletion because it timestamp ($timestamp) is now ($now) older than the
+          |retention ($retentionMillis""".stripMargin

Review Comment:
   Is there a missing ")" here, or should we not have the opening "("?
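
One possible shape with the parenthesis closed, as a sketch only. The `String` type for `snapshotId` and the wording "its timestamp" are assumptions made for illustration; the PR uses `OffsetAndEpoch` and may settle on different text.

```scala
object RetentionMessageSketch {
  // snapshotId is a plain String here purely for illustration.
  def reason(snapshotId: String, timestamp: Long, now: Long, retentionMillis: Long): String =
    s"""Marking snapshot $snapshotId for deletion because its timestamp ($timestamp) is now ($now) older than the
       |retention ($retentionMillis)""".stripMargin
}
```

Note that `stripMargin` removes the leading whitespace and the `|`, but the line break after "than the" stays in the resulting string.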



##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -348,14 +357,15 @@ final class KafkaMetadataLog private (
           snapshotId.offset <= latestSnapshotId.offset &&
          log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) =>
             // Delete all segments that have a "last offset" less than the log start offset
-            log.deleteOldSegments()
+            val deletedSegments = log.deleteOldSegments()
             // Remove older snapshots from the snapshots cache
-            (true, forgetSnapshotsBefore(snapshotId))
+            val forgottenSnapshots = forgetSnapshotsBefore(snapshotId)
+            (deletedSegments != 0 || forgottenSnapshots.nonEmpty, forgottenSnapshots)

Review Comment:
   Interesting, did we have a bug here previously? It looks like we used to always report `deleted=true`, even if nothing was deleted. I wonder if we even use this return value 🤔
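
To make the difference concrete, here is a small sketch contrasting the two flags. `oldResult` and `newResult` are hypothetical names, and `Seq[Long]` stands in for the collection of forgotten snapshots; only the boolean expression in `newResult` is taken from the diff.

```scala
object CleanedFlagSketch {
  // Old shape: the flag is true whenever this branch is taken.
  def oldResult(deletedSegments: Int, forgotten: Seq[Long]): Boolean = true

  // New shape: the flag is true only if something was actually removed.
  def newResult(deletedSegments: Int, forgotten: Seq[Long]): Boolean =
    deletedSegments != 0 || forgotten.nonEmpty
}
```

With zero deleted segments and no forgotten snapshots, the old shape still reports a deletion while the new one does not.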


