jsancio commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r619882084
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) { ); } + private void validateSnapshotId(OffsetAndEpoch snapshotId) { + Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark(); + if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { Review comment: You are correct. I think when I created the Jira I overlooked that both snapshot id's end offset and the high-watermark are exclusive values. Update the Jira's description. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) { ); } + private void validateSnapshotId(OffsetAndEpoch snapshotId) { + Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark(); + if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) { + throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " + + highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " + + "size of records between the latest snapshot and the high-watermark when creating snapshot"); + } + int leaderEpoch = quorum().epoch(); + if (snapshotId.epoch > leaderEpoch) { + throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose epoch is" + + " larger than the current leader epoch: " + leaderEpoch); + } Review comment: From the Jira: > The end offset and epoch of the snapshot is valid based on the leader epoch cache. How about also validating against the leader epoch cache? See https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L124. 
This is important because both the snapshot and the leader epoch cache are used to validate offsets. See https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L85. The term leader epoch cache comes from the variable name `leaderEpochCache` used in `kafka.log.Log`. ########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ########## @@ -1335,6 +1313,51 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception { context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); } + @Test + public void testCreateSnapshotWithInvalidSnapshotId() throws Exception { + int localId = 0; + int otherNodeId = localId + 1; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + int epoch = 2; + + List<String> appendRecords = Arrays.asList("a", "b", "c"); + OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .appendToLog(epoch, appendRecords) + .withAppendLingerMs(1) + .build(); + + context.becomeLeader(); + int currentEpoch = context.currentEpoch(); + + // When creating snapshot: + // 1. high watermark cannot be empty + assertEquals(OptionalLong.empty(), context.client.highWatermark()); + assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1)); + + // 2. high watermark must larger than the snapshotId's endOffset + advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId); + assertNotEquals(OptionalLong.empty(), context.client.highWatermark()); + OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch); + assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId2)); + + // 3. 
the current leader epoch cache must larger than the snapshotId's epoch + OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch + 1); + assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3)); + } + + private void advanceHighWatermark(RaftClientTestContext context, Review comment: Thanks for cleaning up the code duplication. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org