hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r556219160



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1314,11 +1318,24 @@ class Log(@volatile private var _dir: File,
 
   /**
    * Increment the log start offset if the provided offset is larger.
+   *
+   * If the log start offset changed, then this method:
+   *
+   * 1. Records the new log start offset.
+   * 2. Updates the high watermark if it is less than the new log start offset

Review comment:
       nit: Some additional documentation may be helpful here, but just 
describing the logic in the code does not seem too useful. Comments like these 
tend to get out of sync with the code over time.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -106,6 +106,7 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          var logAppendTime: Long,
                          var logStartOffset: Long,
                          var recordConversionStats: RecordConversionStats,
+                         var rolled: Boolean,

Review comment:
       Painful to me to have another parameter here. I wonder if we could make 
the type of `firstOffset` be `Option[LogOffsetMetadata]`. Then we could infer 
`rolled` from the segment position. If we keep the extra parameter, we need to 
update the doc above.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+    // Truncate the log fully if the latest snapshot is greater than the log 
end offset
+    var truncated = false
+    latestSnapshotId.ifPresent { snapshotId =>
+      if (snapshotId.epoch > log.latestEpoch.getOrElse(0) ||

Review comment:
       nit: use `val` for `log.latestEpoch.getOrElse(0)` to avoid duplication?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    startSnapshotId;

Review comment:
       nit: remove semicolon

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {

Review comment:
       nit: how about `oldestSnapshotId` to go along with `latestSnapshotId`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var startSnapshotId = snapshotIds

Review comment:
       nit: isn't `[this]` redundant?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+    // Truncate the log fully if the latest snapshot is greater than the log 
end offset
+    var truncated = false
+    latestSnapshotId.ifPresent { snapshotId =>
+      if (snapshotId.epoch > log.latestEpoch.getOrElse(0) ||
+        (snapshotId.epoch == log.latestEpoch.getOrElse(0) && snapshotId.offset 
> endOffset().offset)) {
+
+        log.truncateFullyAndStartAt(snapshotId.offset)

Review comment:
       Do we need to delete older snapshots?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -79,13 +94,30 @@ class KafkaMetadataLog(
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
     val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
+
+    if (appendInfo.rolled) {
+      log.deleteOldSegments()
+    }
+
     new LogAppendInfo(appendInfo.firstOffset.getOrElse {
       throw new KafkaException("Append failed unexpectedly")
     }, appendInfo.lastOffset)
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == logEndOffset) {

Review comment:
       I think the invariant we are trying to enforce is that we always have a 
snapshot at the log start offset, so it's a little surprising to see the check 
for the end offset here. I think this is handling the case when the log is 
empty, so could we just as well use the log start offset?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -131,6 +179,10 @@ class KafkaMetadataLog(
     }
   }
 
+  override def highWatermark: Long = {

Review comment:
       It might be useful to expose this as `LogOffsetMetadata`. That would be 
more consistent with `updateHighWatermark`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],

Review comment:
       It would be useful to add some comments about concurrency somewhere. I 
assume we are using a concurrent collection because we want to allow snapshots 
to be created outside of the raft IO thread. Are there any other reasons?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.

Review comment:
       Can you clarify the question? Do you mean truncation to an offset below 
the high watermark? 

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -79,13 +94,30 @@ class KafkaMetadataLog(
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
     val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
+
+    if (appendInfo.rolled) {
+      log.deleteOldSegments()
+    }
+
     new LogAppendInfo(appendInfo.firstOffset.getOrElse {
       throw new KafkaException("Append failed unexpectedly")
     }, appendInfo.lastOffset)
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == logEndOffset) {
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot 
($snapshotId). " +
+            s"Expected the snapshot's end offset to match the logs end offset 
($logEndOffset)"
+          )
+        }
+      } orElse(0)

Review comment:
       nit: it's a little unusual to leave off the `.` in an expression like 
this

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+    // Truncate the log fully if the latest snapshot is greater than the log 
end offset
+    var truncated = false

Review comment:
       nit: we could probably convert to scala option and rewrite the check 
below as `exists`

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>

Review comment:
       Man, this is annoying. There is also a `pollLast` which returns null if 
the set is empty.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    startSnapshotId;
+  }
+
+  override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+    snapshotIds.add(snapshotId)
+  }
+
+  override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): 
Boolean = {
+    var updated = false
+    latestSnapshotId.ifPresent { snapshotId =>
+      if (startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) {
+
+        log.deleteOldSegments()
+        startSnapshotId = Optional.of(logStartSnapshotId)
+        updated = true
+      }
+    }
+
+    updated
+  }
+
   override def close(): Unit = {
     log.close()
   }
 }
+
+object KafkaMetadataLog {

Review comment:
       Kind of a general note, but I think there are some logging gaps in here. 
I think we should tend toward the verbose side initially since snapshot events 
will be relatively infrequent.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    startSnapshotId;
+  }
+
+  override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {

Review comment:
       Maybe `onSnapshotFrozen`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    startSnapshotId;
+  }
+
+  override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+    snapshotIds.add(snapshotId)
+  }
+
+  override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): 
Boolean = {

Review comment:
       Do we need to validate that there really is a snapshot associated with 
this id?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to