jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r660808793



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,32 +16,37 @@
  */
 package kafka.raft
 
-import java.io.File
-import java.nio.file.{Files, NoSuchFileException, Path}
-import java.util.{Optional, Properties}
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel, RequestLocal}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
 
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
+import java.util.{Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  log: Log,
+  val log: Log,
+  time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
   // polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
-  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
+  val snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
       I don't think we should do this. Access to this data needs to be 
synchronized.
   
   It is also very likely that this is an implementation detail that will 
change in the near future.
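
    If tests need visibility into the snapshots map, a synchronized accessor would preserve the locking protocol without exposing the field. A minimal sketch (the helper is hypothetical, not part of this PR):

```scala
// Hypothetical accessor for tests: copy the snapshot ids while holding the
// same lock the snapshotting and polling threads use, so callers never
// observe the TreeMap mid-mutation.
private[raft] def snapshotIds(): Seq[OffsetAndEpoch] = {
  snapshots synchronized {
    snapshots.keys.toSeq
  }
}
```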

##########
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -244,10 +246,10 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception {
             assertEquals(snapshotId, snapshot.snapshotId());
             snapshot.freeze();
         }
-
+        context.log.deleteBeforeSnapshot(snapshotId);
         context.client.poll();
 
-        assertEquals(snapshotId.offset, context.log.startOffset());
+        //context.log.logStartOffset(snapshotId.offset);

Review comment:
       Commented out code.

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -316,6 +316,11 @@ public void flush() {
         lastFlushedOffset = endOffset().offset;
     }
 
+    @Override
+    public boolean maybeClean() {
+        return false;
+    }

Review comment:
       Are you planning to implement this and use it instead of `deleteBeforeSnapshot` below? Most of the tests in the `raft` module depend on `MockLog` having the same semantics as `KafkaMetadataLog`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log start offset can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread-safe and assumes a lock on the snapshots collection is held.
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe.
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes()) }
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records.
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {

Review comment:
       `deleteSnapshot` is a noop if `nextOpt` is `None`. If we move that check here, then I think we can simplify the code a bit and, for example, change the predicate to `(OffsetAndEpoch, OffsetAndEpoch) => Boolean`.
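
    Something along these lines might work (a sketch that reuses the PR's names; the restructured loop is illustrative, not tested):

```scala
// Hypothetical restructuring: hoist the None check out of deleteSnapshot so
// the predicate only ever sees two concrete snapshot ids.
private def cleanSnapshots(predicate: (OffsetAndEpoch, OffsetAndEpoch) => Boolean): Boolean = {
  val snapshotIterator = snapshots.keys.iterator
  var snapshotOpt = Log.nextOption(snapshotIterator)
  var didClean = false
  var done = false
  while (!done && snapshotOpt.isDefined) {
    val snapshot = snapshotOpt.get
    Log.nextOption(snapshotIterator) match {
      // Only attempt deletion when a successor exists and the predicate holds.
      case Some(next) if predicate(snapshot, next) && deleteSnapshot(snapshot, Some(next)) =>
        didClean = true
        snapshotOpt = Some(next)
      case _ =>
        done = true
    }
  }
  didClean
}
```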

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log start offset can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread-safe and assumes a lock on the snapshots collection is held.
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe.
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes()) }
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records.
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
+    readSnapshot(snapshotId).asScala.flatMap { reader =>
+      val it = reader.records().batchIterator()
+      if (it.hasNext) {
+        Some(it.next.maxTimestamp())
+      } else {
+        None
       }
     }
+  }
 
-    removeSnapshots(forgottenSnapshots)
-    deleted
+  /**
+   * Perform cleaning of old snapshots and log segments based on size.
+   *
+   * If our configured retention size has been violated, we perform cleaning as follows:
+   *
+   * <li>Find oldest snapshot and delete it</li>
+   * <li>Advance log start offset to end of next oldest snapshot</li>
+   * <li>Delete log segments which wholly precede the new log start offset</li>
+   *
+   * This process is repeated until the retention size is no longer violated, or until only
+   * a single snapshot remains.
+   */
+  override def maybeClean(): Boolean = {
+    snapshots synchronized {
+      cleanSnapshotsRetentionSize() || cleanSnapshotsRetentionMs()
+    }
+  }
+
+  /**
+   * Iterate through the snapshots and test the given predicate to see if we should attempt to delete each one. Since
+   * we have some additional invariants regarding snapshots and log segments we cannot simply delete a snapshot in
+   * all cases.
+   *
+   * For the given predicate, we are testing if the snapshot identified by the first argument should be deleted.
+   */
+  private def cleanSnapshots(predicate: (OffsetAndEpoch, Option[OffsetAndEpoch]) => Boolean): Boolean = {
+    val snapshotIterator = snapshots.keys.iterator
+    var snapshotOpt = Log.nextOption(snapshotIterator)
+    var didClean = false
+    while (snapshotOpt.isDefined) {
+      val snapshot = snapshotOpt.get
+      val nextOpt = Log.nextOption(snapshotIterator)
+      if (predicate(snapshot, nextOpt)) {
+        if (deleteSnapshot(snapshot, nextOpt)) {
+          didClean = true
+          snapshotOpt = nextOpt
+        } else {
+          snapshotOpt = None
+        }
+      } else {
+        snapshotOpt = None
+      }
+    }
+    didClean
+  }
+
+  private def cleanSnapshotsRetentionMs(): Boolean = {
+    if (retentionMs < 0)
+      return false
+
+    // If the timestamp of the first batch in the _next_ snapshot exceeds retention time, then we infer that
+    // the current snapshot also exceeds retention time. We make this inference to avoid reading the full snapshot.

Review comment:
       I see. Can this code be simplified now that we include `LastContainedLogTimestamp` in the header of the snapshot?
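
    If so, one possible shape, assuming a small helper that reads the header's timestamp (both the helper, sketched on the `firstBatchMaxTimestamp` comment below, and this rewrite are illustrative):

```scala
// Illustrative only: compare the snapshot's own LastContainedLogTimestamp
// (from the header added in PR 10899) against retention, instead of inferring
// it from the next snapshot's first data batch.
private def cleanSnapshotsRetentionMs(): Boolean = {
  if (retentionMs < 0)
    return false

  // The predicate keeps the existing cleanSnapshots signature; nextSnapshotId
  // is unused because cleanSnapshots/deleteSnapshot already require a
  // successor snapshot before anything is deleted.
  def shouldClean(snapshotId: OffsetAndEpoch, nextSnapshotId: Option[OffsetAndEpoch]): Boolean = {
    lastContainedLogTimestamp(snapshotId).exists { timestamp =>
      time.milliseconds() - timestamp > retentionMs
    }
  }

  cleanSnapshots(shouldClean)
}
```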

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot follows this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log start offset can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread-safe and assumes a lock on the snapshots collection is held.
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe.
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes()) }
+    }
+  }
 
-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records.
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
       PR https://github.com/apache/kafka/pull/10899 added a header that 
includes the `LastContainedLogTimestamp`. Should we change this method to use 
that instead?
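
    A sketch of what that might look like, assuming the generated `SnapshotHeaderRecord` exposes `lastContainedLogTimestamp()` and that some helper (here the hypothetical `deserializeSnapshotHeader`) can decode the header control batch:

```scala
// Sketch, not the PR's code: assumes the first batch of every snapshot is the
// header control batch added by PR 10899, and that deserializeSnapshotHeader
// (a hypothetical helper) turns it into a SnapshotHeaderRecord.
private def lastContainedLogTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
  readSnapshot(snapshotId).asScala.flatMap { reader =>
    val batches = reader.records().batchIterator()
    if (batches.hasNext) {
      // The header batch precedes all data batches, so its record carries the
      // LastContainedLogTimestamp for the whole snapshot.
      deserializeSnapshotHeader(batches.next()).map(_.lastContainedLogTimestamp())
    } else {
      None
    }
  }
}
```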

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -358,6 +485,7 @@ final class KafkaMetadataLog private (
     expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
   ): Unit = {
     expiredSnapshots.foreach { case (snapshotId, _) =>
+      info(s"Marking snapshot $snapshotId for deletion")

Review comment:
       Let's either include the topic partition or the log dir of this log.
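
    For example, something like this one-liner (a sketch; `topicPartition` and `dir` are the standard accessors on `kafka.log.Log`):

```scala
// Tag the message with the partition (or log.dir) so operators can tell which
// metadata log the snapshot belongs to.
info(s"Marking snapshot $snapshotId for deletion in ${log.topicPartition}")
```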




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

