feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r619793709



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -96,7 +96,7 @@ public BatchBuilder(
     }
 
     /**
-     * Append a record to this patch. The caller must first verify there is room for the batch
+     * Append a record to this batch. The caller must first verify there is room for the batch

Review comment:
       Side fix

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
##########
@@ -38,7 +38,7 @@
 
     @Test
     public void testWritingSnapshot() throws IOException {
-        OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+        OffsetAndEpoch id = new OffsetAndEpoch(0L, 1);

Review comment:
       The high watermark here is 1, so the snapshotId's end offset must be < 1.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+        int epoch = 2;
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(epoch, appendRecords)
+                .withAppendLingerMs(1)
+                .build();
+
+        context.becomeLeader();
+        int currentEpoch = context.currentEpoch();
+
+        // When creating snapshot:
+        // 1. high watermark cannot be empty
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+        // 2. high watermark must larger than the snapshotId's endOffset
+        advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+        assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+        OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+        // 3. the current leader epoch cache must larger than the snapshotId's epoch
+        OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+    }
+
+    private void advanceHighWatermark(RaftClientTestContext context,

Review comment:
       Extracted this functionality into a helper to avoid duplication.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
       Conceptually, `snapshotId.offset == highWatermarkOpt.get().offset` is
ok, because the record at `snapshotId.offset` is not included in the snapshot.
However, I'm not sure whether there are other restrictions, because the Jira
description says: `The end offset and epoch of the snapshot is less than
the high-watermark`. Please advise, @jsancio.
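
A minimal sketch of the two candidate offset rules, to make the boundary case concrete. `SnapshotOffsetSketch` and its methods are hypothetical names, not part of `KafkaRaftClient`. The sketch assumes the snapshot's end offset is exclusive, as described above, and that records below the high watermark are committed.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of KafkaRaftClient.
// Models a snapshot id's end offset as exclusive: end offset N covers [0, N).
final class SnapshotOffsetSketch {
    private SnapshotOffsetSketch() {}

    // Offset of the last record the snapshot contains, or empty when endOffset is 0.
    static OptionalLong lastIncludedOffset(long endOffset) {
        return endOffset > 0 ? OptionalLong.of(endOffset - 1) : OptionalLong.empty();
    }

    // Strict rule, mirroring the diff: reject when highWatermark <= endOffset.
    static boolean allowedStrict(OptionalLong highWatermark, long endOffset) {
        return highWatermark.isPresent() && endOffset < highWatermark.getAsLong();
    }

    // Relaxed rule raised in this comment: also accept endOffset == highWatermark.
    static boolean allowedInclusive(OptionalLong highWatermark, long endOffset) {
        return highWatermark.isPresent() && endOffset <= highWatermark.getAsLong();
    }
}
```

With a high watermark of 3, a snapshot id at offset 3 contains offsets 0 through 2. The strict rule rejects it; the inclusive rule accepts it. Both rules reject any snapshot id while the high watermark is still empty.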

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
+                    highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the high-watermark when creating snapshot");
+        }
+        int leaderEpoch = quorum().epoch();
+        if (snapshotId.epoch > leaderEpoch) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose epoch is" +
+                    " larger than the current leader epoch: " + leaderEpoch);
+        }

Review comment:
       Thanks, I previously mistook the quorum epoch for the leader epoch cache.
   I've updated the PR. One thing in the Jira I'm not sure about is:
   
   > 2. The epoch of the snapshot is equal to the quorum epoch.
   
   I think the snapshotId's epoch <= quorum epoch should be fine?
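
To illustrate the `<=` proposal, here is a sketch of the epoch check in isolation. `SnapshotEpochSketch.validateEpoch` is a hypothetical stand-in: it takes the quorum epoch as a parameter and throws `IllegalArgumentException`, whereas the PR's method reads the epoch from `quorum()` and throws `KafkaException`. One case that seems to motivate `<=` (an assumption, not confirmed in this thread) is a newly elected leader whose committed records were all written in an earlier epoch.

```java
// Hypothetical stand-in for the epoch half of validateSnapshotId.
final class SnapshotEpochSketch {
    private SnapshotEpochSketch() {}

    // Accepts any snapshot epoch up to and including the quorum epoch.
    static void validateEpoch(int snapshotEpoch, int quorumEpoch) {
        if (snapshotEpoch > quorumEpoch) {
            throw new IllegalArgumentException(
                "Snapshot epoch " + snapshotEpoch
                    + " is larger than the quorum epoch " + quorumEpoch);
        }
    }

    public static void main(String[] args) {
        validateEpoch(3, 5); // earlier epoch: passes with <=, would fail with ==
        validateEpoch(5, 5); // same epoch: passes under either rule
        try {
            validateEpoch(6, 5); // future epoch: rejected under either rule
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Requiring strict equality would reject the first call, so a snapshot of records from an earlier epoch could not be created after a leader change.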

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
       Thank you, I updated the PR as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

