jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564766462



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +221,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // Do not let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset comparison is okay.

Review comment:
       Yes, but only when it is the state machine that is creating the snapshot. When the raft client is creating the snapshot because it is downloading it from the leader, we don't want to validate this.
   
   We'll add this check to the `RaftClient::createSnapshot` API when we implement https://issues.apache.org/jira/browse/KAFKA-10800
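
   For context, here is a sketch of how the completed guard might read once the truncated hunk above is filled in. The exception type and message are assumptions for illustration, not the actual implementation:

```scala
override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = {
  // Reject snapshot ids older than the latest snapshot; this check only applies
  // when the state machine (not the raft client) is creating the snapshot.
  latestSnapshotId().ifPresent { latest =>
    if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
      // Hypothetical error handling; the real patch may surface this differently.
      throw new IllegalArgumentException(
        s"Cannot create snapshot $snapshotId older than the latest snapshot $latest"
      )
    }
  }

  FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
}
```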

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +158,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def truncateFullyToLatestSnapshot(): Boolean = {

Review comment:
       I can prefix it with `maybe`, e.g. `maybeTruncateFullyToLatestSnapshot`. See the sketch below.
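
   For illustration, a minimal sketch of what the `maybe`-prefixed method could look like, assuming the `ReplicatedLog` interface method is renamed as well. The truncation condition and the `truncateFullyAndStartAt` call below are assumptions about the intended behavior (truncate everything only when the latest snapshot is ahead of the local log), not the actual patch:

```scala
override def maybeTruncateFullyToLatestSnapshot(): Boolean = {
  val latest = latestSnapshotId() // java.util.Optional[raft.OffsetAndEpoch]
  if (latest.isPresent && latest.get.offset > log.logEndOffset) {
    // Hypothetical condition: the snapshot is ahead of the local log, so discard
    // the local segments and restart the log at the snapshot's end offset.
    log.truncateFullyAndStartAt(latest.get.offset)
    true
  } else {
    false
  }
}
```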
   
   

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.raft
+
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Path
+import kafka.log.Log
+import kafka.log.LogManager
+import kafka.log.LogTest
+import kafka.server.BrokerTopicStats
+import kafka.server.LogDirFailureChannel
+import kafka.utils.MockTime
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.raft.LogAppendInfo
+import org.apache.kafka.raft.LogOffsetMetadata
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.raft.ReplicatedLog
+import org.apache.kafka.snapshot.Snapshots
+import org.junit.After
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertFalse
+import org.junit.Assert.assertThrows
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+
+final class KafkaMetadataLogTest {
+  import KafkaMetadataLogTest._
+
+  var tempDir: File = null
+  val mockTime = new MockTime()
+
+  @Before
+  def setUp(): Unit = {
+    tempDir = TestUtils.tempDir()
+  }
+
+  @After
+  def tearDown(): Unit = {
+    Utils.delete(tempDir)
+  }
+
+  @Test
+  def testCreateSnapshot(): Unit = {
+    val topicPartition = new TopicPartition("cluster-metadata", 0)
+    val numberOfRecords = 10
+    val epoch = 0
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+
+    TestUtils.resource(log.readSnapshot(snapshotId).get()) { snapshot =>
+      assertEquals(0, snapshot.sizeInBytes())
+    }
+  }
+
+  @Test
+  def testReadMissingSnapshot(): Unit = {
+    val topicPartition = new TopicPartition("cluster-metadata", 0)
+    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+
+    assertFalse(log.readSnapshot(new OffsetAndEpoch(10, 0)).isPresent)

Review comment:
       Fixed.
   
   I think the reason I didn't do this is that `RawSnapshotReader` doesn't implement a meaningful equality. It also doesn't override `toString`, but it probably should. Let me create a Jira to add `toString` to some of the classes I added.
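
   Since `RawSnapshotReader` has no meaningful `equals`, tests have to assert on observable properties of the snapshot rather than compare reader instances. A minimal sketch in the style of `KafkaMetadataLogTest`, assuming the reader exposes `snapshotId()` and `sizeInBytes()`:

```scala
// Compare what the reader reports, not the reader instances themselves.
val maybeSnapshot = log.readSnapshot(snapshotId)
assertTrue(maybeSnapshot.isPresent)
TestUtils.resource(maybeSnapshot.get()) { reader =>
  assertEquals(snapshotId, reader.snapshotId())
  assertEquals(0, reader.sizeInBytes())
}
```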

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -871,15 +875,32 @@ private FetchResponseData buildFetchResponse(
                 .setLeaderEpoch(quorum.epoch())
                 .setLeaderId(quorum.leaderIdOrNil());
 
-            divergingEpoch.ifPresent(partitionData::setDivergingEpoch);
+            switch (validatedOffsetAndEpoch.type()) {
+                case DIVERGING:
+                    partitionData.divergingEpoch()
+                        .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch)
+                        .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset);
+                    break;
+                case SNAPSHOT:
+                    partitionData.snapshotId()
+                        .setEpoch(validatedOffsetAndEpoch.offsetAndEpoch().epoch)
+                        .setEndOffset(validatedOffsetAndEpoch.offsetAndEpoch().offset);
+                    break;
+                default:
+            }
         });
     }
 
     private FetchResponseData buildEmptyFetchResponse(
         Errors error,
         Optional<LogOffsetMetadata> highWatermark
     ) {
-        return buildFetchResponse(error, MemoryRecords.EMPTY, Optional.empty(), highWatermark);
+        return buildFetchResponse(
+            error,
+            MemoryRecords.EMPTY,
+            ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(-1, -1)),

Review comment:
       Yeah. I can hide this by adding an overloaded signature, `ValidatedFetchOffsetAndEpoch valid()`. When the value is valid, the raft client implementation ignores it while creating the Fetch response.
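
   Purely for illustration, the shape of that overload sketched in Scala (the real `ValidatedFetchOffsetAndEpoch` class lives in the Java raft module; the names below are placeholders, not the actual API):

```scala
import org.apache.kafka.raft.OffsetAndEpoch

// Sketch: a no-argument valid() factory hides the sentinel OffsetAndEpoch(-1, -1)
// from callers such as buildEmptyFetchResponse, since the raft client ignores the
// offset and epoch whenever the result is of type VALID.
final case class ValidatedFetchOffsetAndEpochSketch(kind: String, offsetAndEpoch: OffsetAndEpoch)

object ValidatedFetchOffsetAndEpochSketch {
  def valid(offsetAndEpoch: OffsetAndEpoch): ValidatedFetchOffsetAndEpochSketch =
    ValidatedFetchOffsetAndEpochSketch("VALID", offsetAndEpoch)

  // Proposed overload: callers building empty Fetch responses no longer need to
  // pass the placeholder value themselves.
  def valid(): ValidatedFetchOffsetAndEpochSketch =
    valid(new OffsetAndEpoch(-1, -1))
}
```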



