jsancio commented on code in PR #17500:
URL: https://github.com/apache/kafka/pull/17500#discussion_r1870235740


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
             throw new IllegalStateException("Cannot create snapshot before the 
replica has been initialized");
         }
 
+        long baseOffset = log.read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();
+        if (snapshotId.offset() != baseOffset) {
+            logger.info("Cannot create snapshot at offset {} because it is not 
batch aligned", snapshotId.offset());
+            throw new IllegalArgumentException("Cannot create snapshot at an 
offset that is not batch aligned");

Review Comment:
   Let's include as much information as possible, such as the snapshot id's 
end offset and the returned batch's base offset.
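
   A minimal sketch of what such a message could look like. The class and 
method names (`SnapshotErrorMessages`, `notBatchAligned`) and the exact wording 
are hypothetical and not taken from the PR; only the idea of reporting both 
offsets comes from the comment above.

```java
class SnapshotErrorMessages {
    // Hypothetical helper: builds a message that reports both the requested
    // snapshot end offset and the base offset of the batch that contains it.
    static String notBatchAligned(long snapshotEndOffset, int snapshotEpoch, long batchBaseOffset) {
        return String.format(
            "Cannot create snapshot at offset %d (epoch %d) because it is not batch aligned; " +
            "the batch containing that offset has base offset %d",
            snapshotEndOffset, snapshotEpoch, batchBaseOffset
        );
    }

    public static void main(String[] args) {
        // Example values: offset 5 falls inside a batch that starts at offset 3.
        System.out.println(notBatchAligned(5L, 4, 3L));
    }
}
```

   The same string could be passed to both the logger and the exception so the 
two never drift apart.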



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1927,18 +1934,51 @@ public void 
testCreateSnapshotAsLeaderWithInvalidSnapshotId(boolean withKip853Rp
         context.client.poll();
         assertEquals(context.log.endOffset().offset(), 
context.client.highWatermark().getAsLong() + newRecords.size());
 
-        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2, 0));
+        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 3, currentEpoch);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId2, 0)
+        );
+        assertEquals(
+            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=7, 
epoch=3)) greater than the high-watermark (4)",
+            exception.getMessage()
+        );
 
         // 2 the quorum epoch must larger than or equal to the snapshotId's 
epoch
         OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId3, 0));
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=4, epoch=4)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=7, epoch=3))",
+            exception.getMessage()
+        );
 
         // 3 the snapshotId should be validated against endOffsetForEpoch
         OffsetAndEpoch endOffsetForEpoch = 
context.log.endOffsetForEpoch(epoch);
         assertEquals(epoch, endOffsetForEpoch.epoch());
-        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 2, epoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId4, 0));
+        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=4, epoch=2)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=2))",
+            exception.getMessage()
+        );
+
+        // 4 snapshotId offset must be at a batch boundary

Review Comment:
   I would mention that this points to the middle of the batch (d, e, f). It 
points to the f offset, which is not batch aligned, right?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
             throw new IllegalStateException("Cannot create snapshot before the 
replica has been initialized");
         }
 
+        long baseOffset = log.read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();

Review Comment:
   Let's document why you are doing this and why it is correct. It may not be 
obvious to future readers why the snapshot's end offset must be batch aligned 
or point to the log end offset.
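
   To illustrate the rule being asked about, here is a self-contained sketch 
over a toy log. It assumes the snapshot id's offset is an exclusive end offset, 
so it is acceptable only when it equals a batch's base offset or the log end 
offset. `BatchAlignment` and its methods are hypothetical and do not mirror the 
`ReplicatedLog` API.

```java
import java.util.List;
import java.util.TreeSet;

class BatchAlignment {
    // Base offsets of every batch in the toy log, kept sorted for floor lookups.
    private final TreeSet<Long> batchBaseOffsets = new TreeSet<>();
    // Offset that the next appended record would receive.
    private final long logEndOffset;

    BatchAlignment(List<Long> baseOffsets, long logEndOffset) {
        this.batchBaseOffsets.addAll(baseOffsets);
        this.logEndOffset = logEndOffset;
    }

    // Base offset of the batch containing the given offset.
    long baseOffsetOfBatchContaining(long offset) {
        Long base = batchBaseOffsets.floor(offset);
        if (base == null || offset >= logEndOffset) {
            throw new IllegalArgumentException("No batch contains offset " + offset);
        }
        return base;
    }

    // Assumption: a snapshot end offset is exclusive, so it may not split a batch.
    // It is valid when it is the log end offset or the first offset of a batch.
    boolean isValidSnapshotEndOffset(long offset) {
        if (offset < 0 || offset > logEndOffset) {
            return false;
        }
        return offset == logEndOffset || batchBaseOffsets.contains(offset);
    }

    public static void main(String[] args) {
        // Two batches of three records each: (a, b, c) at 0 and (d, e, f) at 3.
        BatchAlignment log = new BatchAlignment(List.of(0L, 3L), 6L);
        System.out.println(log.isValidSnapshotEndOffset(3L)); // batch boundary
        System.out.println(log.isValidSnapshotEndOffset(5L)); // middle of (d, e, f)
        System.out.println(log.isValidSnapshotEndOffset(6L)); // log end offset
    }
}
```

   In this toy log, offset 5 is rejected because the batch containing it starts 
at offset 3, while offsets 3 and 6 are accepted.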



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
             throw new IllegalStateException("Cannot create snapshot before the 
replica has been initialized");
         }
 
+        long baseOffset = log.read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();
+        if (snapshotId.offset() != baseOffset) {
+            logger.info("Cannot create snapshot at offset {} because it is not 
batch aligned", snapshotId.offset());

Review Comment:
   We should also record the `baseOffset` returned by `log.read` in the log 
message.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1927,18 +1934,51 @@ public void 
testCreateSnapshotAsLeaderWithInvalidSnapshotId(boolean withKip853Rp
         context.client.poll();
         assertEquals(context.log.endOffset().offset(), 
context.client.highWatermark().getAsLong() + newRecords.size());
 
-        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2, 0));
+        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 3, currentEpoch);

Review Comment:
   Did you need to change this because `HWM + 2` was not batch aligned, since 
the batch after the HWM has three records? If so, I would make this explicit by 
using `HWM + newRecords.size()`.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1951,6 +1991,7 @@ public void 
testCreateSnapshotAsFollowerWithInvalidSnapshotId(boolean withKip853
         Set<Integer> voters = Set.of(localId, leaderId, otherFollowerId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(0, List.of("a"))

Review Comment:
   Technically, `0` is not a valid leader epoch. The smallest possible epoch 
for a leader is 1.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -2003,11 +2054,43 @@ public void 
testCreateSnapshotAsFollowerWithInvalidSnapshotId(boolean withKip853
         context.client.poll();
         assertEquals(6L, context.client.highWatermark().getAsLong());
 
+        // 3) The quorum epoch must be larger than or equal to the 
snapshotId's epoch
+        OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=6, epoch=6)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=6, epoch=4))",
+            exception.getMessage()
+        );
+
         // 4) The snapshotId should be validated against endOffsetForEpoch
         OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3);
         assertEquals(3, endOffsetForEpoch.epoch());
-        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId4, 0));
+        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 3, 3);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=6, epoch=3)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=3))",
+            exception.getMessage()
+        );
+
+        // 5) The snapshotId should be batch-aligned
+        endOffsetForEpoch = context.log.endOffsetForEpoch(currentEpoch);
+        assertEquals(4, endOffsetForEpoch.epoch());
+        assertEquals(6, endOffsetForEpoch.offset());

Review Comment:
   I found this difficult to follow. I assume this points to `f` in the batch 
`d, e, f`. That batch has a base offset of 3, an epoch of 4 and a size of 3. So 
you are trying to check that offset 5 (f) is not batch aligned.
   
   One part that makes it difficult to follow is that `currentEpoch >= 5`.
   
   The other part that makes it difficult to follow is that you are using 
`currentEpoch` in `invalidSnapshotId5` and not 4.
   
   I think it would be easier to read if you made 4 a variable, `batch2Epoch`, 
and used it here and in `context.buildBatch(3L, 4, records2)`. What do you think?
   



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
             throw new IllegalStateException("Cannot create snapshot before the 
replica has been initialized");
         }
 
+        long baseOffset = log.read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();

Review Comment:
   Is there a reason why you added this check here and not to `ReplicatedLog` 
(`MockLog` and `KafkaMetadataLog`)? Those types already do snapshot id 
validation against the underlying log records.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
