jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r622425656



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1335,6 +1313,57 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+        int epoch = 2;
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(epoch, appendRecords)
+                .withAppendLingerMs(1)
+                .build();
+
+        context.becomeLeader();
+        int currentEpoch = context.currentEpoch();
+
+        // When creating snapshot:
+        // 1.1 high watermark cannot be empty
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+        // 1.2 high watermark must larger than or equal to the snapshotId's endOffset
+        advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);

Review comment:
       I would append a couple of batches after advancing the high-watermark. At this point the HWM equals the LEO.
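
   For example, something along these lines (just a sketch; I am assuming the `scheduleAppend`, `context.time`, and `context.client.poll()` helpers behave the same way they do in the other tests in this suite):

   ```java
   advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);

   // Append a couple more batches so that the log end offset moves past the
   // high-watermark before the snapshot id is validated.
   context.client.scheduleAppend(currentEpoch, Arrays.asList("d", "e"));
   context.time.sleep(1); // matches the withAppendLingerMs(1) used by the builder above
   context.client.poll();
   ```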

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,25 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
+                    highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the high-watermark when creating snapshot");

Review comment:
       What do you mean by ", since the there should be a minimum size of records between the latest snapshot and the high-watermark when creating snapshot"?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1335,6 +1313,57 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+        int epoch = 2;
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(epoch, appendRecords)
+                .withAppendLingerMs(1)
+                .build();
+
+        context.becomeLeader();
+        int currentEpoch = context.currentEpoch();
+
+        // When creating snapshot:
+        // 1.1 high watermark cannot be empty
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+        // 1.2 high watermark must larger than or equal to the snapshotId's endOffset
+        advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+        assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+        OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+        // 2 the quorum epoch must larger than or equal to the snapshotId's epoch
+        OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+
+        // 3 the snapshotId should be validated against endOffsetForEpoch
+        OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(currentEpoch);

Review comment:
       Let's use `epoch` instead of `currentEpoch`. Since we are using `currentEpoch`, the `endOffsetForEpoch.offset` will equal the LEO. If you instead use `epoch`, then the `endOffsetForEpoch.offset` will be `4`, which is less than the LEO (`5`).
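
   For example (a sketch; this assumes the high-watermark has already advanced to the LEO at this point, so it is the `endOffsetForEpoch` check rather than the high-watermark check that fires):

   ```java
   // Validate against the epoch of the appended records instead of the current epoch,
   // so that endOffsetForEpoch.offset is strictly less than the log end offset.
   OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(epoch);
   assertEquals(epoch, endOffsetForEpoch.epoch);

   // A snapshot id past the end offset of that epoch should be rejected even though
   // it is not beyond the high-watermark.
   OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch);
   assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId4));
   ```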

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1335,6 +1313,57 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {

Review comment:
       This test is checking the `snapshotId` when the `KafkaRaftClient` is the leader. Let's have a similar test but when the `KafkaRaftClient` is a follower. Take a look at `KafkaRaftClient::testEmptyRecordSetInFetchResponse` for a simple example of how you can advance the high-watermark on the follower.

   Note that the followers don't need to wait for the "high-watermark" to reach the current epoch to set (and advance) the high-watermark.
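
   A rough sketch of what I have in mind (the helper names `withElectedLeader`, `pollUntilRequest`, `assertSentFetchRequest`, `fetchResponse`, and `deliverResponse` are assumed to work the way they do in `testEmptyRecordSetInFetchResponse`):

   ```java
   @Test
   public void testCreateSnapshotAsFollowerWithInvalidSnapshotId() throws Exception {
       int localId = 0;
       int leaderId = localId + 1;
       int epoch = 5;
       Set<Integer> voters = Utils.mkSet(localId, leaderId);

       RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
           .withElectedLeader(epoch, leaderId)
           .build();

       // Before the follower handles a fetch response the high-watermark is unknown,
       // so creating a snapshot must fail.
       assertEquals(OptionalLong.empty(), context.client.highWatermark());
       assertThrows(KafkaException.class,
           () -> context.client.createSnapshot(new OffsetAndEpoch(1, epoch)));

       // Deliver a fetch response that carries the leader's high-watermark; the follower
       // advances its high-watermark without waiting for records from the current epoch.
       context.pollUntilRequest();
       int correlationId = context.assertSentFetchRequest(epoch, 0L, 0);
       context.deliverResponse(correlationId, leaderId,
           context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NONE));
       context.client.poll();

       // A snapshot id past the follower's high-watermark should still be rejected.
       long highWatermark = context.client.highWatermark().getAsLong();
       assertThrows(KafkaException.class,
           () -> context.client.createSnapshot(new OffsetAndEpoch(highWatermark + 1, epoch)));
   }
   ```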

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
##########
@@ -38,7 +38,7 @@
 
     @Test
     public void testWritingSnapshot() throws IOException {
-        OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+        OffsetAndEpoch id = new OffsetAndEpoch(1L, 1);

Review comment:
       This applies to all of the tests in the suite.
   
   Do you know why the `context.client.createSnapshot` doesn't throw an exception in these tests? Shouldn't the high-watermark be unknown (`Optional.empty`) in all of these cases?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,25 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
+                    highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the high-watermark when creating snapshot");
+        }
+        int quorumEpoch = quorum().epoch();
+        if (snapshotId.epoch > quorumEpoch) {
+            throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId + " whose epoch is" +
+                    " larger than the quorum epoch: " + quorumEpoch);
+        }
+        OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(snapshotId.epoch);
+        if (endOffsetAndEpoch.epoch != snapshotId.epoch || endOffsetAndEpoch.offset < snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId +

Review comment:
       Let's use `InvalidArgumentException` instead of `KafkaException` for all of the exceptions thrown in this method.
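
   For example (just a sketch; I am assuming the standard `IllegalArgumentException` is the type we want here, and the messages below are only placeholders):

   ```java
   private void validateSnapshotId(OffsetAndEpoch snapshotId) {
       Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
       if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < snapshotId.offset) {
           throw new IllegalArgumentException("Cannot create snapshot with id " + snapshotId +
               " because its end offset is larger than the high-watermark " + highWatermarkOpt);
       }

       int quorumEpoch = quorum().epoch();
       if (snapshotId.epoch > quorumEpoch) {
           throw new IllegalArgumentException("Cannot create snapshot with id " + snapshotId +
               " because its epoch is larger than the current quorum epoch " + quorumEpoch);
       }

       OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(snapshotId.epoch);
       if (endOffsetAndEpoch.epoch != snapshotId.epoch || endOffsetAndEpoch.offset < snapshotId.offset) {
           throw new IllegalArgumentException("Cannot create snapshot with id " + snapshotId +
               " because it is not consistent with the log: the end offset for epoch " +
               snapshotId.epoch + " is " + endOffsetAndEpoch);
       }
   }
   ```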




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org