hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509719785



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1605,100 +1708,18 @@ public void poll() throws IOException {
         }
     }
 
-    private void failPendingAppends(KafkaException exception) {
-        for (UnwrittenAppend unwrittenAppend : unwrittenAppends) {
-            unwrittenAppend.fail(exception);
-        }
-        unwrittenAppends.clear();
-    }
-
-    private void pollPendingAppends(LeaderState state, long currentTimeMs) {
-        int numAppends = 0;
-        int maxNumAppends = unwrittenAppends.size();
-
-        while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) {
-            final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll();
-
-            if (unwrittenAppend.future.isDone())
-                continue;
-
-            if (unwrittenAppend.isTimedOut(currentTimeMs)) {
-                unwrittenAppend.fail(new TimeoutException("Request timeout " + 
unwrittenAppend.requestTimeoutMs
-                    + " expired before the records could be appended to the 
log"));
-            } else {
-                int epoch = quorum.epoch();
-                LogAppendInfo info = appendAsLeader(unwrittenAppend.records);
-                OffsetAndEpoch offsetAndEpoch = new 
OffsetAndEpoch(info.lastOffset, epoch);
-                long numRecords = info.lastOffset - info.firstOffset + 1;
-                logger.debug("Completed write of {} records at {}", 
numRecords, offsetAndEpoch);
-
-                if (unwrittenAppend.ackMode == AckMode.LEADER) {
-                    unwrittenAppend.complete(offsetAndEpoch);
-                } else if (unwrittenAppend.ackMode == AckMode.QUORUM) {
-                    CompletableFuture<Long> future = appendPurgatory.await(
-                        LogOffset.awaitCommitted(offsetAndEpoch.offset),
-                        unwrittenAppend.requestTimeoutMs);
-
-                    future.whenComplete((completionTimeMs, exception) -> {
-                        if (exception != null) {
-                            logger.error("Failed to commit append at {} due to 
{}", offsetAndEpoch, exception);
-
-                            unwrittenAppend.fail(exception);
-                        } else {
-                            long elapsedTime = Math.max(0, completionTimeMs - 
currentTimeMs);
-                            double elapsedTimePerRecord = (double) elapsedTime 
/ numRecords;
-                            
kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs);
-                            unwrittenAppend.complete(offsetAndEpoch);
-
-                            logger.debug("Completed commit of {} records at 
{}", numRecords, offsetAndEpoch);
-                        }
-                    });
-                }
-            }
-
-            numAppends++;
-        }
-
-        if (numAppends > 0) {
-            flushLeaderLog(state, currentTimeMs);
-        }
-    }
-
-    /**
-     * Append a set of records to the log. Successful completion of the future 
indicates a success of
-     * the append, with the uncommitted base offset and epoch.
-     *
-     * @param records The records to write to the log
-     * @param ackMode The commit mode for the appended records
-     * @param timeoutMs The maximum time to wait for the append operation to 
complete (including
-     *                  any time needed for replication)
-     * @return The uncommitted base offset and epoch of the appended records
-     */
     @Override
-    public CompletableFuture<OffsetAndEpoch> append(
-        Records records,
-        AckMode ackMode,
-        long timeoutMs
-    ) {
-        if (records.sizeInBytes() == 0)
-            throw new IllegalArgumentException("Attempt to append empty record 
set");
-
-        if (shutdown.get() != null)
-            throw new IllegalStateException("Cannot append records while we 
are shutting down");
-
-        if (quorum.isObserver())
-            throw new IllegalStateException("Illegal attempt to write to an 
observer");
-
-        CompletableFuture<OffsetAndEpoch> future = new CompletableFuture<>();
-        UnwrittenAppend unwrittenAppend = new UnwrittenAppend(
-            records, time.milliseconds(), timeoutMs, ackMode, future);
+    public Long scheduleAppend(int epoch, List<T> records) {
+        BatchAccumulator<T> accumulator = this.accumulator;
+        if (accumulator == null) {
+            return Long.MAX_VALUE;

Review comment:
       Yeah, see my comment above about the handling of `Long.MAX_VALUE`. This 
is an attempt to reduce the error handling in the state machine. The model that 
we are working toward here is the following:
   
   1) the state machine gets notified that the node has become leader in some 
epoch
   2) the state machine can schedule appends with this epoch and it will get 
back the expected append offset
   3) the state machine treats scheduled appends as uncommitted until the call 
to `handleCommit`
   4) if the node resigns its leadership, the state machine will get notified 
and it will be expected to drop uncommitted data
   
   By using a sort of impossible offset sentinel, the state machine just needs 
to wait for the notification that the leader has resigned.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to