jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619882084



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean 
isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = 
quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= 
snapshotId.offset) {

Review comment:
       You are correct. I think when I created the Jira I overlooked that both 
snapshot id's end offset and the high-watermark are exclusive values. Update 
the Jira's description.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean 
isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = 
quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= 
snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with 
snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: 
" +
+                    highWatermarkOpt + ". This may necessarily mean a bug in 
the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the 
high-watermark when creating snapshot");
+        }
+        int leaderEpoch = quorum().epoch();
+        if (snapshotId.epoch > leaderEpoch) {
+            throw new KafkaException("Trying to creating snapshot with 
snapshotId: " + snapshotId + " whose epoch is" +
+                    " larger than the current leader epoch: " + leaderEpoch);
+        }

Review comment:
       From the Jira:
   > The end offset and epoch of the snapshot is valid based on the leader 
epoch cache.
   
   How about also validating against the leader epoch cache? See 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L124.
 This is important because both the snapshot and the leader epoch cache are 
used to validate offsets. See 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L85.
 The term leader epoch cache comes the variable name `leaderEpochCache` used in 
`kafka.log.Log`.

##########
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1335,6 +1313,51 @@ public void 
testFetchSnapshotRequestClusterIdValidation() throws Exception {
         
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+        int epoch = 2;
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(epoch, appendRecords)
+                .withAppendLingerMs(1)
+                .build();
+
+        context.becomeLeader();
+        int currentEpoch = context.currentEpoch();
+
+        // When creating snapshot:
+        // 1. high watermark cannot be empty
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId1));
+
+        // 2. high watermark must larger than the snapshotId's endOffset
+        advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, 
localId);
+        assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+        assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2));
+
+        // 3. the current leader epoch cache must larger than the snapshotId's 
epoch
+        OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 
1);
+        assertThrows(KafkaException.class, () -> 
context.client.createSnapshot(invalidSnapshotId3));
+    }
+
+    private void advanceHighWatermark(RaftClientTestContext context,

Review comment:
       Thanks for clean up the code duplication.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to