This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 37416e1aeba KAFKA-15489: resign leadership when no fetch or fetch snapshot from majority voters (#14428)
37416e1aeba is described below

commit 37416e1aebae33d01d5059ba906ec8e0e1107284
Author: Luke Chen <show...@gmail.com>
AuthorDate: Fri Dec 1 03:34:44 2023 +0800

    KAFKA-15489: resign leadership when no fetch or fetch snapshot from majority voters (#14428)

    In KIP-595, we expected to piggyback on the `quorum.fetch.timeout.ms` config:
    if the leader did not receive fetch requests from a majority of the quorum for
    that amount of time, it would begin a new election, resolving network partitions
    within the quorum. This behavior was missing from the current KRaft
    implementation and is fixed in this PR. The commit includes:

    1. Added a timer with a timeout configuration to `LeaderState`, checked for
       expiration each time the leader is polled. If it has expired, the leader
       resigns and starts a new election.
    2. Added `fetchedVoters` to `LeaderState`, updated each time a FETCH or
       FETCH_SNAPSHOT request is received; the set is cleared and the timer reset
       once majority - 1 of the remote voters have sent such requests (the leader
       counts itself toward the majority).

    A condensed sketch of this mechanism follows the diff below.

    Reviewers: José Armando García Sancio <jsan...@apache.org>
---
 .../common/requests/FetchSnapshotRequest.java      |   2 +
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  14 ++-
 .../java/org/apache/kafka/raft/LeaderState.java    |  59 ++++++++++++
 .../java/org/apache/kafka/raft/QuorumState.java    |   2 +
 .../java/org/apache/kafka/raft/RaftConfig.java     |   4 +-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 100 ++++++++++++++++++++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  54 +++++++++++
 .../org/apache/kafka/raft/LeaderStateTest.java     |  44 ++++++++-
 .../apache/kafka/raft/RaftClientTestContext.java   |   2 +
 9 files changed, 273 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
index 1769e94f47f..34db2d1c124 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
@@ -61,6 +61,7 @@ final public class FetchSnapshotRequest extends AbstractRequest {
      */
     public static FetchSnapshotRequestData singleton(
         String clusterId,
+        int replicaId,
         TopicPartition topicPartition,
         UnaryOperator<FetchSnapshotRequestData.PartitionSnapshot> operator
     ) {
@@ -70,6 +71,7 @@ final public class FetchSnapshotRequest extends AbstractRequest {
 
         return new FetchSnapshotRequestData()
             .setClusterId(clusterId)
+            .setReplicaId(replicaId)
             .setTopics(
                 Collections.singletonList(
                     new FetchSnapshotRequestData.TopicSnapshot()
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index b2e14ee3ec9..a04240b6426 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1257,7 +1257,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
      * - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range
      */
     private FetchSnapshotResponseData handleFetchSnapshotRequest(
-        RaftRequest.Inbound requestMetadata
+        RaftRequest.Inbound requestMetadata,
+        long currentTimeMs
     ) {
         FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data;
 
@@ -1340,6 +1341,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
+        LeaderState<T> state = quorum.leaderStateOrThrow();
+        state.updateCheckQuorumForFollowingVoter(data.replicaId(), currentTimeMs);
+
         return FetchSnapshotResponse.singleton(
             log.topicPartition(),
             responsePartitionSnapshot -> {
@@ -1701,7 +1705,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 break;

             case FETCH_SNAPSHOT:
-                responseFuture = completedFuture(handleFetchSnapshotRequest(request));
+                responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
                 break;

             default:
@@ -1875,6 +1879,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
         FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
             clusterId,
+            quorum().localIdOrSentinel(),
             log.topicPartition(),
             snapshotPartition -> {
                 return snapshotPartition
@@ -1990,7 +1995,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        if (shutdown.get() != null || state.isResignRequested()) {
+        long timeUntilCheckQuorumExpires = state.timeUntilCheckQuorumExpires(currentTimeMs);
+        if (shutdown.get() != null || state.isResignRequested() || timeUntilCheckQuorumExpires == 0) {
             transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
             return 0L;
         }
@@ -2006,7 +2012,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             this::buildBeginQuorumEpochRequest
         );
 
-        return Math.min(timeUntilFlush, timeUntilSend);
+        return Math.min(timeUntilFlush, Math.min(timeUntilSend, timeUntilCheckQuorumExpires));
     }
 
     private long maybeSendVoteRequests(
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index d7595393e77..14e327208f7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ControlRecordUtils;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 
 import org.slf4j.Logger;
@@ -44,6 +46,7 @@ import java.util.stream.Collectors;
  */
 public class LeaderState<T> implements EpochState {
     static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;
+    static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;
 
     private final int localId;
     private final int epoch;
@@ -55,17 +58,23 @@ public class LeaderState<T> implements EpochState {
     private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Logger log;
     private final BatchAccumulator<T> accumulator;
+    // The set of follower voters that sent a FETCH or FETCH_SNAPSHOT request during the current checkQuorumTimer interval.
+    private final Set<Integer> fetchedVoters = new HashSet<>();
+    private final Timer checkQuorumTimer;
+    private final int checkQuorumTimeoutMs;
 
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
 
     protected LeaderState(
+        Time time,
         int localId,
         int epoch,
         long epochStartOffset,
         Set<Integer> voters,
         Set<Integer> grantingVoters,
         BatchAccumulator<T> accumulator,
+        int fetchTimeoutMs,
         LogContext logContext
     ) {
         this.localId = localId;
@@ -79,6 +88,55 @@ public class LeaderState<T> implements EpochState {
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // Use 1.5x the fetch timeout to tolerate some network transit time or other IO time.
+        this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
+        this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);
+    }
+
+    /**
+     * Get the remaining time in milliseconds until the checkQuorumTimer expires.
+     * This will happen if we did not receive a valid fetch/fetchSnapshot request from the majority
+     * of the voters within checkQuorumTimeoutMs.
+     *
+     * @param currentTimeMs the current timestamp in milliseconds
+     * @return the remaining milliseconds before the checkQuorumTimer expires
+     */
+    public long timeUntilCheckQuorumExpires(long currentTimeMs) {
+        checkQuorumTimer.update(currentTimeMs);
+        long remainingMs = checkQuorumTimer.remainingMs();
+        if (remainingMs == 0) {
+            log.info(
+                "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                checkQuorumTimeoutMs,
+                fetchedVoters);
+        }
+        return remainingMs;
+    }
+
+    /**
+     * Reset the checkQuorumTimer if we've received fetch/fetchSnapshot requests from the majority of the voters.
+     *
+     * @param id the node id
+     * @param currentTimeMs the current timestamp in milliseconds
+     */
+    public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) {
+        updateFetchedVoters(id);
+        // The majority of the voters excluding the leader. Ex: with 3 voters, the value will be 1.
+        int majority = voterStates.size() / 2;
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            checkQuorumTimer.update(currentTimeMs);
+            checkQuorumTimer.reset(checkQuorumTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (id == localId) {
+            throw new IllegalArgumentException("Received a FETCH/FETCH_SNAPSHOT request from the leader itself.");
+        }
+
+        if (isVoter(id)) {
+            fetchedVoters.add(id);
+        }
     }
 
     public BatchAccumulator<T> accumulator() {
@@ -287,6 +345,7 @@ public class LeaderState<T> implements EpochState {
             fetchOffsetMetadata,
             leaderEndOffsetOpt
         );
+        updateCheckQuorumForFollowingVoter(replicaId, currentTimeMs);
 
         return isVoter(state.nodeId) && maybeUpdateHighWatermark();
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 9115358f52d..3daf85275ba 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -453,12 +453,14 @@ public class QuorumState {
         // we typically expect the state machine to be caught up anyway.
         LeaderState<T> state = new LeaderState<>(
+            time,
             localIdOrThrow(),
             epoch(),
             epochStartOffset,
             voters,
             candidateState.grantingVoters(),
             accumulator,
+            fetchTimeoutMs,
             logContext
         );
         durableTransitionTo(state);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
index e17de3278d3..5c5865f3581 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
@@ -68,8 +68,8 @@ public class RaftConfig {
 
     public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "fetch.timeout.ms";
     public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
-        "the current leader before becoming a candidate and triggering an election for voters; Maximum time without " +
-        "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader.";
+        "the current leader before becoming a candidate and triggering an election for voters; Maximum time " +
+        "a leader can go without receiving valid fetch or fetchSnapshot request from a majority of the quorum before resigning.";
     public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 2_000;
 
     public static final String QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG = QUORUM_PREFIX + "election.backoff.max.ms";
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index f3d7012db92..44d39880aa0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -701,6 +702,97 @@ final public class KafkaRaftClientSnapshotTest {
         assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
     }
 
+    @Test
+    public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters() throws Exception {
+        int localId = 0;
+        int voter1 = 1;
+        int voter2 = 2;
+        int observerId3 = 3;
+        Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+        List<String> records = Arrays.asList("foo", "bar");
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(snapshotId.epoch(), Arrays.asList("a"))
+            .build();
+
+        int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        FetchSnapshotRequestData voter1FetchSnapshotRequest = fetchSnapshotRequest(
+            context.clusterId.toString(),
+            voter1,
+            context.metadataPartition,
+            epoch,
+            snapshotId,
+            Integer.MAX_VALUE,
+            0
+        );
+
+        FetchSnapshotRequestData voter2FetchSnapshotRequest = fetchSnapshotRequest(
+            context.clusterId.toString(),
+            voter2,
+            context.metadataPartition,
+            epoch,
+            snapshotId,
+            Integer.MAX_VALUE,
+            0
+        );
+
+        FetchSnapshotRequestData observerFetchSnapshotRequest = fetchSnapshotRequest(
+            context.clusterId.toString(),
+            observerId3,
+            context.metadataPartition,
+            epoch,
+            snapshotId,
+            Integer.MAX_VALUE,
+            0
+        );
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
+            assertEquals(snapshotId, snapshot.snapshotId());
+            snapshot.append(records);
+            snapshot.freeze();
+        }
+
+        // The fetch timeout has not expired, so the leader should not resign
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // voter1 sends a fetchSnapshotRequest, so the fetch timer should be reset
+        context.deliverRequest(voter1FetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // Since the fetch timer was reset, the leader should not resign
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // voter2 sends a fetchSnapshotRequest, so the fetch timer should be reset
+        context.deliverRequest(voter2FetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // Since the fetch timer was reset, the leader should not resign
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertFalse(context.client.quorum().isResigned());
+
+        // An observer sends a fetchSnapshotRequest, but the fetch timer should not be reset
+        context.deliverRequest(observerFetchSnapshotRequest);
+        context.client.poll();
+        context.assertSentFetchSnapshotResponse(context.metadataPartition);
+
+        // After this sleep, the fetch timeout expires since we did not receive fetch requests from a majority of the voters within checkQuorumTimeoutMs
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+        assertTrue(context.client.quorum().isResigned());
+    }
+
     @Test
     public void testPartialFetchSnapshotRequestAsLeader() throws Exception {
         int localId = 0;
@@ -1589,6 +1681,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 context.clusterId.toString(),
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1603,6 +1696,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 null,
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1617,6 +1711,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 "",
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1631,6 +1726,7 @@ final public class KafkaRaftClientSnapshotTest {
         context.deliverRequest(
             fetchSnapshotRequest(
                 "invalid-uuid",
+                otherNodeId,
                 context.metadataPartition,
                 epoch,
                 new OffsetAndEpoch(0, 0),
@@ -1756,11 +1852,12 @@ final public class KafkaRaftClientSnapshotTest {
         int maxBytes,
         long position
     ) {
-        return fetchSnapshotRequest(null, topicPartition, epoch, offsetAndEpoch, maxBytes, position);
+        return fetchSnapshotRequest(null, -1, topicPartition, epoch, offsetAndEpoch, maxBytes, position);
     }
 
     private static FetchSnapshotRequestData fetchSnapshotRequest(
         String clusterId,
+        int replicaId,
         TopicPartition topicPartition,
         int epoch,
         OffsetAndEpoch offsetAndEpoch,
@@ -1773,6 +1870,7 @@ final public class KafkaRaftClientSnapshotTest {
 
         FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
             clusterId,
+            replicaId,
             topicPartition,
             snapshotPartition -> {
                 return snapshotPartition
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index d71874a5336..6d714daa11b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -485,6 +485,60 @@ public class KafkaRaftClientTest {
             context.listener.currentLeaderAndEpoch());
     }
 
+    @Test
+    public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        int observerId3 = 3;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+        int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        // The fetch timeout has not expired, so the leader should not resign
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        assertFalse(context.client.quorum().isResigned());
+
+        // Received a fetch request from a voter, so the fetch timer should be reset
+        context.deliverRequest(context.fetchRequest(epoch, remoteId1, 0, 0, 0));
+        context.pollUntilRequest();
+
+        // Since the fetch timer was reset, the leader should not resign
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        assertFalse(context.client.quorum().isResigned());
+
+        // Received a fetch request from another voter, so the fetch timer should be reset
+        context.deliverRequest(context.fetchRequest(epoch, remoteId2, 0, 0, 0));
+        context.pollUntilRequest();
+
+        // Since the fetch timer was reset, the leader should not resign
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        assertFalse(context.client.quorum().isResigned());
+
+        // Received a fetch request from an observer, but the fetch timer should not be reset
+        context.deliverRequest(context.fetchRequest(epoch, observerId3, 0, 0, 0));
+        context.pollUntilRequest();
+
+        // After this sleep, the fetch timeout expires since we did not receive fetch requests from a majority of the voters within checkQuorumTimeoutMs
+        context.time.sleep(resignLeadershipTimeout / 2);
+        context.client.poll();
+
+        // The leader should have resigned now
+        assertTrue(context.client.quorum().isResigned());
+        context.assertResignedLeader(epoch, localId);
+    }
+
     @Test
     public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
         int localId = 0;
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index bb44fea2ac0..97d290fa347 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,20 +46,24 @@ public class LeaderStateTest {
     private final int localId = 0;
     private final int epoch = 5;
     private final LogContext logContext = new LogContext();
-    private final BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class);
+    private final MockTime time = new MockTime();
+    private final int fetchTimeoutMs = 2000;
+    private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
 
     private LeaderState<?> newLeaderState(
         Set<Integer> voters,
         long epochStartOffset
     ) {
         return new LeaderState<>(
+            time,
             localId,
             epoch,
             epochStartOffset,
             voters,
             voters,
             accumulator,
+            fetchTimeoutMs,
             logContext
         );
     }
@@ -66,12 +71,14 @@ public class LeaderStateTest {
     @Test
     public void testRequireNonNullAccumulator() {
         assertThrows(NullPointerException.class, () -> new LeaderState<>(
+            new MockTime(),
             localId,
             epoch,
             0,
             Collections.emptySet(),
             Collections.emptySet(),
             null,
+            fetchTimeoutMs,
             logContext
         ));
     }
@@ -447,6 +454,41 @@ public class LeaderStateTest {
             observerState);
     }
 
+    @Test
+    public void testCheckQuorum() {
+        int node1 = 1;
+        int node2 = 2;
+        int node3 = 3;
+        int node4 = 4;
+        int observer5 = 5;
+        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L);
+        assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds()));
+        int resignLeadershipTimeout = checkQuorumTimeoutMs;
+
+        // The checkQuorum timeout has not been exceeded, so the timer should not expire
+        time.sleep(resignLeadershipTimeout / 2);
+        assertTrue(state.timeUntilCheckQuorumExpires(time.milliseconds()) > 0);
+
+        // Received fetch requests from 2 voter nodes, so the timer should be reset
+        state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds());
+        state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds());
+        assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // Since the timer was reset, it won't expire this time.
+        time.sleep(resignLeadershipTimeout / 2);
+        long remainingMs = state.timeUntilCheckQuorumExpires(time.milliseconds());
+        assertTrue(remainingMs > 0);
+
+        // Received fetch requests from 1 voter node and 1 observer node, so the timer should not be reset
+        state.updateCheckQuorumForFollowingVoter(node3, time.milliseconds());
+        state.updateCheckQuorumForFollowingVoter(observer5, time.milliseconds());
+        assertEquals(remainingMs, state.timeUntilCheckQuorumExpires(time.milliseconds()));
+
+        // This time, the checkQuorum timer will expire
+        time.sleep(resignLeadershipTimeout / 2);
+        assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds()));
+    }
+
     @Test
     public void testNoOpForNegativeRemoteNodeId() {
         MockTime time = new MockTime();
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 9d798453d0c..68241ae70b1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -78,6 +78,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -91,6 +92,7 @@ public final class RaftClientTestContext {
     final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
     final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
     final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
+    final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
     final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
 
     private int electionTimeoutMs;
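
Below is the condensed sketch of the check-quorum mechanism referenced in the
commit message above. It is illustrative only: the CheckQuorumSketch class and
its plain millisecond bookkeeping (in place of Kafka's Time/Timer utilities)
are assumptions made for brevity, not part of the commit.

    import java.util.HashSet;
    import java.util.Set;

    public class CheckQuorumSketch {
        static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;

        private final int localId;
        private final int voterCount;            // total voters, including the leader
        private final int checkQuorumTimeoutMs;
        private final Set<Integer> fetchedVoters = new HashSet<>();
        private long deadlineMs;                 // when the check-quorum timer expires

        public CheckQuorumSketch(int localId, int voterCount, int fetchTimeoutMs, long nowMs) {
            this.localId = localId;
            this.voterCount = voterCount;
            // 1.5x the fetch timeout tolerates some network transit and IO time, as in the patch.
            this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
            this.deadlineMs = nowMs + checkQuorumTimeoutMs;
        }

        /** Called for every FETCH or FETCH_SNAPSHOT request the leader handles. */
        public void updateCheckQuorumForFollowingVoter(int replicaId, long nowMs) {
            if (replicaId != localId) {
                fetchedVoters.add(replicaId);    // the real code also filters out observers
            }
            // The majority, excluding the leader: 1 for 3 voters, 2 for 5 voters.
            int majority = voterCount / 2;
            if (fetchedVoters.size() >= majority) {
                fetchedVoters.clear();
                deadlineMs = nowMs + checkQuorumTimeoutMs;  // reset the timer
            }
        }

        /** Checked each time the leader is polled; 0 means the leader should resign. */
        public long timeUntilCheckQuorumExpires(long nowMs) {
            return Math.max(0, deadlineMs - nowMs);
        }

        public static void main(String[] args) {
            // 3 voters (leader 0 plus voters 1 and 2), 2s fetch timeout => 3s check-quorum timeout.
            CheckQuorumSketch leader = new CheckQuorumSketch(0, 3, 2_000, 0L);
            leader.updateCheckQuorumForFollowingVoter(1, 1_000L); // one remote voter fetched: deadline moves to 4_000
            System.out.println(leader.timeUntilCheckQuorumExpires(3_500L)); // 500 -> keep leading
            System.out.println(leader.timeUntilCheckQuorumExpires(4_000L)); // 0   -> resign and start a new election
        }
    }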