This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b7a6a8fd5f0 KAFKA-15076; KRaft should prefer latest snapshot (#13834)
b7a6a8fd5f0 is described below

commit b7a6a8fd5f03144c2ff964f5531470e456dbb4ba
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Mon Jun 12 07:25:42 2023 -0700

    KAFKA-15076; KRaft should prefer latest snapshot (#13834)
    
    If the KRaft listener is at offset 0 (the start of the log) and KRaft has
    generated a snapshot, KRaft should prefer the latest snapshot instead of
    having the listener read from the start of the log.
    
    This is implemented by having KafkaRaftClient send a
    Listener.handleLoadSnapshot event if the Listener is at offset 0 and the
    KRaft partition has generated a snapshot.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>,
    David Arthur <mum...@gmail.com>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  7 ++-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 53 ++++++++++++++--------
 2 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 94002f36d2a..fbb1117da25 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -308,7 +308,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private void updateListenersProgress(long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset 
-> {
-                if (nextExpectedOffset < log.startOffset() && 
nextExpectedOffset < highWatermark) {
+                // Send snapshot to the listener, if the listener is at the 
beginning of the log and there is a snapshot,
+                // or the listener is trying to read an offset for which there 
isn't a segment in the log.
+                if (nextExpectedOffset < highWatermark &&
+                    ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) 
||
+                     nextExpectedOffset < log.startOffset())
+                ) {
                     SnapshotReader<T> snapshot = 
latestSnapshot().orElseThrow(() -> new IllegalStateException(
                         String.format(
                             "Snapshot expected since next offset of %s is %d, 
log start offset is %d and high-watermark is %d",
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 4e9a377d5fd..2de6853d2e1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -37,6 +37,8 @@ import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -87,19 +89,24 @@ final public class KafkaRaftClientSnapshotTest {
         assertEquals(Optional.empty(), context.client.latestSnapshotId());
     }
 
-    @Test
-    public void testLeaderListenerNotified() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLeaderListenerNotified(boolean entireLog) throws Exception 
{
         int localId = 0;
         int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
 
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext.Builder contextBuilder = new 
RaftClientTestContext.Builder(localId, voters)
             .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
-            .withEmptySnapshot(snapshotId)
-            .deleteBeforeSnapshot(snapshotId)
-            .build();
+            .withEmptySnapshot(snapshotId);
+
+        if (!entireLog) {
+            contextBuilder.deleteBeforeSnapshot(snapshotId);
+        }
+
+        RaftClientTestContext context = contextBuilder.build();
 
         context.becomeLeader();
         int epoch = context.currentEpoch();
@@ -118,21 +125,26 @@ final public class KafkaRaftClientSnapshotTest {
         }
     }
 
-    @Test
-    public void testFollowerListenerNotified() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFollowerListenerNotified(boolean entireLog) throws 
Exception {
         int localId = 0;
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
 
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext.Builder contextBuilder = new 
RaftClientTestContext.Builder(localId, voters)
             .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
             .withEmptySnapshot(snapshotId)
-            .deleteBeforeSnapshot(snapshotId)
-            .withElectedLeader(epoch, leaderId)
-            .build();
+            .withElectedLeader(epoch, leaderId);
+
+        if (!entireLog) {
+            contextBuilder.deleteBeforeSnapshot(snapshotId);
+        }
+
+        RaftClientTestContext context = contextBuilder.build();
 
         // Advance the highWatermark
         long localLogEndOffset = context.log.endOffset().offset;
@@ -155,21 +167,26 @@ final public class KafkaRaftClientSnapshotTest {
         }
     }
 
-    @Test
-    public void testSecondListenerNotified() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSecondListenerNotified(boolean entireLog) throws Exception 
{
         int localId = 0;
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
 
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext.Builder contextBuilder = new 
RaftClientTestContext.Builder(localId, voters)
             .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
             .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
             .withEmptySnapshot(snapshotId)
-            .deleteBeforeSnapshot(snapshotId)
-            .withElectedLeader(epoch, leaderId)
-            .build();
+            .withElectedLeader(epoch, leaderId);
+
+        if (!entireLog) {
+            contextBuilder.deleteBeforeSnapshot(snapshotId);
+        }
+
+        RaftClientTestContext context = contextBuilder.build();
 
         // Advance the highWatermark
         long localLogEndOffset = context.log.endOffset().offset;

Reply via email to