jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509840265
########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ########## @@ -90,470 +76,480 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaRaftClientTest { - private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); - - private final int localId = 0; - private final int electionTimeoutMs = 10000; - private final int electionBackoffMaxMs = 100; - private final int fetchTimeoutMs = 50000; // fetch timeout is usually larger than election timeout - private final int retryBackoffMs = 50; - private final int requestTimeoutMs = 5000; - private final int fetchMaxWaitMs = 0; - - private final MockTime time = new MockTime(); - private final MockLog log = new MockLog(METADATA_PARTITION); - private final MockNetworkChannel channel = new MockNetworkChannel(); - private final Random random = Mockito.spy(new Random(1)); - private final QuorumStateStore quorumStateStore = new MockQuorumStateStore(); - - @AfterEach - public void cleanUp() throws IOException { - quorumStateStore.clear(); - } - - private InetSocketAddress mockAddress(int id) { - return new InetSocketAddress("localhost", 9990 + id); - } - - private KafkaRaftClient buildClient(Set<Integer> voters) throws IOException { - return buildClient(voters, new Metrics(time)); - } - - private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) throws IOException { - LogContext logContext = new LogContext(); - QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs, - quorumStateStore, time, logContext, random); - - Map<Integer, InetSocketAddress> voterAddresses = voters.stream().collect(Collectors.toMap( - Function.identity(), - this::mockAddress - )); - - KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics, - new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses, - electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random); - - client.initialize(); - - return client; - } - @Test public void testInitializeSingleMemberQuorum() throws IOException { - buildClient(Collections.singleton(localId)); - assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), - quorumStateStore.readElectionState()); + RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID)); + assertEquals( + ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)), + context.quorumStateStore.readElectionState() + ); } @Test public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception { // Start off as leader. We should still bump the epoch after initialization int initialEpoch = 2; - Set<Integer> voters = Collections.singleton(localId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters)); - - KafkaRaftClient client = buildClient(voters); - assertEquals(1L, log.endOffset().offset); - assertEquals(initialEpoch + 1, log.lastFetchedEpoch()); - assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), - client.currentLeaderAndEpoch()); - assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters), - quorumStateStore.readElectionState()); + Set<Integer> voters = Collections.singleton(LOCAL_ID); + RaftClientTestContext context = new RaftClientTestContext.Builder() + .updateQuorumStateStore(quorumStateStore -> { + assertDoesNotThrow(() -> { + quorumStateStore.writeElectionState( + ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters) + ); + }); + }) + .build(voters); + + assertEquals(1L, context.log.endOffset().offset); + assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); + assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1), + context.client.currentLeaderAndEpoch()); + assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters), + context.quorumStateStore.readElectionState()); } @Test public void testInitializeAsLeaderFromStateStore() throws Exception { - Set<Integer> voters = Utils.mkSet(localId, 1); + Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1); int epoch = 2; - Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); - KafkaRaftClient client = buildClient(voters); - assertEquals(0L, log.endOffset().offset); - assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState()); + RaftClientTestContext context = new RaftClientTestContext.Builder() + .updateRandom(random -> { + Mockito.doReturn(0).when(random).nextInt(RaftClientTestContext.ELECTION_TIMEOUT_MS); + }) + .updateQuorumStateStore(quorumStateStore -> { + assertDoesNotThrow(() -> { + quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, LOCAL_ID, voters)); + }); + }) + .build(voters); + - time.sleep(electionTimeoutMs); - pollUntilSend(client); - assertSentVoteRequest(epoch + 1, 0, 0L); + assertEquals(0L, context.log.endOffset().offset); + assertEquals(ElectionState.withUnknownLeader(epoch, voters), context.quorumStateStore.readElectionState()); + + context.time.sleep(RaftClientTestContext.ELECTION_TIMEOUT_MS); + context.pollUntilSend(); + context.assertSentVoteRequest(epoch + 1, 0, 0L); } @Test public void testInitializeAsCandidateFromStateStore() throws Exception { // Need 3 node to require a 2-node majority - Set<Integer> voters = Utils.mkSet(localId, 1, 2); - quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, localId, voters)); + Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1, 2); + + RaftClientTestContext context = new RaftClientTestContext.Builder() + .updateQuorumStateStore(quorumStateStore -> { + assertDoesNotThrow(() -> { + quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, LOCAL_ID, voters)); + }); + }) + .build(voters); - KafkaRaftClient client = buildClient(voters); - assertEquals(0L, log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset); // Send out vote requests. - client.poll(); + context.client.poll(); - List<RaftRequest.Outbound> voteRequests = collectVoteRequests(2, 0, 0); + List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(2, 0, 0); assertEquals(2, voteRequests.size()); } @Test public void testInitializeAsCandidateAndBecomeLeader() throws Exception { final int otherNodeId = 1; - Set<Integer> voters = Utils.mkSet(localId, otherNodeId); - KafkaRaftClient client = buildClient(voters); + Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId); + RaftClientTestContext context = RaftClientTestContext.build(voters); - assertEquals(ElectionState.withUnknownLeader(0, voters), quorumStateStore.readElectionState()); - time.sleep(2 * electionTimeoutMs); + assertEquals(ElectionState.withUnknownLeader(0, voters), context.quorumStateStore.readElectionState()); + context.time.sleep(2 * RaftClientTestContext.ELECTION_TIMEOUT_MS); - pollUntilSend(client); - assertEquals(ElectionState.withVotedCandidate(1, localId, voters), quorumStateStore.readElectionState()); + context.pollUntilSend(); + assertEquals(ElectionState.withVotedCandidate(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState()); - int correlationId = assertSentVoteRequest(1, 0, 0L); - deliverResponse(correlationId, otherNodeId, voteResponse(true, Optional.empty(), 1)); + int correlationId = context.assertSentVoteRequest(1, 0, 0L); + context.deliverResponse(correlationId, otherNodeId, RaftClientTestContext.voteResponse(true, Optional.empty(), 1)); // Become leader after receiving the vote - client.poll(); - assertEquals(ElectionState.withElectedLeader(1, localId, voters), quorumStateStore.readElectionState()); - long electionTimestamp = time.milliseconds(); + context.client.poll(); + assertEquals(ElectionState.withElectedLeader(1, LOCAL_ID, voters), context.quorumStateStore.readElectionState()); + long electionTimestamp = context.time.milliseconds(); // Leader change record appended - assertEquals(1L, log.endOffset().offset); - assertEquals(1L, log.lastFlushedOffset()); + assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.lastFlushedOffset()); // Send BeginQuorumEpoch to voters - client.poll(); - assertSentBeginQuorumEpochRequest(1); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(1); - Records records = log.read(0, Isolation.UNCOMMITTED).records; + Records records = context.log.read(0, Isolation.UNCOMMITTED).records; RecordBatch batch = records.batches().iterator().next(); assertTrue(batch.isControlBatch()); Record record = batch.iterator().next(); assertEquals(electionTimestamp, record.timestamp()); - verifyLeaderChangeMessage(localId, Collections.singletonList(otherNodeId), - record.key(), record.value()); + RaftClientTestContext.verifyLeaderChangeMessage(LOCAL_ID, Collections.singletonList(otherNodeId), record.key(), record.value()); Review comment: @hachikuji is this a bug? Shouldn't the leader (LOCAL_ID) always be a voter (the second argument for this function)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org