hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509719785
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -1605,100 +1708,18 @@ public void poll() throws IOException { } } - private void failPendingAppends(KafkaException exception) { - for (UnwrittenAppend unwrittenAppend : unwrittenAppends) { - unwrittenAppend.fail(exception); - } - unwrittenAppends.clear(); - } - - private void pollPendingAppends(LeaderState state, long currentTimeMs) { - int numAppends = 0; - int maxNumAppends = unwrittenAppends.size(); - - while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) { - final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll(); - - if (unwrittenAppend.future.isDone()) - continue; - - if (unwrittenAppend.isTimedOut(currentTimeMs)) { - unwrittenAppend.fail(new TimeoutException("Request timeout " + unwrittenAppend.requestTimeoutMs - + " expired before the records could be appended to the log")); - } else { - int epoch = quorum.epoch(); - LogAppendInfo info = appendAsLeader(unwrittenAppend.records); - OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch); - long numRecords = info.lastOffset - info.firstOffset + 1; - logger.debug("Completed write of {} records at {}", numRecords, offsetAndEpoch); - - if (unwrittenAppend.ackMode == AckMode.LEADER) { - unwrittenAppend.complete(offsetAndEpoch); - } else if (unwrittenAppend.ackMode == AckMode.QUORUM) { - CompletableFuture<Long> future = appendPurgatory.await( - LogOffset.awaitCommitted(offsetAndEpoch.offset), - unwrittenAppend.requestTimeoutMs); - - future.whenComplete((completionTimeMs, exception) -> { - if (exception != null) { - logger.error("Failed to commit append at {} due to {}", offsetAndEpoch, exception); - - unwrittenAppend.fail(exception); - } else { - long elapsedTime = Math.max(0, completionTimeMs - currentTimeMs); - double elapsedTimePerRecord = (double) elapsedTime / numRecords; - kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs); - 
unwrittenAppend.complete(offsetAndEpoch); - - logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); - } - }); - } - } - - numAppends++; - } - - if (numAppends > 0) { - flushLeaderLog(state, currentTimeMs); - } - } - - /** - * Append a set of records to the log. Successful completion of the future indicates a success of - * the append, with the uncommitted base offset and epoch. - * - * @param records The records to write to the log - * @param ackMode The commit mode for the appended records - * @param timeoutMs The maximum time to wait for the append operation to complete (including - * any time needed for replication) - * @return The uncommitted base offset and epoch of the appended records - */ @Override - public CompletableFuture<OffsetAndEpoch> append( - Records records, - AckMode ackMode, - long timeoutMs - ) { - if (records.sizeInBytes() == 0) - throw new IllegalArgumentException("Attempt to append empty record set"); - - if (shutdown.get() != null) - throw new IllegalStateException("Cannot append records while we are shutting down"); - - if (quorum.isObserver()) - throw new IllegalStateException("Illegal attempt to write to an observer"); - - CompletableFuture<OffsetAndEpoch> future = new CompletableFuture<>(); - UnwrittenAppend unwrittenAppend = new UnwrittenAppend( - records, time.milliseconds(), timeoutMs, ackMode, future); + public Long scheduleAppend(int epoch, List<T> records) { + BatchAccumulator<T> accumulator = this.accumulator; + if (accumulator == null) { + return Long.MAX_VALUE; Review comment: Yeah, see my comment above about the handling of `Long.MAX_VALUE`. This is an attempt to reduce the error handling in the state machine. 
The model that we are working toward here is the following:

1) The state machine gets notified that the node has become leader in some epoch.
2) The state machine can schedule appends with this epoch, and it will get back the expected append offset.
3) The state machine treats scheduled appends as uncommitted until the call to `handleCommit`.
4) If the node resigns its leadership, the state machine will get notified, and it will be expected to drop uncommitted data.

By using a sort of impossible offset sentinel, the state machine just needs to wait for the notification that the leader has resigned.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org