hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610235224

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########

```diff
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
+    public void addControlBatch(MemoryRecords records) {
+        appendLock.lock();
+        try {
+            drainStatus = DrainStatus.STARTED;
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                null,
```

Review comment:
Why don't we use `Optional.empty`?
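The `Optional.empty` suggestion can be illustrated with a simplified stand-in; the class below is a toy, not the real `CompletedBatch`, and its field names are assumptions for illustration only:

```java
import java.util.List;
import java.util.Optional;

public class Main {
    // Toy stand-in for CompletedBatch: a control batch carries no typed
    // records, so the field is Optional<List<T>> and callers pass
    // Optional.empty() instead of null.
    static class CompletedBatch<T> {
        final long baseOffset;
        final Optional<List<T>> records;

        CompletedBatch(long baseOffset, Optional<List<T>> records) {
            this.baseOffset = baseOffset;
            this.records = records;
        }
    }

    public static void main(String[] args) {
        CompletedBatch<String> control = new CompletedBatch<>(10L, Optional.empty());
        // Callers must confront the absence explicitly; no NPE lurking
        System.out.println(control.records.isPresent()); // false
    }
}
```

With `Optional`, the "no records" case is visible in the type rather than hidden behind a nullable reference.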
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########

```diff
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
+    public void addControlBatch(MemoryRecords records) {
+        appendLock.lock();
+        try {
+            drainStatus = DrainStatus.STARTED;
```

Review comment:
It would be useful to factor out a `flush()` API. We may have additional use cases in the future.

```java
public void flush() {
    appendLock.lock();
    try {
        drainStatus = DrainStatus.STARTED;
        maybeCompleteDrain();
    } finally {
        appendLock.unlock();
    }
}
```

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########

```diff
@@ -1876,12 +1836,12 @@ private void appendBatch(
     }
 
     private long maybeAppendBatches(
-        LeaderState state,
+        LeaderState<T> state,
         long currentTimeMs
     ) {
-        long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs);
+        long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs);
```

Review comment:
While we're here, can we fix the name? It should be `timeUntilFlush`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########

```diff
@@ -2222,9 +2182,12 @@ public Long scheduleAtomicAppend(int epoch, List<T> records) {
         return append(epoch, records, true);
     }
 
+    @SuppressWarnings("unchecked")
```

Review comment:
Seems this is not needed?
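As a side note on the `flush()` suggestion above, the lock-then-mark pattern can be sketched with stand-in types. The enum values and the body of `maybeCompleteDrain` below are simplified assumptions, not the real `BatchAccumulator` internals:

```java
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    // Hypothetical drain states, simplified from the real accumulator
    enum DrainStatus { NONE, STARTED, FINISHED }

    static class Accumulator {
        private final ReentrantLock appendLock = new ReentrantLock();
        private volatile DrainStatus drainStatus = DrainStatus.NONE;

        // flush() marks the drain started under the append lock, so it cannot
        // race with a concurrent append
        public void flush() {
            appendLock.lock();
            try {
                drainStatus = DrainStatus.STARTED;
                maybeCompleteDrain();
            } finally {
                appendLock.unlock();
            }
        }

        private void maybeCompleteDrain() {
            // Stand-in: the real method would first complete any in-flight batch
            if (drainStatus == DrainStatus.STARTED) {
                drainStatus = DrainStatus.FINISHED;
            }
        }

        public DrainStatus drainStatus() {
            return drainStatus;
        }
    }

    public static void main(String[] args) {
        Accumulator acc = new Accumulator();
        acc.flush();
        System.out.println(acc.drainStatus()); // FINISHED
    }
}
```

Holding `appendLock` for the whole transition is what keeps the drain status and any pending batch state consistent.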
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########

```diff
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
+    public void addControlBatch(MemoryRecords records) {
```

Review comment:
Can we change this to `appendLeaderChangeMessage(LeaderChangeMessage)`? This would provide a stronger contract, since it ensures that the batch is indeed a control batch, that its base offset is set consistently with `nextOffset`, and that it contains only one record, as expected. It would also allow us to allocate the buffer used for the control batch from the `MemoryPool`. Then we wouldn't need the `null` values below when constructing the `CompletedBatch`, which avoids NPE potential.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########

```diff
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
+    public void addControlBatch(MemoryRecords records) {
+        appendLock.lock();
+        try {
+            drainStatus = DrainStatus.STARTED;
+            completed.add(new CompletedBatch<>(
```

Review comment:
I think we are assuming here that `currentBatch` is null. Although that is guaranteed for this specific usage in `KafkaRaftClient`, we should try to give `BatchAccumulator` a stronger contract. To fix this, we can just call `maybeCompleteDrain` before doing anything else. By the way, we should also have unit tests for these scenarios.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########

```diff
@@ -1859,15 +1819,15 @@ private void appendBatch(
             offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
 
         future.whenComplete((commitTimeMs, exception) -> {
-            int numRecords = batch.records.size();
+            int numRecords = batch.records.get().size();
```

Review comment:
We should try to avoid blind calls to `get()`. I'm honestly not too sure how this even works for the case of a control batch. Maybe we are just ignoring the error?
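On the blind `get()` point above, a sketch of the safer pattern using `map`/`orElse` on an `Optional` records field. The types here are simplified stand-ins, not the real `KafkaRaftClient` classes:

```java
import java.util.List;
import java.util.Optional;

public class Main {
    // Instead of records.get().size(), which throws NoSuchElementException for
    // a control batch with no typed records, map over the Optional and fall
    // back to zero when the records are absent.
    static int numRecords(Optional<List<String>> records) {
        return records.map(List::size).orElse(0);
    }

    public static void main(String[] args) {
        System.out.println(numRecords(Optional.of(List.of("r1", "r2", "r3")))); // 3
        System.out.println(numRecords(Optional.empty())); // 0
    }
}
```

The point is that the empty case is handled at the call site rather than surfacing as an unchecked exception inside the completion callback.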
##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########

```diff
@@ -67,6 +75,39 @@ protected LeaderState(
         }
         this.grantingVoters.addAll(grantingVoters);
         this.log = logContext.logger(LeaderState.class);
+        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+    }
+
+    public BatchAccumulator<T> accumulator() {
+        return this.accumulator;
+    }
+
+    private static List<Voter> convertToVoters(Set<Integer> voterIds) {
+        return voterIds.stream()
+            .map(follower -> new Voter().setVoterId(follower))
+            .collect(Collectors.toList());
+    }
+
+    public void appendLeaderChangeMessage(int epoch, long baseOffset, long currentTimeMs) {
+        List<Voter> voters = convertToVoters(this.followers());
```

Review comment:
If you look at `followers`, it is using `voterStates.keySet` and then stripping out the leaderId. Then we add it back on line 96 below. It seems simpler to use `voterStates.keySet` directly.

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########

```diff
@@ -67,6 +75,39 @@ protected LeaderState(
+    public void appendLeaderChangeMessage(int epoch, long baseOffset, long currentTimeMs) {
```

Review comment:
We can use the `epoch` field. We can also get rid of `baseOffset` if we move construction of `MemoryRecords` into `BatchAccumulator` as suggested in the other comment.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
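Returning to the `convertToVoters` comment above: a sketch of building the voter list directly from `voterStates.keySet()`, so the leaderId is never stripped out and re-added. The `Voter` class below is a simplified stand-in for the generated `LeaderChangeMessage.Voter` type:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class Main {
    // Stand-in for LeaderChangeMessage.Voter (the real type uses a fluent
    // setVoterId setter on a generated message class)
    static class Voter {
        final int voterId;
        Voter(int voterId) { this.voterId = voterId; }
    }

    // Map every voter id straight to a Voter; since voterStates.keySet()
    // already contains the leaderId, no strip-then-re-add step is needed
    static List<Voter> convertToVoters(Set<Integer> voterIds) {
        return voterIds.stream()
            .map(Voter::new)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Pretend 1 is the leader and 2, 3 are followers: the key set holds all three
        Set<Integer> voterStatesKeySet = Set.of(1, 2, 3);
        System.out.println(convertToVoters(voterStatesKeySet).size()); // 3
    }
}
```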