jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r566463607



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +223,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // Do not let the state machine create snapshots older than the latest 
snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
+    } catch {
+      case _: NoSuchFileException =>
+        Optional.empty()
+    }
+  }
+
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    oldestSnapshotId
+  }
+
+  override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+    snapshotIds.add(snapshotId)
+  }
+
+  override def deleteToNewOldestSnapshotId(logStartSnapshotId: 
raft.OffsetAndEpoch): Boolean = {

Review comment:
       Picked `deleteBeforeSnapshot`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +160,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def maybeTruncateFullyToLatestSnapshot(): Boolean = {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to