jsancio commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r662550101



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1048,9 +1058,14 @@ object KafkaConfig {
       .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
       .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
       .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
-      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
       .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
+      .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
+      .defineInternal(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .defineInternal(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
+      .defineInternal(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
+      .defineInternal(MetadataMaxRetentionMillisProp, LONG, null, null, HIGH, MetadataMaxRetentionMillisDoc)

Review comment:
       Are we going to keep the properties added in this PR as internal after 3.0? If not, let's just make them public now.
   
   Do we need a default value for the retention milliseconds property?
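   If these do become public, the mechanical change is mostly swapping `defineInternal` for `define`, and the retention question could be settled by wiring in an explicit default. A minimal sketch, assuming a hypothetical property name and a purely illustrative 7-day default (neither is taken from this PR):

```scala
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance.HIGH
import org.apache.kafka.common.config.ConfigDef.Type.LONG

object MetadataConfigSketch {
  // Hypothetical name standing in for MetadataMaxRetentionMillisProp.
  val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"

  val configDef: ConfigDef = new ConfigDef()
    // Public definition (`define` instead of `defineInternal`) with an explicit
    // default; the 7-day value is chosen only for illustration.
    .define(MetadataMaxRetentionMillisProp, LONG, 7L * 24 * 60 * 60 * 1000, HIGH,
      "Maximum time to retain metadata log segments and snapshots before deleting them.")
}
```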

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2651,7 +2651,7 @@ object Log extends Logging {
    * @tparam T the type of object held within the iterator
    * @return Some(iterator.next) if a next element exists, None otherwise.
    */
-  private def nextOption[T](iterator: Iterator[T]): Option[T] = {
+  def nextOption[T](iterator: Iterator[T]): Option[T] = {

Review comment:
       I don't think you need to make this public. Scala's `Iterator` has 
`nextOption`.
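   For reference, `nextOption()` has been available on Scala's `Iterator` since 2.13, so call sites can use it directly instead of widening the visibility of the helper in `Log`. A quick standalone illustration:

```scala
// Scala 2.13+: Iterator already provides nextOption(), returning Some(next) or None.
object NextOptionExample extends App {
  val segments = Iterator("segment-0", "segment-1")
  println(segments.nextOption())               // Some(segment-0)
  println(Iterator.empty[String].nextOption()) // None
}
```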

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +315,142 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotId: OffsetAndEpoch): Boolean = {

Review comment:
       Why change and replace the implementation for `deleteBeforeSnapshot`? 
For example, why not always delete every snapshot that is less than 
`nextSnapshotId` when the `if` statement predicate is true?
   
   For example, `log.deleteOldSegments()` deletes every segment that is less 
than the log start offset. Why not also delete every snapshot that is less than 
the log start offset which is the same as `nextSnapshotId`?
   
   This deletes both segments and snapshot. The old name of 
`deleteBeforeSnapshot` seems more accurate.
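   To make the alternative concrete, here is a rough sketch of the shape suggested above. It reuses names that already appear in this file (`snapshots`, `removeSnapshots`, `FileRawSnapshotReader`, `OffsetAndEpoch`), but the helper itself is hypothetical and the guard conditions and locking are elided:

```scala
// Sketch only: once the log start offset has advanced, forget every snapshot
// whose end offset is below it, mirroring what log.deleteOldSegments() does for
// segments. Assumes the caller holds the lock on `snapshots`.
private def forgetSnapshotsBefore(logStartOffset: Long): Unit = {
  val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
  snapshots.keys.toSeq
    .filter(_.offset < logStartOffset)
    .foreach { snapshotId =>
      snapshots.remove(snapshotId).foreach(reader => forgotten.put(snapshotId, reader))
    }
  removeSnapshots(forgotten)
}
```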

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -817,11 +863,19 @@ object KafkaMetadataLogTest {
     }
   }
 
+  val DefaultMetadataLogConfig = new MetadataLogConfig(

Review comment:
       This is a `case class` so new not needed or recommended.
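   A tiny illustration of the difference; the field names here are made up, only the construction style matters:

```scala
// A case class gets a synthesized companion apply, so `new` is redundant.
final case class MetadataLogConfig(logSegmentBytes: Int, retentionMillis: Long)

val DefaultMetadataLogConfig = MetadataLogConfig(
  logSegmentBytes = 8 * 1024 * 1024,
  retentionMillis = 7L * 24 * 60 * 60 * 1000
)
```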

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2153,14 +2154,36 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
         return false;
     }
 
-    private void maybeDeleteBeforeSnapshot() {
-        log.latestSnapshotId().ifPresent(snapshotId -> {
-            quorum.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset >= snapshotId.offset) {
-                    log.deleteBeforeSnapshot(snapshotId);
+    /**
+     * A simple timer based log cleaner
+     */
+    private static class RaftMetadataLogCleaner {
+        private final Logger logger;
+        private final Timer timer;
+        private final long delayMs;
+        private final Runnable cleaner;
+
+        RaftMetadataLogCleaner(Logger logger, Time time, long delayMs, Runnable cleaner) {
+            this.logger = logger;
+            this.timer = time.timer(delayMs);
+            this.delayMs = delayMs;
+            this.cleaner = cleaner;
+        }
+
+        public boolean maybeClean(long currentTimeMs) {
+            timer.update(currentTimeMs);
+            if (timer.isExpired()) {

Review comment:
       Do we need this timer because `log.maybeClean` is expensive even when 
there are no snapshots to clean?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -312,26 +313,152 @@ final class KafkaMetadataLog private (
     }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
-    val (deleted, forgottenSnapshots) = snapshots synchronized {
-      latestSnapshotId().asScala match {
-        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
-          startOffset < logStartSnapshotId.offset &&
-          logStartSnapshotId.offset <= snapshotId.offset &&
-          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
+  /**
+   * Delete a snapshot, advance the log start offset, and clean old log segments. This will only happen if the
+   * following invariants all hold true:
+   *
+   * <li>This is not the latest snapshot (i.e., another snapshot proceeds this one)</li>
+   * <li>The offset of the next snapshot is greater than the log start offset</li>
+   * <li>The log can be advanced to the offset of the next snapshot</li>
+   *
+   * This method is not thread safe and assumes a lock on the snapshots collection is held
+   */
+  private[raft] def deleteSnapshot(snapshotId: OffsetAndEpoch, nextSnapshotIdOpt: Option[OffsetAndEpoch]): Boolean = {
+    nextSnapshotIdOpt.exists { nextSnapshotId =>
+      if (snapshots.contains(snapshotId) &&
+          snapshots.contains(nextSnapshotId) &&
+          startOffset < nextSnapshotId.offset &&
+          snapshotId.offset < nextSnapshotId.offset &&
+          log.maybeIncrementLogStartOffset(nextSnapshotId.offset, SnapshotGenerated)) {
+        log.deleteOldSegments()
+        val forgotten = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
+        snapshots.remove(snapshotId) match {
+          case Some(removedSnapshot) => forgotten.put(snapshotId, removedSnapshot)
+          case None => throw new IllegalStateException(s"Could not remove snapshot $snapshotId from our cache.")
+        }
+        removeSnapshots(forgotten)
+        true
+      } else {
+        false
+      }
+    }
+  }

-          // Delete all segments that have a "last offset" less than the log start offset
-          log.deleteOldSegments()
+  /**
+   * Force all known snapshots to have an open reader so we can know their sizes. This method is not thread-safe
+   */
+  private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
+    snapshots.keys.toSeq.flatMap {
+      snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
+    }
+  }

-          // Forget snapshots less than the log start offset
-          (true, forgetSnapshotsBefore(logStartSnapshotId))
-        case _ =>
-          (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
+  /**
+   * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
+   */
+  private def firstBatchMaxTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {

Review comment:
       Sounds good. Can we move this implementation to `SnapshotReader`? It 
will match what we are doing here 
https://github.com/apache/kafka/pull/10946/files#diff-83295dbf4af9755c79987b390f72f53dda47af9f82eb8305755d90da18d5b9f2R85

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2089,7 +2090,7 @@ private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs)
     }
 
     private long pollCurrentState(long currentTimeMs) {
-        maybeDeleteBeforeSnapshot();
+        snapshotCleaner.maybeClean(currentTimeMs);

Review comment:
       We need to communicate the cleaner's timeout to the poll method so that 
knows for how long to wait in the message queue.



