feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619793709
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -96,7 +96,7 @@ public BatchBuilder(
     }

     /**
-     * Append a record to this patch. The caller must first verify there is room for the batch
+     * Append a record to this batch. The caller must first verify there is room for the batch

Review comment:
       Side fix

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
##########
@@ -38,7 +38,7 @@
     @Test
     public void testWritingSnapshot() throws IOException {
-        OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+        OffsetAndEpoch id = new OffsetAndEpoch(0L, 1);

Review comment:
       The high watermark here is 1, so the snapshotId's endOffset must be < 1.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }

+    @Test
+    public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+        int epoch = 2;
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .appendToLog(epoch, appendRecords)
+                .withAppendLingerMs(1)
+                .build();
+
+        context.becomeLeader();
+        int currentEpoch = context.currentEpoch();
+
+        // When creating snapshot:
+        // 1. high watermark cannot be empty
+        assertEquals(OptionalLong.empty(), context.client.highWatermark());
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));
+
+        // 2. high watermark must larger than the snapshotId's endOffset
+        advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
+        assertNotEquals(OptionalLong.empty(), context.client.highWatermark());
+        OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2));
+
+        // 3. the current leader epoch cache must larger than the snapshotId's epoch
+        OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1);
+        assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
+    }
+
+    private void advanceHighWatermark(RaftClientTestContext context,

Review comment:
       Extracted this functionality to avoid duplication.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }

+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
       Conceptually, `snapshotId.offset == highWatermarkOpt.get().offset` is OK, because the record at `snapshotId.offset` is not included in the snapshot. However, I'm not sure whether there are other restrictions, because the Jira description says: `The end offset and epoch of the snapshot is less than the high-watermark`. Please kindly advise, @jsancio.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }

+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
+                    highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the high-watermark when creating snapshot");
+        }
+        int leaderEpoch = quorum().epoch();
+        if (snapshotId.epoch > leaderEpoch) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose epoch is" +
+                    " larger than the current leader epoch: " + leaderEpoch);
+        }

Review comment:
       Thanks, I had previously mistaken the quorum epoch for the leader epoch cache. I have updated the PR. One thing I'm not sure about is:
       > The epoch of the snapshot is equal to the quorum epoch.

       I think the snapshotId's epoch <= quorum epoch should be fine?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }

+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
+                    highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the high-watermark when creating snapshot");
+        }
+        int leaderEpoch = quorum().epoch();
+        if (snapshotId.epoch > leaderEpoch) {
+            throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose epoch is" +
+                    " larger than the current leader epoch: " + leaderEpoch);
+        }

Review comment:
       Thanks, I had previously mistaken the quorum epoch for the leader epoch cache. I have updated the PR. One thing in the Jira I'm not sure about is:
       > 2. The epoch of the snapshot is equal to the quorum epoch.

       I think the snapshotId's epoch <= quorum epoch should be fine?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }

+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {

Review comment:
       Thank you, I updated the PR as well.
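For readers following the discussion, the checks in `validateSnapshotId` as they appear in the diff above can be restated as a small standalone sketch. This is not the Kafka implementation: `SnapshotIdValidationSketch`, `SnapshotId`, and `validationError` are hypothetical names, the high watermark is modeled as a plain `OptionalLong`, and the quorum epoch is passed in as an `int`. The sketch uses the conditions shown in the diff (offset strictly below the high watermark, epoch no larger than the quorum epoch); whether the offset check should also allow equality is the open question raised in the review comments.

```java
import java.util.Optional;
import java.util.OptionalLong;

public class SnapshotIdValidationSketch {

    /** Hypothetical stand-in for Kafka's OffsetAndEpoch; not the real class. */
    static final class SnapshotId {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    /**
     * Returns a description of the first violated condition, or an empty
     * Optional when the snapshot id passes all three checks from the diff.
     */
    static Optional<String> validationError(SnapshotId id, OptionalLong highWatermark, int quorumEpoch) {
        // 1. The high watermark must be known.
        if (!highWatermark.isPresent()) {
            return Optional.of("high watermark is not known yet");
        }
        // 2. The snapshot's end offset must be strictly below the high watermark
        //    (this mirrors the `<=` rejection in the diff).
        if (highWatermark.getAsLong() <= id.offset) {
            return Optional.of("snapshot offset " + id.offset
                + " is not below the high watermark " + highWatermark.getAsLong());
        }
        // 3. The snapshot's epoch must not exceed the quorum epoch.
        if (id.epoch > quorumEpoch) {
            return Optional.of("snapshot epoch " + id.epoch
                + " is larger than the quorum epoch " + quorumEpoch);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int quorumEpoch = 2;
        // Same shape as the three invalid cases in the test, plus one valid id.
        System.out.println(validationError(new SnapshotId(3, 2), OptionalLong.empty(), quorumEpoch));
        System.out.println(validationError(new SnapshotId(3, 2), OptionalLong.of(3), quorumEpoch));
        System.out.println(validationError(new SnapshotId(2, 3), OptionalLong.of(3), quorumEpoch));
        System.out.println(validationError(new SnapshotId(2, 2), OptionalLong.of(3), quorumEpoch));
    }
}
```

Returning an `Optional<String>` instead of throwing keeps the sketch easy to exercise in isolation; the PR itself throws a `KafkaException` at each failed check.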