jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509840834



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-    private static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
-
-    private final int localId = 0;
-    private final int electionTimeoutMs = 10000;
-    private final int electionBackoffMaxMs = 100;
-    private final int fetchTimeoutMs = 50000;   // fetch timeout is usually 
larger than election timeout
-    private final int retryBackoffMs = 50;
-    private final int requestTimeoutMs = 5000;
-    private final int fetchMaxWaitMs = 0;
-
-    private final MockTime time = new MockTime();
-    private final MockLog log = new MockLog(METADATA_PARTITION);
-    private final MockNetworkChannel channel = new MockNetworkChannel();
-    private final Random random = Mockito.spy(new Random(1));
-    private final QuorumStateStore quorumStateStore = new 
MockQuorumStateStore();
-
-    @AfterEach
-    public void cleanUp() throws IOException {
-        quorumStateStore.clear();
-    }
-
-    private InetSocketAddress mockAddress(int id) {
-        return new InetSocketAddress("localhost", 9990 + id);
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters) throws 
IOException {
-        return buildClient(voters, new Metrics(time));
-    }
-
-    private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) 
throws IOException {
-        LogContext logContext = new LogContext();
-        QuorumState quorum = new QuorumState(localId, voters, 
electionTimeoutMs, fetchTimeoutMs,
-            quorumStateStore, time, logContext, random);
-
-        Map<Integer, InetSocketAddress> voterAddresses = 
voters.stream().collect(Collectors.toMap(
-            Function.identity(),
-            this::mockAddress
-        ));
-
-        KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, 
time, metrics,
-            new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), 
voterAddresses,
-            electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, 
fetchMaxWaitMs, logContext, random);
-
-        client.initialize();
-
-        return client;
-    }
-
     @Test
     public void testInitializeSingleMemberQuorum() throws IOException {
-        buildClient(Collections.singleton(localId));
-        assertEquals(ElectionState.withElectedLeader(1, localId, 
Collections.singleton(localId)),
-            quorumStateStore.readElectionState());
+        RaftClientTestContext context = 
RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+        assertEquals(
+            ElectionState.withElectedLeader(1, LOCAL_ID, 
Collections.singleton(LOCAL_ID)),
+            context.quorumStateStore.readElectionState()
+        );
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() 
throws Exception {
         // Start off as leader. We should still bump the epoch after 
initialization
 
         int initialEpoch = 2;
-        Set<Integer> voters = Collections.singleton(localId);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch,
 localId, voters));
-
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-        assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch 
+ 1),
-            client.currentLeaderAndEpoch());
-        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
localId, voters),
-            quorumStateStore.readElectionState());
+        Set<Integer> voters = Collections.singleton(LOCAL_ID);
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    quorumStateStore.writeElectionState(
+                        ElectionState.withElectedLeader(initialEpoch, 
LOCAL_ID, voters)
+                    );
+                });
+            })
+            .build(voters);
+
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+        assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch 
+ 1),
+            context.client.currentLeaderAndEpoch());
+        assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
LOCAL_ID, voters),
+            context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testInitializeAsLeaderFromStateStore() throws Exception {
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1);
         int epoch = 2;
 
-        Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
localId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
-        assertEquals(ElectionState.withUnknownLeader(epoch, voters), 
quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateRandom(random -> {
+                
Mockito.doReturn(0).when(random).nextInt(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+            })
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
+
 
-        time.sleep(electionTimeoutMs);
-        pollUntilSend(client);
-        assertSentVoteRequest(epoch + 1, 0, 0L);
+        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(ElectionState.withUnknownLeader(epoch, voters), 
context.quorumStateStore.readElectionState());
+
+        context.time.sleep(RaftClientTestContext.ELECTION_TIMEOUT_MS);
+        context.pollUntilSend();
+        context.assertSentVoteRequest(epoch + 1, 0, 0L);
     }
 
     @Test
     public void testInitializeAsCandidateFromStateStore() throws Exception {
         // Need 3 node to require a 2-node majority
-        Set<Integer> voters = Utils.mkSet(localId, 1, 2);
-        
quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, 
localId, voters));
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, 1, 2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    
quorumStateStore.writeElectionState(ElectionState.withVotedCandidate(2, 
LOCAL_ID, voters));
+                });
+            })
+            .build(voters);
 
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(0L, log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset);
 
         // Send out vote requests.
-        client.poll();
+        context.client.poll();
 
-        List<RaftRequest.Outbound> voteRequests = collectVoteRequests(2, 0, 0);
+        List<RaftRequest.Outbound> voteRequests = 
context.collectVoteRequests(2, 0, 0);
         assertEquals(2, voteRequests.size());
     }
 
     @Test
     public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
         final int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
 
-        assertEquals(ElectionState.withUnknownLeader(0, voters), 
quorumStateStore.readElectionState());
-        time.sleep(2 * electionTimeoutMs);
+        assertEquals(ElectionState.withUnknownLeader(0, voters), 
context.quorumStateStore.readElectionState());
+        context.time.sleep(2 * RaftClientTestContext.ELECTION_TIMEOUT_MS);
 
-        pollUntilSend(client);
-        assertEquals(ElectionState.withVotedCandidate(1, localId, voters), 
quorumStateStore.readElectionState());
+        context.pollUntilSend();
+        assertEquals(ElectionState.withVotedCandidate(1, LOCAL_ID, voters), 
context.quorumStateStore.readElectionState());
 
-        int correlationId = assertSentVoteRequest(1, 0, 0L);
-        deliverResponse(correlationId, otherNodeId, voteResponse(true, 
Optional.empty(), 1));
+        int correlationId = context.assertSentVoteRequest(1, 0, 0L);
+        context.deliverResponse(correlationId, otherNodeId, 
RaftClientTestContext.voteResponse(true, Optional.empty(), 1));
 
         // Become leader after receiving the vote
-        client.poll();
-        assertEquals(ElectionState.withElectedLeader(1, localId, voters), 
quorumStateStore.readElectionState());
-        long electionTimestamp = time.milliseconds();
+        context.client.poll();
+        assertEquals(ElectionState.withElectedLeader(1, LOCAL_ID, voters), 
context.quorumStateStore.readElectionState());
+        long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, log.endOffset().offset);
-        assertEquals(1L, log.lastFlushedOffset());
+        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.lastFlushedOffset());
 
         // Send BeginQuorumEpoch to voters
-        client.poll();
-        assertSentBeginQuorumEpochRequest(1);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(1);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
         Record record = batch.iterator().next();
         assertEquals(electionTimestamp, record.timestamp());
-        verifyLeaderChangeMessage(localId, 
Collections.singletonList(otherNodeId),
-            record.key(), record.value());
+        RaftClientTestContext.verifyLeaderChangeMessage(LOCAL_ID, 
Collections.singletonList(otherNodeId), record.key(), record.value());

Review comment:
       This is what the LeaderChangeMessage says:
   ```
         {"name": "VotedIds", "type": "[]int32", "versions": "0+",
          "about": "The IDs of the voters who voted for the current leader"},
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to