hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r620722131



##########
File path: 
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -65,6 +66,34 @@
         );
     }
 
+    @Test
+    public void testLeaderChangeMessageWritten() {

Review comment:
       A couple of extra test cases to add:
   
   1. Can we add a test case for `flush()` to ensure that it forces an immediate drain?
   2. Can we add a test case in which there is undrained data when `appendLeaderChangeMessage` is called?

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,50 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+        appendLock.lock();
+        try {
+            maybeCompleteDrain();

Review comment:
       I think I was a little off in my suggestion to add this. I don't think 
this is sufficient to ensure the current batch gets completed before we add the 
leader change message, since `maybeCompleteDrain` only completes the batch if a 
drain has already been started. Maybe the simple thing is to call `flush()` instead?
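
To illustrate the point above, here is a minimal toy model in Java. It is not the real `BatchAccumulator`: `ToyAccumulator`, `appendControl`, `useFlush`, and `hasUndrainedData` are hypothetical names, and the body of `flush()` (start a drain, then complete it) is an assumption about how it behaves. The sketch only shows why `maybeCompleteDrain()` alone can leave pending records behind when no drain has been started.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: names mirror the diff, but every method body is an assumption.
class ToyAccumulator {
    enum DrainStatus { NONE, STARTED, FINISHED }

    private DrainStatus drainStatus = DrainStatus.NONE;
    private List<String> currentBatch = null; // records appended but not yet completed
    private final List<List<String>> completed = new ArrayList<>();

    void append(String record) {
        if (currentBatch == null) {
            currentBatch = new ArrayList<>();
        }
        currentBatch.add(record);
    }

    // Completes the current batch only if a drain has already been started.
    void maybeCompleteDrain() {
        if (drainStatus == DrainStatus.STARTED) {
            completeCurrentBatch();
            drainStatus = DrainStatus.FINISHED;
        }
    }

    // Assumed behavior of flush(): start a drain, then complete it right away.
    void flush() {
        drainStatus = DrainStatus.STARTED;
        maybeCompleteDrain();
    }

    private void completeCurrentBatch() {
        if (currentBatch != null) {
            completed.add(currentBatch);
            currentBatch = null;
        }
    }

    // Hypothetical stand-in for appendLeaderChangeMessage: the control record
    // is added as its own batch; useFlush picks which call guards the append.
    void appendControl(String control, boolean useFlush) {
        if (useFlush) {
            flush();
        } else {
            maybeCompleteDrain();
        }
        completed.add(Collections.singletonList(control));
    }

    List<List<String>> completed() {
        return completed;
    }

    boolean hasUndrainedData() {
        return currentBatch != null;
    }
}
```

In this model, with `useFlush = false` a pending record stays in the current batch and the control record lands in `completed` ahead of it. With `useFlush = true`, `completed` holds the pending batch first and the control batch second.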

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
+    public void addControlBatch(MemoryRecords records) {
+        appendLock.lock();
+        try {
+            drainStatus = DrainStatus.STARTED;

Review comment:
       How about `forceDrain`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

