vamossagar12 commented on a change in pull request #10468: URL: https://github.com/apache/kafka/pull/10468#discussion_r616018841
########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ########## @@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { assertFutureThrows(shutdownFuture, TimeoutException.class); } + @Test + public void testLeaderGracefulShutdownOnClose() throws Exception { + int localId = 0; + int otherNodeId = 1; + int lingerMs = 50; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withAppendLingerMs(lingerMs) + .build(); + + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + assertEquals(1L, context.log.endOffset().offset); + + int epoch = context.currentEpoch(); + assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a"))); + + context.client.poll(); + assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs()); + + context.time.sleep(20); + + // client closed now. + context.client.close(); + + // Flag for accepting appends should be toggled to false. + assertFalse(context.client.canAcceptAppends()); + + // acceptAppends flag set to false so no writes should be accepted by the Leader now. 
+ assertNull(context.client.scheduleAppend(epoch, singletonList("b"))); + + // The leader should trigger a flush for whatever batches are present in the BatchAccumulator + assertEquals(2L, context.log.endOffset().offset); + + // Now shutdown + + // We should still be running until we have had a chance to send EndQuorumEpoch + assertTrue(context.client.isShuttingDown()); + assertTrue(context.client.isRunning()); + + // Send EndQuorumEpoch request to the other voter + context.pollUntilRequest(); + assertTrue(context.client.isShuttingDown()); + assertTrue(context.client.isRunning()); + context.assertSentEndQuorumEpochRequest(1, otherNodeId); + + // We should still be able to handle vote requests during graceful shutdown + // in order to help the new leader get elected + context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L)); + context.client.poll(); + context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); + Review comment: No problem. @jsancio, please review whenever you get the chance :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org