hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r556219160
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1314,11 +1318,24 @@ class Log(@volatile private var _dir: File,
 
   /**
    * Increment the log start offset if the provided offset is larger.
+   *
+   * If the log start offset changed, then this method:
+   *
+   * 1. Records the new log start offset.
+   * 2. Updates the high watermark if it is less than the new log start offset

Review comment:
   nit: Some additional documentation may be helpful here, but just describing the logic in the code does not seem too useful. Comments like these tend to get out of sync with the code over time.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -106,6 +106,7 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          var logAppendTime: Long,
                          var logStartOffset: Long,
                          var recordConversionStats: RecordConversionStats,
+                         var rolled: Boolean,

Review comment:
   Painful to me to have another parameter here. I wonder if we could make the type of `firstOffset` be `Option[LogOffsetMetadata]`. Then we could infer `rolled` from the segment position. If we keep the extra parameter, we need to update the doc above.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+    // Truncate the log fully if the latest snapshot is greater than the log end offset
+    var truncated = false
+    latestSnapshotId.ifPresent { snapshotId =>
+      if (snapshotId.epoch > log.latestEpoch.getOrElse(0) ||

Review comment:
   nit: use `val` for `log.latestEpoch.getOrElse(0)` to avoid duplication?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
  }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    startSnapshotId;

Review comment:
   nit: remove semicolon

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {

Review comment:
   nit: how about `oldestSnapshotId` to go along with `latestSnapshotId`?
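For illustration of the `Option[LogOffsetMetadata]` idea in the `LogAppendInfo` comment above, here is a minimal sketch of inferring `rolled` from the segment position. The field type change and the zero-position check are assumptions, not code from this PR:

```scala
// Hypothetical sketch: if firstOffset carried kafka.server.LogOffsetMetadata,
// a segment roll could be inferred rather than stored. A batch that begins a
// brand-new segment sits at physical position 0 within that segment file.
def rolled(firstOffset: Option[LogOffsetMetadata]): Boolean =
  firstOffset.exists(_.relativePositionInSegment == 0)
```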
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var startSnapshotId = snapshotIds

Review comment:
   nit: isn't `[this]` redundant?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+    // Truncate the log fully if the latest snapshot is greater than the log end offset
+    var truncated = false
+    latestSnapshotId.ifPresent { snapshotId =>
+      if (snapshotId.epoch > log.latestEpoch.getOrElse(0) ||
+        (snapshotId.epoch == log.latestEpoch.getOrElse(0) && snapshotId.offset > endOffset().offset)) {
+
+        log.truncateFullyAndStartAt(snapshotId.offset)

Review comment:
   Do we need to delete older snapshots?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -79,13 +94,30 @@ class KafkaMetadataLog(
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
     val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
+
+    if (appendInfo.rolled) {
+      log.deleteOldSegments()
+    }
+
     new LogAppendInfo(appendInfo.firstOffset.getOrElse {
       throw new KafkaException("Append failed unexpectedly")
     }, appendInfo.lastOffset)
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == logEndOffset) {

Review comment:
   I think the invariant we are trying to enforce is that we always have a snapshot at the log start offset, so it's a little surprising to see the check for the end offset here. I think this is handling the case when the log is empty, so could we just as well use the log start offset?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -131,6 +179,10 @@ class KafkaMetadataLog(
     }
   }
 
+  override def highWatermark: Long = {

Review comment:
   It might be useful to expose this as `LogOffsetMetadata`. That would be more consistent with `updateHighWatermark`.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -29,15 +32,22 @@ import org.apache.kafka.snapshot.FileRawSnapshotReader
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],

Review comment:
   It would be useful to add some comments about concurrency somewhere. I assume we are using a concurrent collection because we want to allow snapshots to be created outside of the raft IO thread. Are there any other reasons?
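On the question above about deleting older snapshots after a full truncation, one possible shape, purely illustrative: the helper name is hypothetical, and deleting the snapshot files on disk would still need separate handling.

```scala
// Hypothetical helper: drop in-memory snapshot ids strictly below the new log
// start offset. ConcurrentSkipListSet iterators support remove(), so pruning
// can run safely alongside concurrent readers.
private def forgetSnapshotsBefore(logStartOffset: Long): Unit = {
  val iterator = snapshotIds.iterator()
  while (iterator.hasNext) {
    if (iterator.next().offset < logStartOffset) {
      iterator.remove()
    }
  }
}
```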
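And a rough sketch of the log-start-offset variant of `lastFetchedEpoch` suggested above. This illustrates the reviewer's question rather than the PR's code, and it assumes the invariant that an empty log always has a snapshot at its start offset (`asScala` comes from the file's existing `OptionConverters` import):

```scala
override def lastFetchedEpoch: Int = {
  log.latestEpoch.getOrElse {
    // Empty log: the snapshot at the log start offset carries the epoch.
    latestSnapshotId.asScala
      .filter(_.offset == startOffset)
      .map(_.epoch)
      .getOrElse(0)
  }
}
```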
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots.

Review comment:
   Can you clarify the question? Do you mean truncation to an offset below the high watermark?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -79,13 +94,30 @@ class KafkaMetadataLog(
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == logEndOffset) {
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+            s"Expected the snapshot's end offset to match the logs end offset ($logEndOffset)"
+          )
+        }
+      } orElse(0)

Review comment:
   nit: it's a little unusual to leave off the `.` in an expression like this

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +145,22 @@ class KafkaMetadataLog(
+  override def truncateFullyToLatestSnapshot(): Boolean = {
+    // Truncate the log fully if the latest snapshot is greater than the log end offset
+    var truncated = false

Review comment:
   nit: we could probably convert to scala option and rewrite the check below as `exists`

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>

Review comment:
   Man, this is annoying. There is also a `pollLast` which returns null if the set is empty.
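Putting the two nits above together (a `val` for the repeated `latestEpoch` lookup, and a Scala `Option` with `exists` in place of the mutable flag), `truncateFullyToLatestSnapshot` might look like the sketch below. The method tail is reconstructed from the surrounding diff, an assumption, since the quoted hunk cuts off at the truncate call:

```scala
override def truncateFullyToLatestSnapshot(): Boolean = {
  // Truncate the log fully if the latest snapshot is greater than the log end offset
  latestSnapshotId.asScala.exists { snapshotId =>
    val latestEpoch = log.latestEpoch.getOrElse(0) // looked up once, used twice
    if (snapshotId.epoch > latestEpoch ||
        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) {
      log.truncateFullyAndStartAt(snapshotId.offset)
      true
    } else {
      false
    }
  }
}
```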
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
+  override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = {
+    var updated = false
+    latestSnapshotId.ifPresent { snapshotId =>
+      if (startOffset < logStartSnapshotId.offset &&
+        logStartSnapshotId.offset <= snapshotId.offset &&
+        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) {
+
+        log.deleteOldSegments()
+        startSnapshotId = Optional.of(logStartSnapshotId)
+        updated = true
+      }
+    }
+
+    updated
+  }
 
   override def close(): Unit = {
     log.close()
   }
 }
+
+object KafkaMetadataLog {

Review comment:
   Kind of a general note, but I think there are some logging gaps in here. I think we should tend toward the verbose side initially since snapshot events will be relatively infrequent.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    startSnapshotId;
+  }
+
+  override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {

Review comment:
   Maybe `onSnapshotFrozen`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
+  override def snapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+    snapshotIds.add(snapshotId)
+  }
+
+  override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = {

Review comment:
   Do we need to validate that there really is a snapshot associated with this id?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org