hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r509760723



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       I think nearly every call to `updateQuorumStateStore` is just writing an 
initial state. Seems like we can introduce a more direct option to the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the 
initial state and through `build` below. Since we always need `voters`, maybe 
we can provide it in the builder constructor. That would allow us to add 
helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
     .initializeAsFollower(epoch, otherNodeId)
     .build()
   ```
   
   Similarly, we could probably do state assertions in the test context as well 
and save the need to always pass through `voters` (e.g. we could have 
`context.assertFollower(epoch, leaderId)` instead of the cumbersome 
`assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), 
context.quorumStateStore.readElectionState())`).

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {

Review comment:
       I think just about every call to `updateQuorumStateStore` is just 
writing an initial state. Seems like we can introduce a more direct option to 
the builder.
   
   By the way, one of the annoyances is needing to provide `voters` through the 
initial state and through `build` below. Since we always need `voters`, maybe 
we can provide it in the builder constructor. That would allow us to add 
helpers to construct the state. For example, we could turn this into:
   
   ```java
   new RaftClientTestContext.Builder(voters)
     .initializeAsFollower(epoch, otherNodeId)
     .build()
   ```

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() 
throws Exception {
     public void testFollowerGracefulShutdown() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
+                });
+            })
+            .build(voters);
 
-        client.poll();
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), context.quorumStateStore.readElectionState());
+
+        context.client.poll();
 
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = 
client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
+        CompletableFuture<Void> shutdownFuture = 
context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
         assertFalse(shutdownFuture.isDone());
 
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
         assertTrue(shutdownFuture.isDone());
         assertNull(shutdownFuture.get());
     }
 
     @Test
     public void testGracefulShutdownSingleMemberQuorum() throws IOException {
-        KafkaRaftClient client = buildClient(Collections.singleton(localId));
+        RaftClientTestContext context = 
RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+
         assertEquals(ElectionState.withElectedLeader(
-            1, localId, Collections.singleton(localId)), 
quorumStateStore.readElectionState());
-        client.poll();
-        assertEquals(0, channel.drainSendQueue().size());
+            1, LOCAL_ID, Collections.singleton(LOCAL_ID)), 
context.quorumStateStore.readElectionState());
+        context.client.poll();
+        assertEquals(0, context.channel.drainSendQueue().size());
         int shutdownTimeoutMs = 5000;
-        client.shutdown(shutdownTimeoutMs);
-        assertTrue(client.isRunning());
-        client.poll();
-        assertFalse(client.isRunning());
+        context.client.shutdown(shutdownTimeoutMs);
+        assertTrue(context.client.isRunning());
+        context.client.poll();
+        assertFalse(context.client.isRunning());
     }
 
     @Test
     public void testFollowerReplication() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), context.quorumStateStore.readElectionState());
 
-        pollUntilSend(client);
+        context.pollUntilSend();
 
-        int fetchQuorumCorrelationId = assertSentFetchRequest(epoch, 0L, 0);
+        int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 
0L, 0);
         Records records = MemoryRecords.withRecords(0L, CompressionType.NONE,
             3, new SimpleRecord("a".getBytes()), new 
SimpleRecord("b".getBytes()));
         FetchResponseData response = fetchResponse(epoch, otherNodeId, 
records, 0L, Errors.NONE);
-        deliverResponse(fetchQuorumCorrelationId, otherNodeId, response);
+        context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, 
response);
 
-        client.poll();
-        assertEquals(2L, log.endOffset().offset);
-        assertEquals(2L, log.lastFlushedOffset());
+        context.client.poll();
+        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.lastFlushedOffset());
     }
 
     @Test
     public void testEmptyRecordSetInFetchResponse() throws Exception {
         int otherNodeId = 1;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
-        
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
-        KafkaRaftClient client = buildClient(voters);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
 
-        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), quorumStateStore.readElectionState());
+        RaftClientTestContext context = new RaftClientTestContext.Builder()
+            .updateQuorumStateStore(quorumStateStore -> {
+                assertDoesNotThrow(() -> {
+                    
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
otherNodeId, voters));
+                });
+            })
+            .build(voters);
+
+        assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, 
voters), context.quorumStateStore.readElectionState());

Review comment:
       nit: we have assertions like this in many test cases. With a more direct 
api to update quorum state, we can move these assertions into that api.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -1536,67 +1522,70 @@ public void 
testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
         int otherNodeId = 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
-        discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-        pollUntilSend(client);
-        RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+        context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+        context.pollUntilSend();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destinationId());
-        assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 
0);
 
-        time.sleep(requestTimeoutMs);
-        pollUntilSend(client);
+        context.time.sleep(REQUEST_TIMEOUT_MS);
+        context.pollUntilSend();
 
         // We should retry the Fetch against the other voter since the original
         // voter connection will be backing off.
-        RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destinationId());
         assertTrue(voters.contains(fetchRequest2.destinationId()));
-        assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 
0);
 
-        deliverResponse(fetchRequest2.correlationId, 
fetchRequest2.destinationId(),
+        context.deliverResponse(fetchRequest2.correlationId, 
fetchRequest2.destinationId(),
             fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.FENCED_LEADER_EPOCH));
-        client.poll();
+        context.client.poll();
 
-        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
quorumStateStore.readElectionState());
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testLeaderGracefulShutdown() throws Exception {
         int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 1;
-        KafkaRaftClient client = initializeAsLeader(voters, epoch);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(voters, epoch);
 
         // Now shutdown
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = 
client.shutdown(shutdownTimeoutMs);
+        CompletableFuture<Void> shutdownFuture = 
context.client.shutdown(shutdownTimeoutMs);
 
         // We should still be running until we have had a chance to send 
EndQuorumEpoch
-        assertTrue(client.isShuttingDown());
-        assertTrue(client.isRunning());
+        assertTrue(context.client.isShuttingDown());

Review comment:
       nit: it is a tad vexing to see all the `context` prefixes. I guess 
another option might be to define `RaftClientTestContext` as an abstract class 
so that the test method can define the test behavior within the scope of a 
subclass.
   
   For example:
   ```java
   new RaftClientTestContext(builder) {
     void run() {
       assertTrue(client.isShuttingDown());
       ...
     }
   }
   ```
   Not required, just an alternative to consider.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to