hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557643619
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -16,28 +16,41 @@ */ package kafka.raft +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.util.NoSuchElementException import java.util.Optional +import java.util.concurrent.ConcurrentSkipListSet -import kafka.log.{AppendOrigin, Log} +import kafka.log.{AppendOrigin, Log, SnapshotGenerated} import kafka.server.{FetchHighWatermark, FetchLogEnd} import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.raft -import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog} +import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog} import org.apache.kafka.snapshot.FileRawSnapshotReader import org.apache.kafka.snapshot.FileRawSnapshotWriter import org.apache.kafka.snapshot.RawSnapshotReader import org.apache.kafka.snapshot.RawSnapshotWriter +import org.apache.kafka.snapshot.Snapshots import scala.compat.java8.OptionConverters._ -class KafkaMetadataLog( +final class KafkaMetadataLog private ( log: Log, + // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation + // and other threads will access this object. This object is used to efficiently notify the polling thread Review comment: What other threads? ########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -370,7 +375,9 @@ class Log(@volatile private var _dir: File, throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed") } - def highWatermark: Long = highWatermarkMetadata.messageOffset + def highWatermark: Long = _highWatermarkMetadata.messageOffset + + def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata Review comment: Could we instead either expose `fetchHighWatermarkMetadata` or make use of `fetchOffsetSnapshot` in `KafkaMetadataLog`? ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -147,18 +221,102 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { - FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) + // Do not let the state machine create snapshots older than the latest snapshot + latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { + // Since snapshots are less than the high-watermark absolute offset comparison is okay. Review comment: Is it useful here to ensure that `snapshotId` here is lower than the high watermark? 
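   For example, something along these lines (sketch only; the high-watermark guard is the new part and assumes comparing against `log.highWatermark` from `Log` is acceptable here):
   ```scala
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
     // Hypothetical extra guard: snapshots should only cover committed data, so reject ids
     // beyond the current high watermark
     if (snapshotId.offset > log.highWatermark) {
       throw new IllegalArgumentException(
         s"Attempting to create a snapshot ($snapshotId) greater than the high watermark (${log.highWatermark})"
       )
     }

     // Existing check from this diff: do not allow snapshots older than the latest snapshot
     latestSnapshotId().ifPresent { latest =>
       if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
         throw new IllegalArgumentException(
           s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
         )
       }
     }

     FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
   ```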
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -871,15 +875,32 @@ private FetchResponseData buildFetchResponse( .setLeaderEpoch(quorum.epoch()) .setLeaderId(quorum.leaderIdOrNil()); - divergingEpoch.ifPresent(partitionData::setDivergingEpoch); + switch (validatedOffsetAndEpoch.type()) { + case DIVERGING: + partitionData.divergingEpoch() + .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch) + .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset); + break; + case SNAPSHOT: + partitionData.snapshotId() + .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch) + .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset); + break; + default: + } }); } private FetchResponseData buildEmptyFetchResponse( Errors error, Optional<LogOffsetMetadata> highWatermark ) { - return buildFetchResponse(error, MemoryRecords.EMPTY, Optional.empty(), highWatermark); + return buildFetchResponse( + error, + MemoryRecords.EMPTY, + ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(-1, -1)), Review comment: Not sure it's worth creating another type, but it is a little surprising to see `valid(new OffsetAndEpoch(-1, -1))`. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -2216,7 +2316,7 @@ public void complete() { // These fields are visible to both the Raft IO thread and the listener // and are protected through synchronization on this `ListenerContext` instance private BatchReader<T> lastSent = null; - private long lastAckedOffset = 0; + private long lastAckedEndOffset = 0; Review comment: I don't feel too strongly about it, but the new name is a little confusing to me. Why would the client only be acking end offsets? Especially confusing when I see this `lastAckedEndOffset = logStartOffset` 🙂 . I think we probably need a comment here regardless. 
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -69,23 +82,55 @@ class KafkaMetadataLog( val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], leaderEpoch = epoch, origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + + if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + + new LogAppendInfo( + appendInfo.firstOffset.map(_.messageOffset).getOrElse { + throw new KafkaException("Append failed unexpectedly") + }, + appendInfo.lastOffset + ) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + + if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + + new LogAppendInfo( + appendInfo.firstOffset.map(_.messageOffset).getOrElse { + throw new KafkaException("Append failed unexpectedly") + }, + appendInfo.lastOffset + ) } override def lastFetchedEpoch: Int = { - log.latestEpoch.getOrElse(0) + log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => + val logEndOffset = endOffset().offset + if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the epoch of the snapshot when the log is empty + snapshotId.epoch + } else { + throw new KafkaException( + s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + Review comment: I guess it's probably better to throw here. I was trying to think how we could end up here, but the only way I came up with is an invalid state transition which left the start or end offset inconsistent with the segment data. Sadly we have had a number of those bugs in the past. I debated whether we should just delete the snapshot, but I'm not sure that helps if we are left with inconsistent start/end offsets. ########## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala ########## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.raft + +import java.io.File +import java.nio.file.Files +import java.nio.file.Path +import kafka.log.Log +import kafka.log.LogManager +import kafka.log.LogTest +import kafka.server.BrokerTopicStats +import kafka.server.LogDirFailureChannel +import kafka.utils.MockTime +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.OffsetOutOfRangeException +import org.apache.kafka.common.record.CompressionType +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.record.SimpleRecord +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.raft.LogAppendInfo +import org.apache.kafka.raft.LogOffsetMetadata +import org.apache.kafka.raft.OffsetAndEpoch +import org.apache.kafka.raft.ReplicatedLog +import org.apache.kafka.snapshot.Snapshots +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertThrows +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test + +final class KafkaMetadataLogTest { + import KafkaMetadataLogTest._ + + var tempDir: File = null + val mockTime = new MockTime() + + @Before + def setUp(): Unit = { + tempDir = TestUtils.tempDir() + } + + @After + def tearDown(): Unit = { + Utils.delete(tempDir) + } + + @Test + def testCreateSnapshot(): Unit = { + val topicPartition = new TopicPartition("cluster-metadata", 0) + val numberOfRecords = 10 + val epoch = 0 + val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) + val log = buildMetadataLog(tempDir, mockTime, topicPartition) + + append(log, numberOfRecords, epoch) + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) + + TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot => + snapshot.freeze() + } + + TestUtils.resource(log.readSnapshot(snapshotId).get()) { snapshot => + assertEquals(0, snapshot.sizeInBytes()) + } + } + + @Test + def testReadMissingSnapshot(): Unit = { + val topicPartition = new TopicPartition("cluster-metadata", 0) + val log = buildMetadataLog(tempDir, mockTime, topicPartition) + + assertFalse(log.readSnapshot(new OffsetAndEpoch(10, 0)).isPresent) Review comment: nit: doesn't matter too much for deterministic tests like this, but I think this is a nicer pattern: ```scala assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0))) ``` Then the assertion message tells you what the value was. 
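   i.e. the test body would become something like this (sketch; `java.util.Optional` would need to be imported in the test file):
   ```scala
   import java.util.Optional

   @Test
   def testReadMissingSnapshot(): Unit = {
     val topicPartition = new TopicPartition("cluster-metadata", 0)
     val log = buildMetadataLog(tempDir, mockTime, topicPartition)

     // On failure this reports the actual Optional value rather than just a true/false mismatch
     assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0)))
   }
   ```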
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -113,6 +158,22 @@ class KafkaMetadataLog( log.truncateTo(offset) } + override def truncateFullyToLatestSnapshot(): Boolean = { Review comment: nit: the method name suggests that the truncation occurs unconditionally ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest( FetchRequestData.FetchPartition request, long currentTimeMs ) { - Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); - if (errorOpt.isPresent()) { - return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); - } + try { + Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); + if (errorOpt.isPresent()) { + return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); + } - long fetchOffset = request.fetchOffset(); - int lastFetchedEpoch = request.lastFetchedEpoch(); - LeaderState state = quorum.leaderStateOrThrow(); - Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - - if (divergingEpochOpt.isPresent()) { - Optional<FetchResponseData.EpochEndOffset> divergingEpoch = - divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() - .setEpoch(offsetAndEpoch.epoch) - .setEndOffset(offsetAndEpoch.offset)); - return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); - } else { - LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); + long fetchOffset = request.fetchOffset(); + int lastFetchedEpoch = request.lastFetchedEpoch(); + LeaderState state = quorum.leaderStateOrThrow(); + ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { - onUpdateLeaderHighWatermark(state, currentTimeMs); + final Records records; + if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { + LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); + + if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { + onUpdateLeaderHighWatermark(state, currentTimeMs); + } + + records = info.records; + } else { + records = MemoryRecords.EMPTY; } - return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); + return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); + } catch (Exception e) { + logger.error("Caught unexpected error in fetch completion of request {}", request, e); + return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty()); } } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. 
*/ - private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { - if (fetchOffset == 0 && lastFetchedEpoch == 0) { - return Optional.empty(); + private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { + if (log.startOffset() == 0 && fetchOffset == 0) { + if (lastFetchedEpoch != 0) { + logger.warn( + "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", + fetchOffset, + lastFetchedEpoch + ); + } + return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); } - OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch) - .orElse(new OffsetAndEpoch(-1L, -1)); - if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { - return Optional.of(endOffsetAndEpoch); + + Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log + .endOffsetForEpoch(lastFetchedEpoch) + .flatMap(endOffsetAndEpoch -> { + if (endOffsetAndEpoch.epoch == lastFetchedEpoch && endOffsetAndEpoch.offset == log.startOffset()) { Review comment: So basically we are trying to detect when the fetch is outside the range of the log, which means we will need to send a snapshot to the follower. Is that right? One thought we don't have to do here is perhaps we should change the semantics of `endOffsetForEpoch` as exposed by `ReplicatedLog`. Maybe we should only return the end offset of an epoch when we know it with certainty. ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -16,28 +16,41 @@ */ package kafka.raft +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.util.NoSuchElementException import java.util.Optional +import java.util.concurrent.ConcurrentSkipListSet -import kafka.log.{AppendOrigin, Log} +import kafka.log.{AppendOrigin, Log, SnapshotGenerated} import kafka.server.{FetchHighWatermark, FetchLogEnd} import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.raft -import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog} +import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog} import org.apache.kafka.snapshot.FileRawSnapshotReader import org.apache.kafka.snapshot.FileRawSnapshotWriter import org.apache.kafka.snapshot.RawSnapshotReader import org.apache.kafka.snapshot.RawSnapshotWriter +import org.apache.kafka.snapshot.Snapshots import scala.compat.java8.OptionConverters._ -class KafkaMetadataLog( +final class KafkaMetadataLog private ( log: Log, + // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation + // and other threads will access this object. This object is used to efficiently notify the polling thread + // when snapshots are created. + snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch], topicPartition: TopicPartition, - maxFetchSizeInBytes: Int = 1024 * 1024 + maxFetchSizeInBytes: Int ) extends ReplicatedLog { + private[this] var oldestSnapshotId = snapshotIds Review comment: Do we need to keep this as a var or could we access it from `snapshotIds` when needed (as we do for `latestSnapshotId`)? 
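   For example, mirroring the `latestSnapshotId` pattern (just a sketch, and it ignores whatever log-start tracking may be the reason for keeping the var):
   ```scala
   override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
     try {
       // snapshotIds is a sorted ConcurrentSkipListSet, so its first element is the oldest id
       Optional.of(snapshotIds.first)
     } catch {
       case _: NoSuchElementException =>
         Optional.empty()
     }
   }
   ```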
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -69,23 +82,55 @@ class KafkaMetadataLog( val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], leaderEpoch = epoch, origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + + if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) { Review comment: Maybe add a little helper? This code looks the same as in `appendAsFollower`. ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -69,23 +82,55 @@ class KafkaMetadataLog( val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], leaderEpoch = epoch, origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + + if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + + new LogAppendInfo( + appendInfo.firstOffset.map(_.messageOffset).getOrElse { + throw new KafkaException("Append failed unexpectedly") + }, + appendInfo.lastOffset + ) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + + if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + + new LogAppendInfo( + appendInfo.firstOffset.map(_.messageOffset).getOrElse { + throw new KafkaException("Append failed unexpectedly") + }, + appendInfo.lastOffset + ) } override def lastFetchedEpoch: Int = { - log.latestEpoch.getOrElse(0) + log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => + val logEndOffset = endOffset().offset + if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the epoch of the snapshot when the log is empty + snapshotId.epoch + } else { + throw new KafkaException( + s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). 
" + + s"Expected the snapshot's end offset to match the logs end offset ($logEndOffset) " + Review comment: nit: `log's` ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -69,23 +82,55 @@ class KafkaMetadataLog( val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], leaderEpoch = epoch, origin = AppendOrigin.Coordinator) - new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") - }, appendInfo.lastOffset) + + if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() + } + + new LogAppendInfo( + appendInfo.firstOffset.map(_.messageOffset).getOrElse { Review comment: nit: could probably consolidate the `exists` and `map` here into a single `match` ```scala appendInfo.firstOffset match { case None => throw new KafkaException case Some(offsetMetadata=> if (offsetMetadata.relativePositionInSegment == 0) { log.deleteOldSegments() } appendInfo.lastOffset } ``` ########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -147,18 +221,102 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { - FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) + // Do not let the state machine create snapshots older than the latest snapshot + latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { + // Since snapshots are less than the high-watermark absolute offset comparison is okay. + throw new IllegalArgumentException( + s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" + ) + } + } + + FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { + Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { + Optional.empty() + } } catch { - case e: NoSuchFileException => Optional.empty() + case _: NoSuchFileException => + Optional.empty() + } + } + + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { + try { + Optional.of(snapshotIds.last) + } catch { + case _: NoSuchElementException => + Optional.empty() + } + } + + override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = { + oldestSnapshotId + } + + override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = { + snapshotIds.add(snapshotId) + } + + override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = { Review comment: One thing I was considering is whether we even want to expose this at all. In other words, who should be responsible for cleaning up old snapshots? I am tempted to say that that should be the responsibility of the Log implementation, but perhaps there are reasons it should not be? ########## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ########## @@ -106,6 +120,14 @@ */ void updateHighWatermark(LogOffsetMetadata offsetMetadata); + /** + * Updates the log start offset if necessary. + * + * The replicated log's start offset can be increased when there is a snapshot greater than the + * current log start offset. 
+ */ + boolean updateLogStart(OffsetAndEpoch logStartSnapshotId); Review comment: Perhaps we can make this mirror `truncateFullyToLatestSnapshot`. Perhaps `deleteToLatestSnapshot()` or something like that. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest( FetchRequestData.FetchPartition request, long currentTimeMs ) { - Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); - if (errorOpt.isPresent()) { - return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); - } + try { + Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); + if (errorOpt.isPresent()) { + return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); + } - long fetchOffset = request.fetchOffset(); - int lastFetchedEpoch = request.lastFetchedEpoch(); - LeaderState state = quorum.leaderStateOrThrow(); - Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - - if (divergingEpochOpt.isPresent()) { - Optional<FetchResponseData.EpochEndOffset> divergingEpoch = - divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() - .setEpoch(offsetAndEpoch.epoch) - .setEndOffset(offsetAndEpoch.offset)); - return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); - } else { - LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); + long fetchOffset = request.fetchOffset(); + int lastFetchedEpoch = request.lastFetchedEpoch(); + LeaderState state = quorum.leaderStateOrThrow(); + ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { - onUpdateLeaderHighWatermark(state, currentTimeMs); + final Records records; + if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { + LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); + + if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { + onUpdateLeaderHighWatermark(state, currentTimeMs); + } + + records = info.records; + } else { + records = MemoryRecords.EMPTY; } - return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); + return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); + } catch (Exception e) { + logger.error("Caught unexpected error in fetch completion of request {}", request, e); + return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty()); } } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. 
*/ - private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { - if (fetchOffset == 0 && lastFetchedEpoch == 0) { - return Optional.empty(); + private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { + if (log.startOffset() == 0 && fetchOffset == 0) { + if (lastFetchedEpoch != 0) { + logger.warn( + "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", + fetchOffset, + lastFetchedEpoch + ); + } + return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); } - OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch) - .orElse(new OffsetAndEpoch(-1L, -1)); - if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { - return Optional.of(endOffsetAndEpoch); + + Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log + .endOffsetForEpoch(lastFetchedEpoch) + .flatMap(endOffsetAndEpoch -> { + if (endOffsetAndEpoch.epoch == lastFetchedEpoch && endOffsetAndEpoch.offset == log.startOffset()) { + // This means that either: + // 1. The lastFetchedEpoch is smaller than any known epoch + // 2. The current leader epoch is lastFetchedEpoch and the log is empty. + // Assume that there is not diverging information + return Optional.empty(); + } else { + return Optional.of(endOffsetAndEpoch); + } + }); + if (endOffsetAndEpochOpt.isPresent()) { + OffsetAndEpoch endOffsetAndEpoch = endOffsetAndEpochOpt.get(); + if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { + return ValidatedFetchOffsetAndEpoch.diverging(endOffsetAndEpoch); + } else { + return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); + } + } else if (log.startOffset() > 0) { + OffsetAndEpoch oldestSnapshotId = log.oldestSnapshotId().orElseThrow(() -> { + return new IllegalStateException( + String.format( + "The log start offset (%s) was greater than zero but start snapshot was not found", + log.startOffset() + ) + ); + }); + + if (fetchOffset == log.startOffset() && lastFetchedEpoch == oldestSnapshotId.epoch) { Review comment: Just checking, but I think we would only hit this case if the snapshot epoch did not match the epoch of the first entry in the log. Is that right? This might be another case that we should try to handle inside `KafkaMetadataLog. endOffsetForEpoch `. 
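   To make that concrete, the metadata log could special-case the oldest snapshot's epoch itself. Very rough sketch (`endOffsetFromLog` is a hypothetical stand-in for the existing delegation to the underlying `Log`, and the exact `ReplicatedLog` signature may differ):
   ```scala
   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
     val fromLog: Optional[raft.OffsetAndEpoch] = endOffsetFromLog(leaderEpoch) // hypothetical helper
     oldestSnapshotId().asScala match {
       case Some(snapshotId)
         if leaderEpoch == snapshotId.epoch && fromLog.asScala.forall(_.offset == startOffset) =>
         // The epoch is only known through the oldest snapshot, so answer with the snapshot
         // boundary instead of something the follower would interpret as divergence
         Optional.of(new raft.OffsetAndEpoch(startOffset, snapshotId.epoch))
       case _ =>
         fromLog
     }
   }
   ```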
########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ########## @@ -43,7 +44,59 @@ final public class KafkaRaftClientSnapshotTest { @Test - public void testMissingFetchSnapshotRequest() throws Exception { + public void testFetchRequest() throws Exception { + int localId = 0; + int otherNodeId = localId + 1; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withAppendLingerMs(1) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + String[] appendRecords = new String[] {"a", "b", "c"}; + context.client.scheduleAppend(epoch, Arrays.asList(appendRecords)); + context.time.sleep(context.appendLingerMs()); + context.client.poll(); + + long localLogEndOffset = context.log.endOffset().offset; + assertTrue( + appendRecords.length <= localLogEndOffset, + String.format("Record length = %s, log end offset = %s", appendRecords.length, localLogEndOffset) + ); + + // Advance the highWatermark + context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0)); + context.pollUntilResponse(); + context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong()); + + OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch); + try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId)) { + snapshot.freeze(); + } + + context.client.poll(); + + assertEquals(snapshotId.offset, context.log.startOffset()); + + // Send Fetch request less than start offset + context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, epoch, 0)); + context.pollUntilResponse(); + FetchResponseData.FetchablePartitionResponse partitionResponse = context.assertSentFetchResponse(); + assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode())); + assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch()); + assertEquals(localId, partitionResponse.currentLeader().leaderId()); + assertEquals(snapshotId.epoch, partitionResponse.snapshotId().epoch()); + assertEquals(snapshotId.offset, partitionResponse.snapshotId().endOffset()); + } + + // TODO: Add a few more fetch request tests checking some of the error conditions and edge conditions Review comment: Reminder here ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org