jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r649273990
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -233,18 +233,40 @@ final class KafkaMetadataLog private ( log.topicId.get } - override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = { - // Do not let the state machine create snapshots older than the latest snapshot - latestSnapshotId().ifPresent { latest => - if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { - // Since snapshots are less than the high-watermark absolute offset comparison is okay. - throw new IllegalArgumentException( - s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" - ) - } + override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = { + if (snapshots.contains(snapshotId)) { + Optional.empty() + } else { + Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))) + } + } + + override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = { + val highWatermarkOffset = highWatermark.offset + if (endOffset > highWatermarkOffset) { + throw new IllegalArgumentException( + s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)" + ) + } + + if (endOffset < startOffset) { + throw new IllegalArgumentException( + s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)" + ) + } + + val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match { + case Some(epochEntry) => + epochEntry.epoch + case None => + // Assume that the end offset falls in the current epoch since based on the check above: Review comment: I removed this code. To avoid scanning the leader epoch cache, I reverted the snapshot creation API so that both the offset and the epoch are passed to `createSnapshot`.
The new code just validates that the given offset and epoch are valid according to the record batches in the log and in the leader epoch cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org