jsancio commented on code in PR #17500:
URL: https://github.com/apache/kafka/pull/17500#discussion_r1870235740
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
throw new IllegalStateException("Cannot create snapshot before the
replica has been initialized");
}
+ long baseOffset = log.read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ if (snapshotId.offset() != baseOffset) {
+ logger.info("Cannot create snapshot at offset {} because it is not
batch aligned", snapshotId.offset());
+ throw new IllegalArgumentException("Cannot create snapshot at an
offset that is not batch aligned");
Review Comment:
Let's include as much information as possible, including the snapshot
id's end offset and the returned batch's base offset.
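
For illustration only, a minimal sketch of a message that carries both
values. The class name, method names, and wording are hypothetical and are
not taken from the PR; the real code would use `snapshotId` and the
`baseOffset` returned by `log.read`.

```java
public class SnapshotAlignmentMessage {
    // Builds an error message that reports both the requested snapshot id and
    // the base offset of the batch that contains it.
    // Assumption: names and wording are illustrative, not the PR's final text.
    public static String notBatchAligned(long snapshotOffset, int snapshotEpoch, long batchBaseOffset) {
        return String.format(
            "Cannot create snapshot with id (offset=%d, epoch=%d) because it is not batch aligned; "
                + "the batch containing that offset has base offset %d",
            snapshotOffset, snapshotEpoch, batchBaseOffset
        );
    }

    // Throws when the requested snapshot offset is not the base offset of its batch.
    public static void checkBatchAligned(long snapshotOffset, int snapshotEpoch, long batchBaseOffset) {
        if (snapshotOffset != batchBaseOffset) {
            throw new IllegalArgumentException(
                notBatchAligned(snapshotOffset, snapshotEpoch, batchBaseOffset)
            );
        }
    }
}
```

The same string could be passed to both `logger.info` and the exception so
the two never drift apart.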
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1927,18 +1934,51 @@ public void
testCreateSnapshotAsLeaderWithInvalidSnapshotId(boolean withKip853Rp
context.client.poll();
assertEquals(context.log.endOffset().offset(),
context.client.highWatermark().getAsLong() + newRecords.size());
- OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId2, 0));
+ OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 3, currentEpoch);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId2, 0)
+ );
+ assertEquals(
+ "Cannot create a snapshot with an id (OffsetAndEpoch(offset=7,
epoch=3)) greater than the high-watermark (4)",
+ exception.getMessage()
+ );
// 2 the quorum epoch must larger than or equal to the snapshotId's
epoch
OffsetAndEpoch invalidSnapshotId3 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId3, 0));
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=4, epoch=4)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=7, epoch=3))",
+ exception.getMessage()
+ );
// 3 the snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch =
context.log.endOffsetForEpoch(epoch);
assertEquals(epoch, endOffsetForEpoch.epoch());
- OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 2, epoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId4, 0));
+ OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=4, epoch=2)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=2))",
+ exception.getMessage()
+ );
+
+ // 4 snapshotId offset must be at a batch boundary
Review Comment:
I would mention that this points to the middle of the batch (d, e, f). It
points to the offset of f, which is not batch aligned, right?
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
throw new IllegalStateException("Cannot create snapshot before the
replica has been initialized");
}
+ long baseOffset = log.read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
Review Comment:
Let's document why you are doing this and why it is correct. It may not be
obvious to future readers why the snapshot's end offset must be batch aligned
or point to the log end offset.
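
As a rough model of the rule such a comment would describe, the sketch below
treats the log as a list of batches and accepts an offset only if it is the
base offset of a batch or the log end offset. The `Batch` type and helper
names are hypothetical stand-ins, not Kafka classes, and the sketch assumes
that a read at or past the log end reports the requested offset as its start
offset; the diff does not confirm that behavior of `log.read`.

```java
import java.util.List;

public class BatchAlignmentSketch {
    // Hypothetical stand-in for a record batch: a base offset plus a record count.
    public static final class Batch {
        final long baseOffset;
        final int recordCount;

        public Batch(long baseOffset, int recordCount) {
            this.baseOffset = baseOffset;
            this.recordCount = recordCount;
        }
    }

    // Returns the base offset of the batch containing `offset`.
    // Assumption: an offset at or past the log end is returned unchanged.
    public static long baseOffsetFor(List<Batch> log, long offset) {
        for (Batch batch : log) {
            if (offset >= batch.baseOffset && offset < batch.baseOffset + batch.recordCount) {
                return batch.baseOffset;
            }
        }
        return offset;
    }

    // An offset is batch aligned when it equals the base offset of its batch.
    public static boolean isBatchAligned(List<Batch> log, long offset) {
        return baseOffsetFor(log, offset) == offset;
    }

    public static void main(String[] args) {
        // Two batches: (a, b, c) at offsets 0..2 and (d, e, f) at offsets 3..5.
        List<Batch> log = List.of(new Batch(0L, 3), new Batch(3L, 3));
        System.out.println(isBatchAligned(log, 3L)); // base offset of (d, e, f)
        System.out.println(isBatchAligned(log, 5L)); // offset of f, inside the batch
        System.out.println(isBatchAligned(log, 6L)); // log end offset
    }
}
```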
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
throw new IllegalStateException("Cannot create snapshot before the
replica has been initialized");
}
+ long baseOffset = log.read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ if (snapshotId.offset() != baseOffset) {
+ logger.info("Cannot create snapshot at offset {} because it is not
batch aligned", snapshotId.offset());
Review Comment:
We should also record the `baseOffset` returned by `log.read` in the log
message.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1927,18 +1934,51 @@ public void
testCreateSnapshotAsLeaderWithInvalidSnapshotId(boolean withKip853Rp
context.client.poll();
assertEquals(context.log.endOffset().offset(),
context.client.highWatermark().getAsLong() + newRecords.size());
- OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId2, 0));
+ OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 3, currentEpoch);
Review Comment:
Did you need to change this because `HWM + 2` was not batch aligned, since
the batch after the HWM has three records? If so, I would make this explicit by
using `HWM + newRecords.size()`.
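
A small sketch of the difference, using the values visible in the expected
message (high-watermark 4, snapshot offset 7). The helper name and the record
contents are hypothetical; only the arithmetic is the point.

```java
import java.util.List;

public class HighWatermarkOffsetSketch {
    // Derives the offset just past the batch appended after the high-watermark,
    // so the test does not hard-code the batch size as a literal.
    public static long offsetPastBatch(long highWatermark, List<String> newRecords) {
        return highWatermark + newRecords.size();
    }

    public static void main(String[] args) {
        // Assumption: three records in the batch, matching "+ 3" in the diff.
        List<String> newRecords = List.of("d", "e", "f");
        System.out.println(offsetPastBatch(4L, newRecords));
    }
}
```

If the batch size changes later, the derived offset stays batch aligned,
whereas a literal `+ 3` would silently start pointing into the middle of a
batch.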
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1951,6 +1991,7 @@ public void
testCreateSnapshotAsFollowerWithInvalidSnapshotId(boolean withKip853
Set<Integer> voters = Set.of(localId, leaderId, otherFollowerId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(0, List.of("a"))
Review Comment:
Technically, `0` is not a valid leader epoch. The smallest possible epoch
for a leader is 1.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -2003,11 +2054,43 @@ public void
testCreateSnapshotAsFollowerWithInvalidSnapshotId(boolean withKip853
context.client.poll();
assertEquals(6L, context.client.highWatermark().getAsLong());
+ // 3) The quorum epoch must be larger than or equal to the
snapshotId's epoch
+ OffsetAndEpoch invalidSnapshotId3 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=6, epoch=6)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=6, epoch=4))",
+ exception.getMessage()
+ );
+
// 4) The snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3);
assertEquals(3, endOffsetForEpoch.epoch());
- OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId4, 0));
+ OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 3, 3);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=6, epoch=3)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=3))",
+ exception.getMessage()
+ );
+
+ // 5) The snapshotId should be batch-aligned
+ endOffsetForEpoch = context.log.endOffsetForEpoch(currentEpoch);
+ assertEquals(4, endOffsetForEpoch.epoch());
+ assertEquals(6, endOffsetForEpoch.offset());
Review Comment:
I found this difficult to follow. I assume this points to `f` in the batch
`d, e, f`. That batch has a base offset of 3, an epoch of 4, and a size of 3,
so you are trying to check that offset 5 (f) is not batch aligned.
One part that makes it difficult to follow is that `currentEpoch >= 5`.
The other part that makes it difficult to follow is that you are using
currentEpoch in `invalidSnapshotId5` and not 4.
I think it would be easier to read if you made 4 a variable `batch2Epoch` and
used it here and in `context.buildBatch(3L, 4, records2)`. What do you think?
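
Roughly what I have in mind, as a standalone sketch. `batch2BaseOffset`,
`lastOffsetInBatch`, and the class name are hypothetical; in the test the
values would feed `context.buildBatch` and the `OffsetAndEpoch` under test.

```java
import java.util.List;

public class NamedBatchSketch {
    // Offset of the last record in a batch, which sits in the middle of the
    // batch range whenever the batch has more than one record.
    public static long lastOffsetInBatch(long baseOffset, List<String> records) {
        return baseOffset + records.size() - 1;
    }

    public static void main(String[] args) {
        long batch2BaseOffset = 3L;
        int batch2Epoch = 4;
        List<String> records2 = List.of("d", "e", "f");

        // The snapshot id under test points at f and reuses the batch's own epoch.
        long misalignedOffset = lastOffsetInBatch(batch2BaseOffset, records2);
        System.out.printf("offset=%d, epoch=%d%n", misalignedOffset, batch2Epoch);
    }
}
```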
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3426,6 +3426,12 @@ public Optional<SnapshotWriter<T>> createSnapshot(
throw new IllegalStateException("Cannot create snapshot before the
replica has been initialized");
}
+ long baseOffset = log.read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
Review Comment:
Is there a reason why you added this check here and not to `ReplicatedLog`
(`MockLog` and `KafkaMetadataLog`)? Those types already do snapshot id
validation against the underlying log records.