hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r506636227
########## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ##########
@@ -19,33 +19,51 @@
 import org.apache.kafka.common.record.Records;

 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;

-public interface RaftClient {
+public interface RaftClient<T> {
+
+    interface Listener<T> {
+        /**
+         * Callback which is invoked when records written through {@link #scheduleAppend(int, List)}
+         * become committed.
+         *
+         * Note that there is not a one-to-one correspondence between writes through
+         * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
+         * is free to batch together the records from multiple append calls provided
+         * that batch boundaries are respected. This means that each batch specified
+         * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
+         * a batch passed to {@link #handleCommit(int, long, List)}.
+         *
+         * @param epoch the epoch in which the write was accepted
+         * @param lastOffset the offset of the last record in the record list
+         * @param records the set of records that were committed
+         */
+        void handleCommit(int epoch, long lastOffset, List<T> records);
+    }

     /**
      * Initialize the client. This should only be called once and it must be
      * called before any of the other APIs can be invoked.
      *
      * @throws IOException For any IO errors during initialization
      */
-    void initialize() throws IOException;
+    void initialize(Listener<T> listener) throws IOException;

     /**
-     * Append a new entry to the log. The client must be in the leader state to
-     * accept an append: it is up to the state machine implementation
-     * to ensure this using {@link #currentLeaderAndEpoch()}.
-     *
-     * TODO: One improvement we can make here is to allow the caller to specify
-     * the current leader epoch in the record set. That would ensure that each
-     * leader change must be "observed" by the state machine before new appends
-     * are accepted.
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. However, it is guaranteed that if any of the
+     * records become committed, then all of them will be.
      *
-     * @param records The records to append to the log
-     * @param timeoutMs Maximum time to wait for the append to complete
-     * @return A future containing the last offset and epoch of the appended records (if successful)
+     * @param epoch the current leader epoch
+     * @param records the list of records to append
+     * @return the offset within the current epoch that the log entries will be appended,
+     * or null if the leader was unable to accept the write (e.g. due to memory
+     * being reached).
      */
-    CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs);
+    Long scheduleAppend(int epoch, List<T> records);

Review comment:
Yeah, this was more of a workaround until we thought of something better. The idea we are currently considering is to let the Raft layer return a backoff time, so the return type would semantically be `Either[Offset, BackoffMs]`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
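The `Either[Offset, BackoffMs]` idea from the review comment can be sketched in plain Java as a small result type that carries either an offset or a backoff hint. This is a hypothetical illustration and is not Kafka's API. `AppendResult`, `ToyLeader`, the pending-record cap (standing in for a memory limit), and the fixed backoff value are all invented for the example, and the `epoch` parameter is omitted for brevity. The returned offset is assumed to be the offset of the batch's last record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical result type modelling the proposed Either[Offset, BackoffMs]:
// when accepted, value holds the offset; otherwise it holds the backoff in ms.
final class AppendResult {
    private final boolean accepted;
    private final long value;

    private AppendResult(boolean accepted, long value) {
        this.accepted = accepted;
        this.value = value;
    }

    static AppendResult accepted(long offset) {
        return new AppendResult(true, offset);
    }

    static AppendResult backoff(long backoffMs) {
        return new AppendResult(false, backoffMs);
    }

    boolean isAccepted() {
        return accepted;
    }

    long offset() {
        if (!accepted) {
            throw new IllegalStateException("write was not accepted; use backoffMs()");
        }
        return value;
    }

    long backoffMs() {
        if (accepted) {
            throw new IllegalStateException("write was accepted; use offset()");
        }
        return value;
    }
}

// Toy leader (hypothetical) that caps the number of buffered, not yet
// flushed records; the cap stands in for a memory limit.
final class ToyLeader<T> {
    private final int maxPendingRecords;
    private final long backoffMs;
    private final List<T> pending = new ArrayList<>();
    private long nextOffset = 0L; // offset the next appended record will get

    ToyLeader(int maxPendingRecords, long backoffMs) {
        this.maxPendingRecords = maxPendingRecords;
        this.backoffMs = backoffMs;
    }

    // Returns the offset of the batch's last record, or a backoff hint if
    // accepting the batch would exceed the cap. The batch is all-or-nothing.
    AppendResult scheduleAppend(List<T> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("records must not be empty");
        }
        if (pending.size() + records.size() > maxPendingRecords) {
            return AppendResult.backoff(backoffMs);
        }
        pending.addAll(records);
        nextOffset += records.size();
        return AppendResult.accepted(nextOffset - 1);
    }

    // Simulates the buffered records being written out, freeing capacity.
    void flush() {
        pending.clear();
    }
}
```

A caller would check `isAccepted()` and, on rejection, retry the same batch after `backoffMs()` milliseconds. Compared with returning `null`, this lets the Raft layer tell the caller how long to wait instead of leaving the retry timing to the caller.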
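The `Listener.handleCommit` javadoc states a batching guarantee: records from several `scheduleAppend` calls may be merged, but each appended batch must be a subset of a single batch passed to `handleCommit`. A toy, single-threaded log can illustrate that guarantee. This is a sketch under stated assumptions, not Kafka's Raft implementation. `ToyBatchingLog`, its `commit(int maxRecords)` method, and the record cap are hypothetical. The epoch is fixed at construction, and replication and quorum acknowledgement are omitted entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Standalone copy of the Listener<T> callback from the diff.
interface Listener<T> {
    void handleCommit(int epoch, long lastOffset, List<T> records);
}

// Hypothetical toy log: scheduled batches are queued and later committed by
// merging whole batches, never splitting one across handleCommit calls.
final class ToyBatchingLog<T> {
    private final int epoch;
    private final Listener<T> listener;
    private final Deque<List<T>> scheduled = new ArrayDeque<>();
    private long nextOffset = 0L;     // offset the next scheduled record will get
    private long committedUpTo = -1L; // last offset handed to the listener

    ToyBatchingLog(int epoch, Listener<T> listener) {
        this.epoch = epoch;
        this.listener = listener;
    }

    // Queues one batch and returns the offset assigned to its last record.
    long scheduleAppend(List<T> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("records must not be empty");
        }
        scheduled.addLast(new ArrayList<>(records));
        nextOffset += records.size();
        return nextOffset - 1;
    }

    // Merges whole batches from the front of the queue into one handleCommit
    // call of at most maxRecords records. A single batch larger than
    // maxRecords is still committed whole rather than split.
    void commit(int maxRecords) {
        List<T> merged = new ArrayList<>();
        while (!scheduled.isEmpty()) {
            List<T> next = scheduled.peekFirst();
            if (!merged.isEmpty() && merged.size() + next.size() > maxRecords) {
                break;
            }
            merged.addAll(scheduled.pollFirst());
        }
        if (merged.isEmpty()) {
            return;
        }
        committedUpTo += merged.size();
        listener.handleCommit(epoch, committedUpTo, merged);
    }
}
```

Suppose `[a, b]`, `[c]`, and `[d, e]` are scheduled and `commit(3)` is then called twice. The listener receives two callbacks: `[a, b, c]` with last offset 2 and `[d, e]` with last offset 4. Three appends map to two commits, and no appended batch is split.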