This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new b7a6a8fd5f0 KAFKA-15076; KRaft should prefer latest snapshot (#13834)
b7a6a8fd5f0 is described below

commit b7a6a8fd5f03144c2ff964f5531470e456dbb4ba
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Mon Jun 12 07:25:42 2023 -0700

    KAFKA-15076; KRaft should prefer latest snapshot (#13834)

    If the KRaft listener is at offset 0, the start of the log, and KRaft has generated a snapshot, it should prefer the latest snapshot instead of having the listener read from the start of the log.

    This is implemented by having KafkaRaftClient send a Listener.handleLoadSnapshot event, if the Listener is at offset 0 and the KRaft partition has generated a snapshot.

    Reviewers: Jason Gustafson <ja...@confluent.io>, David Arthur <mum...@gmail.com>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  7 ++-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 53 ++++++++++++++--------
 2 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 94002f36d2a..fbb1117da25 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -308,7 +308,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private void updateListenersProgress(long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-                if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
+                // Send snapshot to the listener, if the listener is at the beginning of the log and there is a snapshot,
+                // or the listener is trying to read an offset for which there isn't a segment in the log.
+                if (nextExpectedOffset < highWatermark &&
+                    ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) ||
+                     nextExpectedOffset < log.startOffset())
+                ) {
                     SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
                         String.format(
                             "Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d",
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 4e9a377d5fd..2de6853d2e1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -37,6 +37,8 @@ import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -87,19 +89,24 @@ final public class KafkaRaftClientSnapshotTest {
         assertEquals(Optional.empty(), context.client.latestSnapshotId());
     }
 
-    @Test
-    public void testLeaderListenerNotified() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLeaderListenerNotified(boolean entireLog) throws Exception {
         int localId = 0;
         int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
 
-        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters)
             .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
-            .withEmptySnapshot(snapshotId)
-            .deleteBeforeSnapshot(snapshotId)
-            .build();
+            .withEmptySnapshot(snapshotId);
+
+        if (!entireLog) {
+            contextBuilder.deleteBeforeSnapshot(snapshotId);
+        }
+
+        RaftClientTestContext context = contextBuilder.build();
 
         context.becomeLeader();
         int epoch = context.currentEpoch();
@@ -118,21 +125,26 @@ final public class KafkaRaftClientSnapshotTest {
         }
     }
 
-    @Test
-    public void testFollowerListenerNotified() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFollowerListenerNotified(boolean entireLog) throws Exception {
         int localId = 0;
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
 
-        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters)
             .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
             .withEmptySnapshot(snapshotId)
-            .deleteBeforeSnapshot(snapshotId)
-            .withElectedLeader(epoch, leaderId)
-            .build();
+            .withElectedLeader(epoch, leaderId);
+
+        if (!entireLog) {
+            contextBuilder.deleteBeforeSnapshot(snapshotId);
+        }
+
+        RaftClientTestContext context = contextBuilder.build();
 
         // Advance the highWatermark
         long localLogEndOffset = context.log.endOffset().offset;
@@ -155,21 +167,26 @@ final public class KafkaRaftClientSnapshotTest {
         }
     }
 
-    @Test
-    public void testSecondListenerNotified() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSecondListenerNotified(boolean entireLog) throws Exception {
         int localId = 0;
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
 
-        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters)
             .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
             .withEmptySnapshot(snapshotId)
-            .deleteBeforeSnapshot(snapshotId)
-            .withElectedLeader(epoch, leaderId)
-            .build();
+            .withElectedLeader(epoch, leaderId);
+
+        if (!entireLog) {
+            contextBuilder.deleteBeforeSnapshot(snapshotId);
+        }
+
+        RaftClientTestContext context = contextBuilder.build();
 
         // Advance the highWatermark
         long localLogEndOffset = context.log.endOffset().offset;