This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 5a103ba KAFKA-12851: Fix Raft partition simulation (#11134)
5a103ba is described below
commit 5a103baa8b6d36258763dfc484dd7b697888323b
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jul 28 09:28:56 2021 -0700
KAFKA-12851: Fix Raft partition simulation (#11134)
Instead of waiting for a high-watermark of 20 after the partition, the
test should wait for the high-watermark to reach an offset greater than
the largest log end offset at the time of the partition. Only such an offset
is guaranteed to be reached as the high-watermark by the new majority.
Reviewers: Jason Gustafson <[email protected]>
---
.../java/org/apache/kafka/raft/LeaderState.java | 12 +++++
.../apache/kafka/raft/RaftEventSimulationTest.java | 54 +++++++++++++++++-----
2 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 2bcc931..de08b7b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -171,6 +171,12 @@ public class LeaderState<T> implements EpochState {
|| (highWatermarkUpdateOffset ==
currentHighWatermarkMetadata.offset &&
!highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata)))
{
highWatermark = highWatermarkUpdateOpt;
+ log.trace(
+ "High watermark updated to {} based on indexOfHw
{} and voters {}",
+ highWatermark,
+ indexOfHw,
+ followersByDescendingFetchOffset
+ );
return true;
} else if (highWatermarkUpdateOffset <
currentHighWatermarkMetadata.offset) {
log.error("The latest computed high watermark {} is
smaller than the current " +
@@ -183,6 +189,12 @@ public class LeaderState<T> implements EpochState {
}
} else {
highWatermark = highWatermarkUpdateOpt;
+ log.trace(
+ "High watermark set to {} based on indexOfHw {} and
voters {}",
+ highWatermark,
+ indexOfHw,
+ followersByDescendingFetchOffset
+ );
return true;
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 6cfea2b..120eca3 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -256,12 +256,20 @@ public class RaftEventSimulationTest {
router.filter(3, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
router.filter(4, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
- scheduler.runUntil(() -> cluster.anyReachedHighWatermark(20));
+ long partitionLogEndOffset = cluster.maxLogEndOffset();
+ scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 *
partitionLogEndOffset));
long minorityHighWatermark =
cluster.maxHighWatermarkReached(Utils.mkSet(0, 1));
long majorityHighWatermark =
cluster.maxHighWatermarkReached(Utils.mkSet(2, 3, 4));
- assertTrue(majorityHighWatermark > minorityHighWatermark);
+ assertTrue(
+ majorityHighWatermark > minorityHighWatermark,
+ String.format(
+ "majorityHighWatermark = %s, minorityHighWatermark = %s",
+ majorityHighWatermark,
+ minorityHighWatermark
+ )
+ );
// Now restore the partition and verify everyone catches up
router.filter(0, new PermitAllTraffic());
@@ -270,7 +278,8 @@ public class RaftEventSimulationTest {
router.filter(3, new PermitAllTraffic());
router.filter(4, new PermitAllTraffic());
- scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
+ long restoredLogEndOffset = cluster.maxLogEndOffset();
+ scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 *
restoredLogEndOffset));
}
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
@@ -541,9 +550,21 @@ public class RaftEventSimulationTest {
return voters.size() / 2 + 1;
}
+ long maxLogEndOffset() {
+ return running
+ .values()
+ .stream()
+ .mapToLong(RaftNode::logEndOffset)
+ .max()
+ .orElse(0L);
+ }
+
OptionalLong leaderHighWatermark() {
- Optional<RaftNode> leaderWithMaxEpoch =
running.values().stream().filter(node -> node.client.quorum().isLeader())
- .max((node1, node2) ->
Integer.compare(node2.client.quorum().epoch(), node1.client.quorum().epoch()));
+ Optional<RaftNode> leaderWithMaxEpoch = running
+ .values()
+ .stream()
+ .filter(node -> node.client.quorum().isLeader())
+ .max((node1, node2) ->
Integer.compare(node2.client.quorum().epoch(), node1.client.quorum().epoch()));
if (leaderWithMaxEpoch.isPresent()) {
return leaderWithMaxEpoch.get().client.highWatermark();
} else {
@@ -558,27 +579,27 @@ public class RaftEventSimulationTest {
long maxHighWatermarkReached() {
return running.values().stream()
- .map(RaftNode::highWatermark)
- .max(Long::compareTo)
+ .mapToLong(RaftNode::highWatermark)
+ .max()
.orElse(0L);
}
long maxHighWatermarkReached(Set<Integer> nodeIds) {
return running.values().stream()
.filter(node -> nodeIds.contains(node.nodeId))
- .map(RaftNode::highWatermark)
- .max(Long::compareTo)
+ .mapToLong(RaftNode::highWatermark)
+ .max()
.orElse(0L);
}
boolean allReachedHighWatermark(long offset, Set<Integer> nodeIds) {
return nodeIds.stream()
- .allMatch(nodeId -> running.get(nodeId).highWatermark() >
offset);
+ .allMatch(nodeId -> running.get(nodeId).highWatermark() >=
offset);
}
boolean allReachedHighWatermark(long offset) {
return running.values().stream()
- .allMatch(node -> node.highWatermark() > offset);
+ .allMatch(node -> node.highWatermark() >= offset);
}
boolean hasLeader(int nodeId) {
@@ -799,9 +820,18 @@ public class RaftEventSimulationTest {
.orElse(0L);
}
+ long logEndOffset() {
+ return log.endOffset().offset;
+ }
+
@Override
public String toString() {
- return "Node(id=" + nodeId + ", hw=" + highWatermark() + ")";
+ return String.format(
+ "Node(id=%s, hw=%s, logEndOffset=%s)",
+ nodeId,
+ highWatermark(),
+ logEndOffset()
+ );
}
}