hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r564976547
########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -2118,7 +2139,7 @@ class Log(@volatile private var _dir: File, * * @param newOffset The new offset to start the log with */ - private[log] def truncateFullyAndStartAt(newOffset: Long): Unit = { + def truncateFullyAndStartAt(newOffset: Long): Unit = { Review comment: It is a little unfortunate to expose the truncation APIs from `Log`. For logs which are managed by `LogManager`, truncation calls are expected to go through `LogManager`. That makes me think it might be useful to have a distinction between the low-level `Log` object and that which is managed by `LogManager`. For example, maybe we could use `ManagedLog` or something like that. Not something to address here, just food for thought in the way of future improvement. ########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -1910,7 +1931,7 @@ class Log(@volatile private var _dir: File, in the header. */ appendInfo.firstOffset match { - case Some(firstOffset) => roll(Some(firstOffset)) + case Some(firstOffset) => roll(Some(firstOffset.messageOffset)) Review comment: nit: might be able to simplify this a little bit. 
Maybe something like this: ```scala val rollOffset = appendInfo.firstOffset.map(_.messageOffset) .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE) roll(Some(rollOffset)) ``` ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], + leaderEpoch = epoch, + origin = AppendOrigin.Coordinator + ) + ) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { + appendInfo.firstOffset match { + case Some(firstOffset) => + if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => + throw new KafkaException(s"Append failed unexpectedly: $appendInfo") + } } override def lastFetchedEpoch: Int = { - log.latestEpoch.getOrElse(0) + log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => + val logEndOffset = endOffset().offset + if 
(snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the epoch of the snapshot when the log is empty + snapshotId.epoch + } else { + throw new KafkaException( + s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + + s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " + + s"and the log start offset ($startOffset)" + ) + } + }.orElse(0) + } } override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = { val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch => - new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + if (oldestSnapshotId.isPresent() && + offsetAndEpoch.offset == oldestSnapshotId.get().offset && + offsetAndEpoch.leaderEpoch == leaderEpoch) { + + // The leaderEpoch is smaller thant the smallest epoch on the log. Overide the diverging + // epoch to the oldest snapshot which should be the snapshot at the log start offset + val snapshotId = oldestSnapshotId().get(); Review comment: nit: remove semicolon here and below. 
Also, there are some unnecessary parentheses in here. ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], + leaderEpoch = epoch, + origin = AppendOrigin.Coordinator + ) + ) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { + appendInfo.firstOffset match { + case Some(firstOffset) => + if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => + throw new KafkaException(s"Append failed unexpectedly: $appendInfo") Review comment: The `LogAppendInfo` contains an `errorMessage` field. Perhaps we can include it here? 
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], + leaderEpoch = epoch, + origin = AppendOrigin.Coordinator + ) + ) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { + appendInfo.firstOffset match { + case Some(firstOffset) => + if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => + throw new KafkaException(s"Append failed unexpectedly: $appendInfo") + } } override def lastFetchedEpoch: Int = { - log.latestEpoch.getOrElse(0) + log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => + val logEndOffset = endOffset().offset + if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the epoch of the snapshot when the log is empty + snapshotId.epoch + } else { + throw new 
KafkaException( + s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + + s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " + + s"and the log start offset ($startOffset)" + ) + } + }.orElse(0) + } } override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = { val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch => - new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + if (oldestSnapshotId.isPresent() && + offsetAndEpoch.offset == oldestSnapshotId.get().offset && + offsetAndEpoch.leaderEpoch == leaderEpoch) { Review comment: Always tough to think my way through this logic. So here we are handling the case where the client has requested the end offset for an epoch which is smaller than the entries remaining in the log. The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and the end offset matches the log start offset, which we assume to be equal to the offset corresponding to the oldest snapshot. Right so far? So there are still a couple cases that are possible after verifying this: 1. The epoch matches the snapshot epoch. We know then that the end offset must be equal to the snapshot offset. This is a case where there is a new epoch which begins on the first offset in the log. 2. The epoch is less than the snapshot epoch. The follower might have been offline for a long time and is requesting an ancient epoch which we no longer have any knowledge of. I'm convinced about the handling for the first case, but less sure about the second one. The guarantee that this API provides is that it will return the largest epoch which is less than or equal to the requested epoch and its last offset. But for this case, the snapshot epoch is larger than the request epoch. 
In fact, we have no way of satisfying this API because we have lost the old data. Thinking in terms of what we want to happen might be helpful. We want to detect this as an out of range fetch which causes the follower to fetch from the latest snapshot. What if we return `Optional.empty` since we cannot determine the end offset for the requested epoch? Currently we do the following in `KafkaRaftClient`: ```java OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> { return new IllegalStateException( String.format( "Expected to find an end offset for epoch %s since it must be less than the current epoch %s", lastFetchedEpoch, quorum.epoch() ) ); }); ``` There are two cases we want to handle: the requested epoch is less than any known epoch we have and the requested epoch is larger than any known epoch. The latter case might be possible if there was an "unclean" election of some kind. However, we can handle both cases the same. We can send the latest snapshot id to the follower, which will cause it to truncate. The follower may see this as a loss of committed data and fail, but that is ok. At least we will get a clear message about the failure. 
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], + leaderEpoch = epoch, + origin = AppendOrigin.Coordinator + ) + ) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") - val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { + appendInfo.firstOffset match { + case Some(firstOffset) => + if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => + throw new KafkaException(s"Append failed unexpectedly: $appendInfo") + } } override def lastFetchedEpoch: Int = { - log.latestEpoch.getOrElse(0) + log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => + val logEndOffset = endOffset().offset + if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the epoch of the snapshot when the log is empty + snapshotId.epoch + } else { + throw new 
KafkaException( + s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + + s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " + + s"and the log start offset ($startOffset)" + ) + } + }.orElse(0) + } } override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = { val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch => - new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + if (oldestSnapshotId.isPresent() && Review comment: nit: might consider converting to scala Option and changing this to a `match`. A little easier on the eyes ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -16,28 +16,41 @@ */ package kafka.raft +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.util.NoSuchElementException import java.util.Optional +import java.util.concurrent.ConcurrentSkipListSet -import kafka.log.{AppendOrigin, Log} +import kafka.log.{AppendOrigin, Log, SnapshotGenerated} import kafka.server.{FetchHighWatermark, FetchLogEnd} import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.raft -import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog} +import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog} import org.apache.kafka.snapshot.FileRawSnapshotReader import org.apache.kafka.snapshot.FileRawSnapshotWriter import org.apache.kafka.snapshot.RawSnapshotReader import org.apache.kafka.snapshot.RawSnapshotWriter +import org.apache.kafka.snapshot.Snapshots import scala.compat.java8.OptionConverters._ -class KafkaMetadataLog( +final class KafkaMetadataLog private ( log: Log, + // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation + // and 
other threads will access this object. This object is used to efficiently notify the polling thread + // when snapshots are created. + snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch], topicPartition: TopicPartition, - maxFetchSizeInBytes: Int = 1024 * 1024 + maxFetchSizeInBytes: Int ) extends ReplicatedLog { + private[this] var oldestSnapshotId = snapshotIds Review comment: Hmm... It might be helpful to document this fact somewhere since it seems non-obvious. It surprised me anyway. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org