hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r620722131
########## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ########## @@ -65,6 +66,34 @@ ); } + @Test + public void testLeaderChangeMessageWritten() { Review comment: A couple extra test cases to add: 1. Can we add a test case for `flush()` to ensure that it forces an immediate drain? 2. Can we add a test case in which we have undrained data when `appendLeaderChangeMessage` is called? ########## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ########## @@ -194,14 +196,50 @@ private void completeCurrentBatch() { MemoryRecords data = currentBatch.build(); completed.add(new CompletedBatch<>( currentBatch.baseOffset(), - currentBatch.records(), + Optional.of(currentBatch.records()), data, memoryPool, currentBatch.initialBuffer() )); currentBatch = null; } + public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { + appendLock.lock(); + try { + maybeCompleteDrain(); Review comment: I think I was a little off in my suggestion to add this. I don't think this is sufficient to ensure the current batch gets completed before we add the leader change message since `maybeCompleteDrain` will only do so if a drain has been started. Maybe the simple thing is to call `flush()`? ########## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ########## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } + public void addControlBatch(MemoryRecords records) { + appendLock.lock(); + try { + drainStatus = DrainStatus.STARTED; Review comment: How about `forceDrain`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org