This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 5a103ba  KAFKA-12851: Fix Raft partition simulation (#11134)
5a103ba is described below

commit 5a103baa8b6d36258763dfc484dd7b697888323b
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jul 28 09:28:56 2021 -0700

    KAFKA-12851: Fix Raft partition simulation (#11134)
    
    Instead of waiting for a high-watermark of 20 after the partition, the
    test should wait for the high-watermark to reach an offset greater than
    the largest log end offset at the time of the partition. Only that offset
    is guarantee to be reached as the high-watermark by the new majority.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../java/org/apache/kafka/raft/LeaderState.java    | 12 +++++
 .../apache/kafka/raft/RaftEventSimulationTest.java | 54 +++++++++++++++++-----
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 2bcc931..de08b7b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -171,6 +171,12 @@ public class LeaderState<T> implements EpochState {
                         || (highWatermarkUpdateOffset == 
currentHighWatermarkMetadata.offset &&
                             
!highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata)))
 {
                         highWatermark = highWatermarkUpdateOpt;
+                        log.trace(
+                            "High watermark updated to {} based on indexOfHw 
{} and voters {}",
+                            highWatermark,
+                            indexOfHw,
+                            followersByDescendingFetchOffset
+                        );
                         return true;
                     } else if (highWatermarkUpdateOffset < 
currentHighWatermarkMetadata.offset) {
                         log.error("The latest computed high watermark {} is 
smaller than the current " +
@@ -183,6 +189,12 @@ public class LeaderState<T> implements EpochState {
                     }
                 } else {
                     highWatermark = highWatermarkUpdateOpt;
+                    log.trace(
+                        "High watermark set to {} based on indexOfHw {} and 
voters {}",
+                        highWatermark,
+                        indexOfHw,
+                        followersByDescendingFetchOffset
+                    );
                     return true;
                 }
             }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 6cfea2b..120eca3 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -256,12 +256,20 @@ public class RaftEventSimulationTest {
         router.filter(3, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
         router.filter(4, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
 
-        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(20));
+        long partitionLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * 
partitionLogEndOffset));
 
         long minorityHighWatermark = 
cluster.maxHighWatermarkReached(Utils.mkSet(0, 1));
         long majorityHighWatermark = 
cluster.maxHighWatermarkReached(Utils.mkSet(2, 3, 4));
 
-        assertTrue(majorityHighWatermark > minorityHighWatermark);
+        assertTrue(
+            majorityHighWatermark > minorityHighWatermark,
+            String.format(
+                "majorityHighWatermark = %s, minorityHighWatermark = %s",
+                majorityHighWatermark,
+                minorityHighWatermark
+            )
+        );
 
         // Now restore the partition and verify everyone catches up
         router.filter(0, new PermitAllTraffic());
@@ -270,7 +278,8 @@ public class RaftEventSimulationTest {
         router.filter(3, new PermitAllTraffic());
         router.filter(4, new PermitAllTraffic());
 
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
+        long restoredLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * 
restoredLogEndOffset));
     }
 
     @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
@@ -541,9 +550,21 @@ public class RaftEventSimulationTest {
             return voters.size() / 2 + 1;
         }
 
+        long maxLogEndOffset() {
+            return running
+                .values()
+                .stream()
+                .mapToLong(RaftNode::logEndOffset)
+                .max()
+                .orElse(0L);
+        }
+
         OptionalLong leaderHighWatermark() {
-            Optional<RaftNode> leaderWithMaxEpoch = 
running.values().stream().filter(node -> node.client.quorum().isLeader())
-                    .max((node1, node2) -> 
Integer.compare(node2.client.quorum().epoch(), node1.client.quorum().epoch()));
+            Optional<RaftNode> leaderWithMaxEpoch = running
+                .values()
+                .stream()
+                .filter(node -> node.client.quorum().isLeader())
+                .max((node1, node2) -> 
Integer.compare(node2.client.quorum().epoch(), node1.client.quorum().epoch()));
             if (leaderWithMaxEpoch.isPresent()) {
                 return leaderWithMaxEpoch.get().client.highWatermark();
             } else {
@@ -558,27 +579,27 @@ public class RaftEventSimulationTest {
 
         long maxHighWatermarkReached() {
             return running.values().stream()
-                .map(RaftNode::highWatermark)
-                .max(Long::compareTo)
+                .mapToLong(RaftNode::highWatermark)
+                .max()
                 .orElse(0L);
         }
 
         long maxHighWatermarkReached(Set<Integer> nodeIds) {
             return running.values().stream()
                 .filter(node -> nodeIds.contains(node.nodeId))
-                .map(RaftNode::highWatermark)
-                .max(Long::compareTo)
+                .mapToLong(RaftNode::highWatermark)
+                .max()
                 .orElse(0L);
         }
 
         boolean allReachedHighWatermark(long offset, Set<Integer> nodeIds) {
             return nodeIds.stream()
-                .allMatch(nodeId -> running.get(nodeId).highWatermark() > 
offset);
+                .allMatch(nodeId -> running.get(nodeId).highWatermark() >= 
offset);
         }
 
         boolean allReachedHighWatermark(long offset) {
             return running.values().stream()
-                .allMatch(node -> node.highWatermark() > offset);
+                .allMatch(node -> node.highWatermark() >= offset);
         }
 
         boolean hasLeader(int nodeId) {
@@ -799,9 +820,18 @@ public class RaftEventSimulationTest {
                 .orElse(0L);
         }
 
+        long logEndOffset() {
+            return log.endOffset().offset;
+        }
+
         @Override
         public String toString() {
-            return "Node(id=" + nodeId + ", hw=" + highWatermark() + ")";
+            return String.format(
+                "Node(id=%s, hw=%s, logEndOffset=%s)",
+                nodeId,
+                highWatermark(),
+                logEndOffset()
+            );
         }
     }
 

Reply via email to