hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557643619



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,28 +16,41 @@
  */
 package kafka.raft
 
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.util.NoSuchElementException
 import java.util.Optional
+import java.util.concurrent.ConcurrentSkipListSet
 
-import kafka.log.{AppendOrigin, Log}
+import kafka.log.{AppendOrigin, Log, SnapshotGenerated}
 import kafka.server.{FetchHighWatermark, FetchLogEnd}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.raft
-import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog}
+import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation
+  // and other threads will access this object. This object is used to efficiently notify the polling thread

Review comment:
       What other threads? 

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -370,7 +375,9 @@ class Log(@volatile private var _dir: File,
       throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
   }
 
-  def highWatermark: Long = highWatermarkMetadata.messageOffset
+  def highWatermark: Long = _highWatermarkMetadata.messageOffset
+
+  def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata

Review comment:
       Could we instead either expose `fetchHighWatermarkMetadata` or make use of `fetchOffsetSnapshot` in `KafkaMetadataLog`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +221,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // Do not let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset comparison is okay.

Review comment:
       Is it useful here to ensure that `snapshotId` is lower than the high watermark?
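   
   For example, something like the following extra guard (a sketch only; `highWatermark` stands in for whatever accessor `KafkaMetadataLog` ends up using):
   ```scala
   // Hypothetical additional check in createSnapshot: snapshots should only cover
   // committed data, so reject ids beyond the high watermark.
   if (snapshotId.offset > highWatermark.offset) {
     throw new IllegalArgumentException(
       s"Cannot create a snapshot ($snapshotId) beyond the high watermark (${highWatermark.offset})"
     )
   }
   ```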

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -871,15 +875,32 @@ private FetchResponseData buildFetchResponse(
                 .setLeaderEpoch(quorum.epoch())
                 .setLeaderId(quorum.leaderIdOrNil());
 
-            divergingEpoch.ifPresent(partitionData::setDivergingEpoch);
+            switch (validatedOffsetAndEpoch.type()) {
+                case DIVERGING:
+                    partitionData.divergingEpoch()
+                        .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch)
+                        .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset);
+                    break;
+                case SNAPSHOT:
+                    partitionData.snapshotId()
+                        .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch)
+                        .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset);
+                    break;
+                default:
+            }
         });
     }
 
     private FetchResponseData buildEmptyFetchResponse(
         Errors error,
         Optional<LogOffsetMetadata> highWatermark
     ) {
-        return buildFetchResponse(error, MemoryRecords.EMPTY, Optional.empty(), highWatermark);
+        return buildFetchResponse(
+            error,
+            MemoryRecords.EMPTY,
+            ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(-1, -1)),

Review comment:
       Not sure it's worth creating another type, but it is a little surprising to see `valid(new OffsetAndEpoch(-1, -1))`.
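   
   If we did want to avoid the sentinel at the call sites, a tiny factory would be enough (purely illustrative, not part of the PR):
   ```java
   // Hypothetical convenience factory that names the sentinel instead of spelling
   // it out; it just delegates to the existing valid() factory.
   public static ValidatedFetchOffsetAndEpoch empty() {
       return valid(new OffsetAndEpoch(-1, -1));
   }
   ```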

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2216,7 +2316,7 @@ public void complete() {
         // These fields are visible to both the Raft IO thread and the listener
         // and are protected through synchronization on this `ListenerContext` instance
         private BatchReader<T> lastSent = null;
-        private long lastAckedOffset = 0;
+        private long lastAckedEndOffset = 0;

Review comment:
       I don't feel too strongly about it, but the new name is a little confusing to me. Why would the client only be acking end offsets? It is especially confusing when I see `lastAckedEndOffset = logStartOffset` 🙂. I think we probably need a comment here regardless.
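   
   If the intent is what it looks like, something along these lines would help (the wording is a guess at the semantics and should be corrected by the author):
   ```java
   // Illustrative comment only -- assumes the field tracks the exclusive end offset
   // of the last data acked by the listener, so seeding it with the log start
   // offset means "nothing consumed yet, start from the earliest available data".
   private long lastAckedEndOffset = 0;
   ```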

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -69,23 +82,55 @@ class KafkaMetadataLog(
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
       origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+
+    if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {
+      // Assume that a new segment was created if the relative position is 0
+      log.deleteOldSegments()
+    }
+
+    new LogAppendInfo(
+      appendInfo.firstOffset.map(_.messageOffset).getOrElse {
+        throw new KafkaException("Append failed unexpectedly")
+      },
+      appendInfo.lastOffset
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
     val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+
+    if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {
+      // Assume that a new segment was created if the relative position is 0
+      log.deleteOldSegments()
+    }
+
+    new LogAppendInfo(
+      appendInfo.firstOffset.map(_.messageOffset).getOrElse {
+        throw new KafkaException("Append failed unexpectedly")
+      },
+      appendInfo.lastOffset
+    )
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +

Review comment:
       I guess it's probably better to throw here. I was trying to think how we could end up here, but the only way I came up with is an invalid state transition which left the start or end offset inconsistent with the segment data. Sadly we have had a number of those bugs in the past. I debated whether we should just delete the snapshot, but I'm not sure that helps if we are left with inconsistent start/end offsets.

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.raft
+
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Path
+import kafka.log.Log
+import kafka.log.LogManager
+import kafka.log.LogTest
+import kafka.server.BrokerTopicStats
+import kafka.server.LogDirFailureChannel
+import kafka.utils.MockTime
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.raft.LogAppendInfo
+import org.apache.kafka.raft.LogOffsetMetadata
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.raft.ReplicatedLog
+import org.apache.kafka.snapshot.Snapshots
+import org.junit.After
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertFalse
+import org.junit.Assert.assertThrows
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+
+final class KafkaMetadataLogTest {
+  import KafkaMetadataLogTest._
+
+  var tempDir: File = null
+  val mockTime = new MockTime()
+
+  @Before
+  def setUp(): Unit = {
+    tempDir = TestUtils.tempDir()
+  }
+
+  @After
+  def tearDown(): Unit = {
+    Utils.delete(tempDir)
+  }
+
+  @Test
+  def testCreateSnapshot(): Unit = {
+    val topicPartition = new TopicPartition("cluster-metadata", 0)
+    val numberOfRecords = 10
+    val epoch = 0
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    TestUtils.resource(log.readSnapshot(snapshotId).get()) { snapshot =>
+      assertEquals(0, snapshot.sizeInBytes())
+    }
+  }
+
+  @Test
+  def testReadMissingSnapshot(): Unit = {
+    val topicPartition = new TopicPartition("cluster-metadata", 0)
+    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+
+    assertFalse(log.readSnapshot(new OffsetAndEpoch(10, 0)).isPresent)

Review comment:
       nit: doesn't matter too much for deterministic tests like this, but I think this is a nicer pattern:
   ```scala
   assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0)))
   ```
   Then the assertion message tells you what the value was.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +158,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {

Review comment:
       nit: the method name suggests that the truncation occurs unconditionally

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
+            }
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+
+                if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
         }
 
-        OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch)
-            .orElse(new OffsetAndEpoch(-1L, -1));
-        if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
+
+        Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log
+            .endOffsetForEpoch(lastFetchedEpoch)
+            .flatMap(endOffsetAndEpoch -> {
+                if (endOffsetAndEpoch.epoch == lastFetchedEpoch && endOffsetAndEpoch.offset == log.startOffset()) {

Review comment:
       So basically we are trying to detect when the fetch is outside the range of the log, which means we will need to send a snapshot to the follower. Is that right?
   
   One thought, which we don't have to act on here: perhaps we should change the semantics of `endOffsetForEpoch` as exposed by `ReplicatedLog`. Maybe we should only return the end offset of an epoch when we know it with certainty.
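   
   For example, the contract could look something like this (a sketch of the suggested semantics, not the current interface):
   ```java
   /**
    * Sketch: return the largest offset and epoch for the given epoch only when the
    * log (or a snapshot) actually knows it with certainty; otherwise return
    * Optional.empty() instead of a best-effort guess.
    */
   Optional<OffsetAndEpoch> endOffsetForEpoch(int epoch);
   ```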

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,28 +16,41 @@
  */
 package kafka.raft
 
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.util.NoSuchElementException
 import java.util.Optional
+import java.util.concurrent.ConcurrentSkipListSet
 
-import kafka.log.{AppendOrigin, Log}
+import kafka.log.{AppendOrigin, Log, SnapshotGenerated}
 import kafka.server.{FetchHighWatermark, FetchLogEnd}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.raft
-import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog}
+import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation
+  // and other threads will access this object. This object is used to efficiently notify the polling thread
+  // when snapshots are created.
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var oldestSnapshotId = snapshotIds

Review comment:
       Do we need to keep this as a var or could we access it from `snapshotIds` when needed (as we do for `latestSnapshotId`)?
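   
   For instance, mirroring `latestSnapshotId` (a sketch that ignores whatever reason the PR may have for caching the value; `ConcurrentSkipListSet.first` throws `NoSuchElementException` when the set is empty):
   ```scala
   // Sketch: compute the oldest snapshot id on demand instead of caching it in a var.
   override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
     try {
       Optional.of(snapshotIds.first)
     } catch {
       case _: NoSuchElementException =>
         Optional.empty()
     }
   }
   ```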

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -69,23 +82,55 @@ class KafkaMetadataLog(
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
       origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+
+    if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {

Review comment:
       Maybe add a little helper? This code looks the same as in `appendAsFollower`.
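   
   For example (the name is just a suggestion):
   ```scala
   // Possible shared helper for appendAsLeader/appendAsFollower: assume a new
   // segment was rolled if the first offset's relative position is 0, and trigger
   // old-segment cleanup in that case.
   private def maybeCleanUpAfterAppend(appendInfo: kafka.log.LogAppendInfo): Unit = {
     if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {
       log.deleteOldSegments()
     }
   }
   ```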

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -69,23 +82,55 @@ class KafkaMetadataLog(
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
       origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+
+    if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {
+      // Assume that a new segment was created if the relative position is 0
+      log.deleteOldSegments()
+    }
+
+    new LogAppendInfo(
+      appendInfo.firstOffset.map(_.messageOffset).getOrElse {
+        throw new KafkaException("Append failed unexpectedly")
+      },
+      appendInfo.lastOffset
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
     val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+
+    if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {
+      // Assume that a new segment was created if the relative position is 0
+      log.deleteOldSegments()
+    }
+
+    new LogAppendInfo(
+      appendInfo.firstOffset.map(_.messageOffset).getOrElse {
+        throw new KafkaException("Append failed unexpectedly")
+      },
+      appendInfo.lastOffset
+    )
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+            s"Expected the snapshot's end offset to match the logs end offset ($logEndOffset) " +

Review comment:
       nit: `log's`

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -69,23 +82,55 @@ class KafkaMetadataLog(
     val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
       leaderEpoch = epoch,
       origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+
+    if (appendInfo.firstOffset.exists(_.relativePositionInSegment == 0)) {
+      // Assume that a new segment was created if the relative position is 0
+      log.deleteOldSegments()
+    }
+
+    new LogAppendInfo(
+      appendInfo.firstOffset.map(_.messageOffset).getOrElse {

Review comment:
       nit: could probably consolidate the `exists` and `map` here into a single `match`
   
   ```scala
   appendInfo.firstOffset match {
     case None =>
       throw new KafkaException("Append failed unexpectedly")
   
     case Some(offsetMetadata) =>
       if (offsetMetadata.relativePositionInSegment == 0) {
         log.deleteOldSegments()
       }
       appendInfo.lastOffset
   }
   ```

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +221,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // Do not let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case _: NoSuchFileException =>
+        Optional.empty()
+    }
+  }
+
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    oldestSnapshotId
+  }
+
+  override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+    snapshotIds.add(snapshotId)
+  }
+
+  override def updateLogStart(logStartSnapshotId: raft.OffsetAndEpoch): Boolean = {

Review comment:
       One thing I was considering is whether we even want to expose this at 
all. In other words, who should be responsible for cleaning up old snapshots? I 
am tempted to say that that should be the responsibility of the Log 
implementation, but perhaps there are reasons it should not be?

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -106,6 +120,14 @@
      */
     void updateHighWatermark(LogOffsetMetadata offsetMetadata);
 
+    /**
+     * Updates the log start offset if necessary.
+     *
     * The replicated log's start offset can be increased when there is a snapshot greater than the
+     * current log start offset.
+     */
+    boolean updateLogStart(OffsetAndEpoch logStartSnapshotId);

Review comment:
       Perhaps we can make this mirror `truncateFullyToLatestSnapshot`, e.g. `deleteToLatestSnapshot()` or something like that.
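   
   Something like the following shape, for example (the name and javadoc are only a suggestion):
   ```java
   /**
    * Sketch of a mirror of truncateFullyToLatestSnapshot: advance the log start
    * offset to the latest snapshot and delete the segments and older snapshots
    * that precede it, returning whether anything was deleted.
    */
   boolean deleteToLatestSnapshot();
   ```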

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
+            }
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+
+                if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
         }
 
-        OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch)
-            .orElse(new OffsetAndEpoch(-1L, -1));
-        if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
+
+        Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log
+            .endOffsetForEpoch(lastFetchedEpoch)
+            .flatMap(endOffsetAndEpoch -> {
+                if (endOffsetAndEpoch.epoch == lastFetchedEpoch && endOffsetAndEpoch.offset == log.startOffset()) {
+                    // This means that either:
+                    // 1. The lastFetchedEpoch is smaller than any known epoch
+                    // 2. The current leader epoch is lastFetchedEpoch and the log is empty.
+                    // Assume that there is not diverging information
+                    return Optional.empty();
+                } else {
+                    return Optional.of(endOffsetAndEpoch);
+                }
+            });
+        if (endOffsetAndEpochOpt.isPresent()) {
+            OffsetAndEpoch endOffsetAndEpoch = endOffsetAndEpochOpt.get();
+            if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) {
+                return ValidatedFetchOffsetAndEpoch.diverging(endOffsetAndEpoch);
+            } else {
+                return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
+            }
+        } else if (log.startOffset() > 0) {
+            OffsetAndEpoch oldestSnapshotId = log.oldestSnapshotId().orElseThrow(() -> {
+                return new IllegalStateException(
+                    String.format(
+                        "The log start offset (%s) was greater than zero but start snapshot was not found",
+                        log.startOffset()
+                    )
+                );
+            });
+
+            if (fetchOffset == log.startOffset() && lastFetchedEpoch == oldestSnapshotId.epoch) {

Review comment:
       Just checking, but I think we would only hit this case if the snapshot epoch did not match the epoch of the first entry in the log. Is that right? This might be another case that we should try to handle inside `KafkaMetadataLog.endOffsetForEpoch`.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -43,7 +44,59 @@
 
 final public class KafkaRaftClientSnapshotTest {
     @Test
-    public void testMissingFetchSnapshotRequest() throws Exception {
+    public void testFetchRequest() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withAppendLingerMs(1)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        String[] appendRecords = new String[] {"a", "b", "c"};
+        context.client.scheduleAppend(epoch, Arrays.asList(appendRecords));
+        context.time.sleep(context.appendLingerMs());
+        context.client.poll();
+
+        long localLogEndOffset = context.log.endOffset().offset;
+        assertTrue(
+            appendRecords.length <= localLogEndOffset,
+            String.format("Record length = %s, log end offset = %s", appendRecords.length, localLogEndOffset)
+        );
+
+        // Advance the highWatermark
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId));
+        assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong());
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId)) {
+            snapshot.freeze();
+        }
+
+        context.client.poll();
+
+        assertEquals(snapshotId.offset, context.log.startOffset());
+
+        // Send Fetch request less than start offset
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, epoch, 0));
+        context.pollUntilResponse();
+        FetchResponseData.FetchablePartitionResponse partitionResponse = context.assertSentFetchResponse();
+        assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
+        assertEquals(localId, partitionResponse.currentLeader().leaderId());
+        assertEquals(snapshotId.epoch, partitionResponse.snapshotId().epoch());
+        assertEquals(snapshotId.offset, partitionResponse.snapshotId().endOffset());
+    }
+
+    // TODO: Add a few more fetch request tests checking some of the error conditions and edge conditions

Review comment:
       Reminder here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

