http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
deleted file mode 100644
index fbbcb85..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.shaded.proto.RaftProtos.LeaderNoOp;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-import static org.apache.raft.server.impl.LeaderState.StateUpdateEventType.*;
-
-/**
- * States for leader only. It contains three different types of processors:
- * 1. RPC senders: each thread is appending log to a follower
- * 2. EventProcessor: a single thread updating the raft server's state based on
- *                    status of log appending response
- * 3. PendingRequestHandler: a handler sending back responses to clients when
- *                           corresponding log entries are committed
- */
-public class LeaderState {
-  private static final Logger LOG = RaftServerImpl.LOG;
-
-  enum StateUpdateEventType {
-    STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
-  }
-
-  enum BootStrapProgress {
-    NOPROGRESS, PROGRESSING, CAUGHTUP
-  }
-
-  static class StateUpdateEvent {
-    final StateUpdateEventType type;
-    final long newTerm;
-
-    StateUpdateEvent(StateUpdateEventType type, long newTerm) {
-      this.type = type;
-      this.newTerm = newTerm;
-    }
-  }
-
-  static final StateUpdateEvent UPDATE_COMMIT_EVENT =
-      new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
-  static final StateUpdateEvent STAGING_PROGRESS_EVENT =
-      new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1);
-
-  private final RaftServerImpl server;
-  private final RaftLog raftLog;
-  private final long currentTerm;
-  private volatile ConfigurationStagingState stagingState;
-  private List<List<FollowerInfo>> voterLists;
-
-  /**
-   * The list of threads appending entries to followers.
-   * The list is protected by the RaftServer's lock.
-   */
-  private final List<LogAppender> senders;
-  private final BlockingQueue<StateUpdateEvent> eventQ;
-  private final EventProcessor processor;
-  private final PendingRequests pendingRequests;
-  private volatile boolean running = true;
-
-  private final int stagingCatchupGap;
-  private final int snapshotChunkMaxSize;
-  private final int syncInterval;
-
-  LeaderState(RaftServerImpl server, RaftProperties properties) {
-    this.server = server;
-
-    stagingCatchupGap = properties.getInt(
-        RAFT_SERVER_STAGING_CATCHUP_GAP_KEY,
-        RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT);
-    snapshotChunkMaxSize = properties.getInt(
-        RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY,
-        RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT);
-    syncInterval = properties.getInt(
-        RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY,
-        RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT);
-
-    final ServerState state = server.getState();
-    this.raftLog = state.getLog();
-    this.currentTerm = state.getCurrentTerm();
-    eventQ = new ArrayBlockingQueue<>(4096);
-    processor = new EventProcessor();
-    pendingRequests = new PendingRequests(server);
-
-    final RaftConfiguration conf = server.getRaftConf();
-    Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
-    final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final long nextIndex = raftLog.getNextIndex();
-    senders = new ArrayList<>(others.size());
-    for (RaftPeer p : others) {
-      FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
-      senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f));
-    }
-    voterLists = divideFollowers(conf);
-  }
-
-  void start() {
-    // In the beginning of the new term, replicate an empty entry in order
-    // to finally commit entries in the previous term.
-    // Also this message can help identify the last committed index when
-    // the leader peer is just started.
-    final LogEntryProto placeHolder = LogEntryProto.newBuilder()
-        .setTerm(server.getState().getCurrentTerm())
-        .setIndex(raftLog.getNextIndex())
-        .setNoOp(LeaderNoOp.newBuilder()).build();
-    raftLog.append(placeHolder);
-
-    processor.start();
-    startSenders();
-  }
-
-  private void startSenders() {
-    senders.forEach(Thread::start);
-  }
-
-  void stop() {
-    this.running = false;
-    // do not interrupt event processor since it may be in the middle of logSync
-    for (LogAppender sender : senders) {
-      sender.stopSender();
-      sender.interrupt();
-    }
-    try {
-      pendingRequests.sendNotLeaderResponses();
-    } catch (IOException e) {
-      LOG.warn("Caught exception in sendNotLeaderResponses", e);
-    }
-  }
-
-  void notifySenders() {
-    senders.forEach(LogAppender::notifyAppend);
-  }
-
-  boolean inStagingState() {
-    return stagingState != null;
-  }
-
-  ConfigurationStagingState getStagingState() {
-    return stagingState;
-  }
-
-  long getCurrentTerm() {
-    return currentTerm;
-  }
-
-  int getSnapshotChunkMaxSize() {
-    return snapshotChunkMaxSize;
-  }
-
-  int getSyncInterval() {
-    return syncInterval;
-  }
-
-  /**
-   * Start bootstrapping new peers
-   */
-  PendingRequest startSetConfiguration(SetConfigurationRequest request) {
-    Preconditions.checkState(running && !inStagingState());
-
-    RaftPeer[] peersInNewConf = request.getPeersInNewConf();
-    Collection<RaftPeer> peersToBootStrap = RaftConfiguration
-        .computeNewPeers(peersInNewConf, server.getRaftConf());
-
-    // add the request to the pending queue
-    final PendingRequest pending = pendingRequests.addConfRequest(request);
-
-    ConfigurationStagingState stagingState = new ConfigurationStagingState(
-        peersToBootStrap, new PeerConfiguration(Arrays.asList(peersInNewConf)));
-    Collection<RaftPeer> newPeers = stagingState.getNewPeers();
-    // set the staging state
-    this.stagingState = stagingState;
-
-    if (newPeers.isEmpty()) {
-      applyOldNewConf();
-    } else {
-      // update the LeaderState's sender list
-      addSenders(newPeers);
-    }
-    return pending;
-  }
-
-  PendingRequest addPendingRequest(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    return pendingRequests.addPendingRequest(index, request, entry);
-  }
-
-  private void applyOldNewConf() {
-    final ServerState state = server.getState();
-    final RaftConfiguration current = server.getRaftConf();
-    final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current,
-        state.getLog().getNextIndex());
-    // apply the (old, new) configuration to log, and use it as the current conf
-    long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
-    updateConfiguration(index, oldNewConf);
-
-    this.stagingState = null;
-    notifySenders();
-  }
-
-  private void updateConfiguration(long logIndex, RaftConfiguration newConf) {
-    voterLists = divideFollowers(newConf);
-    server.getState().setRaftConf(logIndex, newConf);
-  }
-
-  /**
-   * After receiving a setConfiguration request, the leader should update its
-   * RpcSender list.
-   */
-  void addSenders(Collection<RaftPeer> newMembers) {
-    final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final long nextIndex = raftLog.getNextIndex();
-    for (RaftPeer peer : newMembers) {
-      FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false);
-      LogAppender sender = server.getLogAppenderFactory()
-          .getLogAppender(server, this, f);
-      senders.add(sender);
-      sender.start();
-    }
-  }
-
-  /**
-   * Update the RpcSender list based on the current configuration
-   */
-  private void updateSenders(RaftConfiguration conf) {
-    Preconditions.checkState(conf.isStable() && !inStagingState());
-    Iterator<LogAppender> iterator = senders.iterator();
-    while (iterator.hasNext()) {
-      LogAppender sender = iterator.next();
-      if (!conf.containsInConf(sender.getFollower().getPeer().getId())) {
-        iterator.remove();
-        sender.stopSender();
-        sender.interrupt();
-      }
-    }
-  }
-
-  void submitUpdateStateEvent(StateUpdateEvent event) {
-    try {
-      eventQ.put(event);
-    } catch (InterruptedException e) {
-      LOG.info("Interrupted when adding event {} into the queue", event);
-    }
-  }
-
-  private void prepare() {
-    synchronized (server) {
-      if (running) {
-        final RaftConfiguration conf = server.getRaftConf();
-        if (conf.isTransitional() && server.getState().isConfCommitted()) {
-          // the configuration is in transitional state, and has been committed
-          // so it is time to generate and replicate (new) conf.
-          replicateNewConf();
-        }
-      }
-    }
-  }
-
-  /**
-   * The processor thread takes the responsibility to update the raft server's
-   * state, such as changing to follower, or updating the committed index.
-   */
-  private class EventProcessor extends Daemon {
-    @Override
-    public void run() {
-      // apply an empty message; check if necessary to replicate (new) conf
-      prepare();
-
-      while (running) {
-        try {
-          StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(),
-              TimeUnit.MILLISECONDS);
-          synchronized (server) {
-            if (running) {
-              handleEvent(event);
-            }
-          }
-          // the updated configuration does not need to be sync'ed here
-        } catch (InterruptedException e) {
-          final String s = server.getId() + " " + getClass().getSimpleName()
-              + " thread is interrupted ";
-          if (!running) {
-            LOG.info(s + " gracefully; server=" + server);
-          } else {
-            LOG.warn(s + " UNEXPECTEDLY; server=" + server, e);
-            throw new RuntimeException(e);
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed to persist new votedFor/term.", e);
-          // the failure should happen while changing the state to follower
-          // thus the in-memory state should have been updated
-          Preconditions.checkState(!running);
-        }
-      }
-    }
-  }
-
-  private void handleEvent(StateUpdateEvent e) throws IOException {
-    if (e == null) {
-      if (inStagingState()) {
-        checkNewPeers();
-      }
-    } else {
-      if (e.type == STEPDOWN) {
-        server.changeToFollower(e.newTerm, true);
-      } else if (e.type == UPDATECOMMIT) {
-        updateLastCommitted();
-      } else if (e.type == STAGINGPROGRESS) {
-        checkNewPeers();
-      }
-    }
-  }
-
-  /**
-   * So far we use a simple implementation for catchup checking:
-   * 1. If the latest rpc time of the remote peer is before 3 * max_timeout,
-   *    the peer made no progress for that long. We should fail the whole
-   *    setConfiguration request.
-   * 2. If the peer's matching index is just behind for a small gap, and the
-   *    peer was updated recently (within max_timeout), declare the peer as
-   *    caught-up.
-   * 3. Otherwise the peer is making progressing. Keep waiting.
-   */
-  private BootStrapProgress checkProgress(FollowerInfo follower,
-      long committed) {
-    Preconditions.checkArgument(!follower.isAttendingVote());
-    final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs());
-    if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
-      LOG.debug("{} detects a follower {} timeout for bootstrapping," +
-              " timeoutTime: {}", server.getId(), follower, timeoutTime);
-      return BootStrapProgress.NOPROGRESS;
-    } else if (follower.getMatchIndex() + stagingCatchupGap > committed
-        && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
-      return BootStrapProgress.CAUGHTUP;
-    } else {
-      return BootStrapProgress.PROGRESSING;
-    }
-  }
-
-  private Collection<BootStrapProgress> checkAllProgress(long committed) {
-    Preconditions.checkState(inStagingState());
-    return senders.stream()
-        .filter(sender -> !sender.getFollower().isAttendingVote())
-        .map(sender -> checkProgress(sender.getFollower(), committed))
-        .collect(Collectors.toCollection(ArrayList::new));
-  }
-
-  private void checkNewPeers() {
-    if (!inStagingState()) {
-      // it is possible that the bootstrapping is done and we still have
-      // remaining STAGINGPROGRESS event to handle.
-      updateLastCommitted();
-    } else {
-      final long committedIndex = server.getState().getLog()
-          .getLastCommittedIndex();
-      Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
-      if (reports.contains(BootStrapProgress.NOPROGRESS)) {
-        LOG.debug("{} fails the setConfiguration request", server.getId());
-        stagingState.fail();
-      } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
-        // all caught up!
-        applyOldNewConf();
-        for (LogAppender sender : senders) {
-          sender.getFollower().startAttendVote();
-        }
-      }
-    }
-  }
-
-  boolean isBootStrappingPeer(String peerId) {
-    return inStagingState() && getStagingState().contains(peerId);
-  }
-
-  private void updateLastCommitted() {
-    final String selfId = server.getId();
-    final RaftConfiguration conf = server.getRaftConf();
-    long majorityInNewConf = computeLastCommitted(voterLists.get(0),
-        conf.containsInConf(selfId));
-    final long oldLastCommitted = raftLog.getLastCommittedIndex();
-    final LogEntryProto[] entriesToCommit;
-    if (!conf.isTransitional()) {
-      // copy the entries that may get committed out of the raftlog, to prevent
-      // the possible race that the log gets purged after the statemachine does
-      // a snapshot
-      entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
-          Math.max(majorityInNewConf, oldLastCommitted) + 1);
-      server.getState().updateStatemachine(majorityInNewConf, currentTerm);
-    } else { // configuration is in transitional state
-      long majorityInOldConf = computeLastCommitted(voterLists.get(1),
-          conf.containsInOldConf(selfId));
-      final long majority = Math.min(majorityInNewConf, majorityInOldConf);
-      entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
-          Math.max(majority, oldLastCommitted) + 1);
-      server.getState().updateStatemachine(majority, currentTerm);
-    }
-    checkAndUpdateConfiguration(entriesToCommit);
-  }
-
-  private boolean committedConf(LogEntryProto[] entries) {
-    final long currentCommitted = raftLog.getLastCommittedIndex();
-    for (LogEntryProto entry : entries) {
-      if (entry.getIndex() <= currentCommitted &&
-          ProtoUtils.isConfigurationLogEntry(entry)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) {
-    final RaftConfiguration conf = server.getRaftConf();
-    if (committedConf(entriesToCheck)) {
-      if (conf.isTransitional()) {
-        replicateNewConf();
-      } else { // the (new) log entry has been committed
-        LOG.debug("{} sends success to setConfiguration request", server.getId());
-        pendingRequests.replySetConfiguration();
-        // if the leader is not included in the current configuration, step down
-        if (!conf.containsInConf(server.getId())) {
-          LOG.info("{} is not included in the new configuration {}. Step down.",
-              server.getId(), conf);
-          try {
-            // leave some time for all RPC senders to send out new conf entry
-            Thread.sleep(server.getMinTimeoutMs());
-          } catch (InterruptedException ignored) {
-          }
-          // the pending request handler will send NotLeaderException for
-          // pending client requests when it stops
-          server.close();
-        }
-      }
-    }
-  }
-
-  /**
-   * when the (old, new) log entry has been committed, should replicate (new):
-   * 1) append (new) to log
-   * 2) update conf to (new)
-   * 3) update RpcSenders list
-   * 4) start replicating the log entry
-   */
-  private void replicateNewConf() {
-    final RaftConfiguration conf = server.getRaftConf();
-    final RaftConfiguration newConf = RaftConfiguration.newBuilder()
-        .setConf(conf)
-        .setLogEntryIndex(raftLog.getNextIndex())
-        .build();
-    // stop the LogAppender if the corresponding follower is no longer in the conf
-    updateSenders(newConf);
-    long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
-    updateConfiguration(index, newConf);
-    notifySenders();
-  }
-
-  private long computeLastCommitted(List<FollowerInfo> followers,
-      boolean includeSelf) {
-    final int length = includeSelf ? followers.size() + 1 : followers.size();
-    final long[] indices = new long[length];
-    for (int i = 0; i < followers.size(); i++) {
-      indices[i] = followers.get(i).getMatchIndex();
-    }
-    if (includeSelf) {
-      // note that we also need to wait for the local disk I/O
-      indices[length - 1] = raftLog.getLatestFlushedIndex();
-    }
-
-    Arrays.sort(indices);
-    return indices[(indices.length - 1) / 2];
-  }
-
-  private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
-    List<List<FollowerInfo>> lists = new ArrayList<>(2);
-    List<FollowerInfo> listForNew = senders.stream()
-        .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
-        .map(LogAppender::getFollower)
-        .collect(Collectors.toList());
-    lists.add(listForNew);
-    if (conf.isTransitional()) {
-      List<FollowerInfo> listForOld = senders.stream()
-          .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
-          .map(LogAppender::getFollower)
-          .collect(Collectors.toList());
-      lists.add(listForOld);
-    }
-    return lists;
-  }
-
-  PendingRequest returnNoConfChange(SetConfigurationRequest r) {
-    PendingRequest pending = new PendingRequest(r);
-    pending.setSuccessReply(null);
-    return pending;
-  }
-
-  void replyPendingRequest(long logIndex, CompletableFuture<Message> message) {
-    pendingRequests.replyPendingRequest(logIndex, message);
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    return pendingRequests.getTransactionContext(index);
-  }
-
-  private class ConfigurationStagingState {
-    private final Map<String, RaftPeer> newPeers;
-    private final PeerConfiguration newConf;
-
-    ConfigurationStagingState(Collection<RaftPeer> newPeers,
-        PeerConfiguration newConf) {
-      Map<String, RaftPeer> map = new HashMap<>();
-      for (RaftPeer peer : newPeers) {
-        map.put(peer.getId(), peer);
-      }
-      this.newPeers = Collections.unmodifiableMap(map);
-      this.newConf = newConf;
-    }
-
-    RaftConfiguration generateOldNewConf(RaftConfiguration current,
-        long logIndex) {
-      return RaftConfiguration.newBuilder()
-          .setConf(newConf)
-          .setOldConf(current)
-          .setLogEntryIndex(logIndex)
-          .build();
-    }
-
-    Collection<RaftPeer> getNewPeers() {
-      return newPeers.values();
-    }
-
-    boolean contains(String peerId) {
-      return newPeers.containsKey(peerId);
-    }
-
-    void fail() {
-      Iterator<LogAppender> iterator = senders.iterator();
-      while (iterator.hasNext()) {
-        LogAppender sender = iterator.next();
-        if (!sender.getFollower().isAttendingVote()) {
-          iterator.remove();
-          sender.stopSender();
-          sender.interrupt();
-        }
-      }
-      LeaderState.this.stagingState = null;
-      // send back failure response to client's request
-      pendingRequests.failSetConfiguration(
-          new ReconfigurationTimeoutException("Fail to set configuration "
-              + newConf + ". Timeout when bootstrapping new peers."));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
deleted file mode 100644
index cf613ca..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.raft.server.impl.LeaderState.StateUpdateEventType;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.shaded.com.google.protobuf.ByteString;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.file.Path;
-import java.util.*;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * A daemon thread appending log entries to a follower peer.
- */
-public class LogAppender extends Daemon {
-  public static final Logger LOG = RaftServerImpl.LOG;
-
-  protected final RaftServerImpl server;
-  private final LeaderState leaderState;
-  protected final RaftLog raftLog;
-  protected final FollowerInfo follower;
-  private final int maxBufferSize;
-  private final boolean batchSending;
-  private final LogEntryBuffer buffer;
-  private final long leaderTerm;
-
-  private volatile boolean sending = true;
-
-  public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) {
-    this.follower = f;
-    this.server = server;
-    this.leaderState = leaderState;
-    this.raftLog = server.getState().getLog();
-    this.maxBufferSize = server.getProperties().getInt(
-        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY,
-        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT);
-    this.batchSending = server.getProperties().getBoolean(
-        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY,
-        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT);
-    this.buffer = new LogEntryBuffer();
-    this.leaderTerm = server.getState().getCurrentTerm();
-  }
-
-  @Override
-  public String toString() {
-    return getClass().getSimpleName() + "(" + server.getId() + " -> " +
-        follower.getPeer().getId() + ")";
-  }
-
-  @Override
-  public void run() {
-    try {
-      checkAndSendAppendEntries();
-    } catch (InterruptedException | InterruptedIOException e) {
-      LOG.info(this + " was interrupted: " + e);
-    }
-  }
-
-  protected boolean isAppenderRunning() {
-    return sending;
-  }
-
-  public void stopSender() {
-    this.sending = false;
-  }
-
-  public FollowerInfo getFollower() {
-    return follower;
-  }
-
-  /**
-   * A buffer for log entries with size limitation.
-   */
-  private class LogEntryBuffer {
-    private final List<LogEntryProto> buf = new ArrayList<>();
-    private int totalSize = 0;
-
-    void addEntry(LogEntryProto entry) {
-      buf.add(entry);
-      totalSize += entry.getSerializedSize();
-    }
-
-    boolean isFull() {
-      return totalSize >= maxBufferSize;
-    }
-
-    boolean isEmpty() {
-      return buf.isEmpty();
-    }
-
-    AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
-      final AppendEntriesRequestProto request = server
-          .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
-              previous, buf, !follower.isAttendingVote());
-      buf.clear();
-      totalSize = 0;
-      return request;
-    }
-
-    int getPendingEntryNum() {
-      return buf.size();
-    }
-  }
-
-  private TermIndex getPrevious() {
-    TermIndex previous = ServerProtoUtils.toTermIndex(
-        raftLog.get(follower.getNextIndex() - 1));
-    if (previous == null) {
-      // if previous is null, nextIndex must be equal to the log start
-      // index (otherwise we will install snapshot).
-      Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(),
-          "follower's next index %s, local log start index %s",
-          follower.getNextIndex(), raftLog.getStartIndex());
-      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
-      previous = snapshot == null ? null : snapshot.getTermIndex();
-    }
-    return previous;
-  }
-
-  protected AppendEntriesRequestProto createRequest() {
-    final TermIndex previous = getPrevious();
-    final long leaderNext = raftLog.getNextIndex();
-    long next = follower.getNextIndex() + buffer.getPendingEntryNum();
-    boolean toSend = false;
-
-    if (leaderNext == next && !buffer.isEmpty()) {
-      // no new entries, then send out the entries in the buffer
-      toSend = true;
-    } else if (leaderNext > next) {
-      while (leaderNext > next && !buffer.isFull()) {
-        // stop adding entry once the buffer size is >= the max size
-        buffer.addEntry(raftLog.get(next++));
-      }
-      if (buffer.isFull() || !batchSending) {
-        // buffer is full or batch sending is disabled, send out a request
-        toSend = true;
-      }
-    }
-
-    if (toSend || shouldHeartbeat()) {
-      return buffer.getAppendRequest(previous);
-    }
-    return null;
-  }
-
-  /** Send an appendEntries RPC; retry indefinitely. */
-  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
-      throws InterruptedException, InterruptedIOException {
-    int retry = 0;
-    AppendEntriesRequestProto request = null;
-    while (isAppenderRunning()) { // keep retrying for IOException
-      try {
-        if (request == null || request.getEntriesCount() == 0) {
-          request = createRequest();
-        }
-
-        if (request == null) {
-          LOG.trace("{} need not send AppendEntries now." +
-              " Wait for more entries.", server.getId());
-          return null;
-        } else if (!isAppenderRunning()) {
-          LOG.debug("LogAppender {} has been stopped. Skip the request.", this);
-          return null;
-        }
-
-        follower.updateLastRpcSendTime();
-        final AppendEntriesReplyProto r = server.getServerRpc()
-            .appendEntries(request);
-        follower.updateLastRpcResponseTime();
-
-        return r;
-      } catch (InterruptedIOException iioe) {
-        throw iioe;
-      } catch (IOException ioe) {
-        LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
-      }
-      if (isAppenderRunning()) {
-        Thread.sleep(leaderState.getSyncInterval());
-      }
-    }
-    return null;
-  }
-
-  protected class SnapshotRequestIter
-      implements Iterable<InstallSnapshotRequestProto> {
-    private final SnapshotInfo snapshot;
-    private final List<FileInfo> files;
-    private FileInputStream in;
-    private int fileIndex = 0;
-
-    private FileInfo currentFileInfo;
-    private byte[] currentBuf;
-    private long currentFileSize;
-    private long currentOffset = 0;
-    private int chunkIndex = 0;
-
-    private final String requestId;
-    private int requestIndex = 0;
-
-    public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
-        throws IOException {
-      this.snapshot = snapshot;
-      this.requestId = requestId;
-      this.files = snapshot.getFiles();
-      if (files.size() > 0) {
-        startReadFile();
-      }
-    }
-
-    private void startReadFile() throws IOException {
-      currentFileInfo = files.get(fileIndex);
-      File snapshotFile = currentFileInfo.getPath().toFile();
-      currentFileSize = snapshotFile.length();
-      final int bufLength =
-          (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize);
-      currentBuf = new byte[bufLength];
-      currentOffset = 0;
-      chunkIndex = 0;
-      in = new FileInputStream(snapshotFile);
-    }
-
-    /**
-     * Returns an iterator that reads the snapshot files chunk by chunk and
-     * wraps each chunk into an InstallSnapshot request.
-     *
-     * The position (file index, offset, chunk index, open stream) is kept in
-     * the fields of the enclosing object, so every iterator returned by this
-     * method shares the same position.
-     *
-     * NOTE(review): the input stream is closed only when a file has been read
-     * completely or when an IOException occurs; if the caller stops iterating
-     * early the current stream appears to stay open.
-     */
-    @Override
-    public Iterator<InstallSnapshotRequestProto> iterator() {
-      return new Iterator<InstallSnapshotRequestProto>() {
-        @Override
-        public boolean hasNext() {
-          // there is more to send as long as not all files have been consumed
-          return fileIndex < files.size();
-        }
-
-        @Override
-        public InstallSnapshotRequestProto next() {
-          if (fileIndex >= files.size()) {
-            throw new NoSuchElementException();
-          }
-          // bytes left in the current file, capped by the maximum chunk size
-          int targetLength = (int) Math.min(currentFileSize - currentOffset,
-              leaderState.getSnapshotChunkMaxSize());
-          FileChunkProto chunk;
-          try {
-            chunk = readFileChunk(currentFileInfo, in, currentBuf,
-                targetLength, currentOffset, chunkIndex);
-            // the whole snapshot is done only with the last chunk of the
-            // last file
-            boolean done = (fileIndex == files.size() - 1) &&
-                chunk.getDone();
-            InstallSnapshotRequestProto request =
-                server.createInstallSnapshotRequest(follower.getPeer().getId(),
-                    requestId, requestIndex++, snapshot,
-                    Lists.newArrayList(chunk), done);
-            currentOffset += targetLength;
-            chunkIndex++;
-
-            // current file exhausted: close it and move on to the next file
-            if (currentOffset >= currentFileSize) {
-              in.close();
-              fileIndex++;
-              if (fileIndex < files.size()) {
-                startReadFile();
-              }
-            }
-
-            return request;
-          } catch (IOException e) {
-            // best-effort close; a failure to close is deliberately ignored
-            if (in != null) {
-              try {
-                in.close();
-              } catch (IOException ignored) {
-              }
-            }
-            LOG.warn("Got exception when preparing InstallSnapshot request", 
e);
-            // Iterator.next() cannot throw a checked exception, so wrap it
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    }
-  }
-
-  private FileChunkProto readFileChunk(FileInfo fileInfo,
-      FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
-      throws IOException {
-    FileChunkProto.Builder builder = FileChunkProto.newBuilder()
-        .setOffset(offset).setChunkIndex(chunkIndex);
-    IOUtils.readFully(in, buf, 0, length);
-    Path relativePath = server.getState().getStorage().getStorageDir()
-        .relativizeToRoot(fileInfo.getPath());
-    builder.setFilename(relativePath.toString());
-    builder.setDone(offset + length == fileInfo.getFileSize());
-    builder.setFileDigest(
-        ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
-    builder.setData(ByteString.copyFrom(buf, 0, length));
-    return builder.build();
-  }
-
-  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
-      throws InterruptedException, InterruptedIOException {
-    String requestId = UUID.randomUUID().toString();
-    InstallSnapshotReplyProto reply = null;
-    try {
-      for (InstallSnapshotRequestProto request :
-          new SnapshotRequestIter(snapshot, requestId)) {
-        follower.updateLastRpcSendTime();
-        reply = server.getServerRpc().installSnapshot(request);
-        follower.updateLastRpcResponseTime();
-
-        if (!reply.getServerReply().getSuccess()) {
-          return reply;
-        }
-      }
-    } catch (InterruptedIOException iioe) {
-      throw iioe;
-    } catch (Exception ioe) {
-      LOG.warn(this + ": failed to install SnapshotInfo " + 
snapshot.getFiles(),
-          ioe);
-      return null;
-    }
-
-    if (reply != null) {
-      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
-      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
-      LOG.info("{}: install snapshot-{} successfully on follower {}",
-          server.getId(), snapshot.getTermIndex().getIndex(), 
follower.getPeer());
-    }
-    return reply;
-  }
-
-  /**
-   * Decides whether a snapshot has to be installed on the follower.
-   *
-   * A snapshot is needed if the follower's next index is behind the next
-   * index of the local log and either
-   * 1. the follower's next index is smaller than the log start index, or
-   * 2. there is no local log entry but there is a snapshot.
-   *
-   * @return the latest snapshot if it should be installed (this may be null
-   *         in case 1 when no snapshot exists); otherwise null
-   */
-  protected SnapshotInfo shouldInstallSnapshot() {
-    final long startIndex = raftLog.getStartIndex();
-    if (follower.getNextIndex() >= raftLog.getNextIndex()) {
-      // the follower does not need to catch up
-      return null;
-    }
-
-    final SnapshotInfo latest = server.getState().getLatestSnapshot();
-    if (follower.getNextIndex() < startIndex) {
-      return latest;
-    }
-    if (startIndex == INVALID_LOG_INDEX && latest != null) {
-      return latest;
-    }
-    return null;
-  }
-
-  /**
-   * Check and send appendEntries RPC.
-   *
-   * Loops while the appender is running. In each round, if a request is due,
-   * either installs a snapshot on the follower or sends appendEntries and
-   * handles the reply. Afterwards, if there is nothing more to append, waits
-   * on this object for the remaining heartbeat time; {@link #notifyAppend()}
-   * wakes the wait up earlier.
-   */
-  private void checkAndSendAppendEntries()
-      throws InterruptedException, InterruptedIOException {
-    while (isAppenderRunning()) {
-      if (shouldSendRequest()) {
-        SnapshotInfo snapshot = shouldInstallSnapshot();
-        if (snapshot != null) {
-          LOG.info("{}: follower {}'s next index is {}," +
-              " log's start index is {}, need to install snapshot",
-              server.getId(), follower.getPeer(), follower.getNextIndex(),
-              raftLog.getStartIndex());
-
-          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
-          // a NOT_LEADER result may carry a higher term: check for step down
-          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
-            checkResponseTerm(r.getTerm());
-          } // otherwise if r is null, retry the snapshot installation
-        } else {
-          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
-          if (r != null) {
-            handleReply(r);
-          }
-        }
-      }
-      // Wait only when, counting the entries reported as pending by the
-      // buffer, the follower is not behind the next index of the local log.
-      // NOTE(review): this condition is evaluated outside the synchronized
-      // block, so a notifyAppend() arriving between the check and wait()
-      // looks like it would be missed; the wait is bounded by waitTime.
-      if (isAppenderRunning() && !shouldAppendEntries(
-          follower.getNextIndex() + buffer.getPendingEntryNum())) {
-        final long waitTime = getHeartbeatRemainingTime(
-            follower.getLastRpcTime());
-        if (waitTime > 0) {
-          synchronized (this) {
-            wait(waitTime);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Processes the reply of an appendEntries RPC; a null reply is ignored.
-   *
-   * On SUCCESS the follower's match index and next index are advanced if the
-   * replied next index moved forward; on NOT_LEADER the replied term is
-   * checked for a possible step down; on INCONSISTENCY the follower's next
-   * index is decreased to the replied next index.
-   *
-   * @throws IllegalStateException if a SUCCESS reply carries a next index
-   *         smaller than the follower's current next index
-   */
-  private void handleReply(AppendEntriesReplyProto reply) {
-    if (reply == null) {
-      return;
-    }
-
-    switch (reply.getResult()) {
-      case SUCCESS:
-        final long previousNext = follower.getNextIndex();
-        final long repliedNext = reply.getNextIndex();
-        if (repliedNext < previousNext) {
-          throw new IllegalStateException("nextIndex=" + repliedNext
-              + " < oldNextIndex=" + previousNext
-              + ", reply=" + ProtoUtils.toString(reply));
-        }
-        if (repliedNext > previousNext) {
-          follower.updateMatchIndex(repliedNext - 1);
-          follower.updateNextIndex(repliedNext);
-          submitEventOnSuccessAppend();
-        }
-        break;
-      case NOT_LEADER:
-        // the follower may have seen a higher term: check if should step down
-        checkResponseTerm(reply.getTerm());
-        break;
-      case INCONSISTENCY:
-        follower.decreaseNextIndex(reply.getNextIndex());
-        break;
-      case UNRECOGNIZED:
-        LOG.warn("{} received UNRECOGNIZED AppendResult from {}",
-            server.getId(), follower.getPeer().getId());
-        break;
-    }
-  }
-
-  protected void submitEventOnSuccessAppend() {
-    LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
-        LeaderState.UPDATE_COMMIT_EVENT :
-        LeaderState.STAGING_PROGRESS_EVENT;
-    leaderState.submitUpdateStateEvent(e);
-  }
-
-  /** Wakes up one thread that is waiting on this object's monitor. */
-  public void notifyAppend() {
-    synchronized (this) {
-      notify();
-    }
-  }
-
-  /**
-   * Should the leader send appendEntries RPC to this follower?
-   *
-   * @return true if the follower is behind the local log or a heartbeat is
-   *         due
-   */
-  protected boolean shouldSendRequest() {
-    if (shouldAppendEntries(follower.getNextIndex())) {
-      return true;
-    }
-    return shouldHeartbeat();
-  }
-
-  /**
-   * @return true if the given follower index is smaller than the next index
-   *         of the local log
-   */
-  private boolean shouldAppendEntries(long followerIndex) {
-    return raftLog.getNextIndex() > followerIndex;
-  }
-
-  private boolean shouldHeartbeat() {
-    return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0;
-  }
-
-  /**
-   * @return the time in milliseconds that the leader should send a heartbeat.
-   */
-  protected long getHeartbeatRemainingTime(Timestamp lastTime) {
-    return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs();
-  }
-
-  protected void checkResponseTerm(long responseTerm) {
-    synchronized (server) {
-      if (isAppenderRunning() && follower.isAttendingVote()
-          && responseTerm > leaderState.getCurrentTerm()) {
-        leaderState.submitUpdateStateEvent(
-            new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
-                responseTerm));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
deleted file mode 100644
index d77faff..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-/** A factory of {@link LogAppender} objects. */
-public interface LogAppenderFactory {
-  /**
-   * Creates a log appender.
-   *
-   * @param server the server passed to the appender
-   * @param state the leader state passed to the appender
-   * @param f the follower passed to the appender
-   * @return the appender created
-   */
-  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
-      FollowerInfo f);
-
-  /** A factory that directly instantiates {@link LogAppender}. */
-  class SynchronousLogAppenderFactory implements LogAppenderFactory {
-    @Override
-    public LogAppender getLogAppender(RaftServerImpl server,
-        LeaderState state, FollowerInfo f) {
-      final LogAppender appender = new LogAppender(server, state, f);
-      return appender;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java
deleted file mode 100644
index 774a0c5..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.RaftPeer;
-
-import java.util.*;
-
-/**
- * The peer configuration of a raft cluster.
- *
- * The objects of this class are immutable.
- */
-class PeerConfiguration {
-  /** The peers of this configuration, keyed by peer id. */
-  private final Map<String, RaftPeer> peers;
-
-  /**
-   * @param peers the peers of this configuration; must be non-null and
-   *              contain at least one peer
-   */
-  PeerConfiguration(Iterable<RaftPeer> peers) {
-    Preconditions.checkNotNull(peers);
-    Map<String, RaftPeer> map = new HashMap<>();
-    for(RaftPeer p : peers) {
-      map.put(p.getId(), p);
-    }
-    this.peers = Collections.unmodifiableMap(map);
-    Preconditions.checkState(!this.peers.isEmpty());
-  }
-
-  /** @return an unmodifiable view of the peers in this configuration. */
-  Collection<RaftPeer> getPeers() {
-    return Collections.unmodifiableCollection(peers.values());
-  }
-
-  /** @return the number of peers in this configuration. */
-  int size() {
-    return peers.size();
-  }
-
-  @Override
-  public String toString() {
-    return peers.values().toString();
-  }
-
-  /** @return the peer with the given id, or null if there is no such peer. */
-  RaftPeer getPeer(String id) {
-    return peers.get(id);
-  }
-
-  /** @return true if a peer with the given id is in this configuration. */
-  boolean contains(String id) {
-    return peers.containsKey(id);
-  }
-
-  /**
-   * @return a new, modifiable list of all the peers whose id differs from
-   *         the given self id
-   */
-  List<RaftPeer> getOtherPeers(String selfId) {
-    List<RaftPeer> others = new ArrayList<>();
-    for (RaftPeer peer : peers.values()) {
-      if (!selfId.equals(peer.getId())) {
-        others.add(peer);
-      }
-    }
-    return others;
-  }
-
-  /**
-   * Checks whether self together with the given other peers form a majority
-   * of this configuration. Only ids contained in this configuration count.
-   *
-   * The decision is made after counting, not inside the loop, so that the
-   * result is also correct when {@code others} is empty, e.g. for a
-   * configuration with a single peer where self alone is the majority.
-   *
-   * @param others the ids of the other peers; must not contain selfId
-   * @param selfId the id of this server
-   * @return true if the counted peers are more than half of this
-   *         configuration
-   */
-  boolean hasMajority(Collection<String> others, String selfId) {
-    Preconditions.checkArgument(!others.contains(selfId));
-    int num = contains(selfId) ? 1 : 0;
-    for (String other : others) {
-      if (contains(other)) {
-        num++;
-      }
-    }
-    return num > size() / 2;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java
deleted file mode 100644
index 689566a..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.statemachine.TransactionContext;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A client request that is waiting for its reply, together with the log
- * index it was assigned and the future through which the reply is delivered.
- * Requests are ordered by index.
- */
-public class PendingRequest implements Comparable<PendingRequest> {
-  /** Kept as a primitive: it is only ever read and compared as a long. */
-  private final long index;
-  private final RaftClientRequest request;
-  private final TransactionContext entry;
-  private final CompletableFuture<RaftClientReply> future;
-
-  PendingRequest(long index, RaftClientRequest request,
-                 TransactionContext entry) {
-    this.index = index;
-    this.request = request;
-    this.entry = entry;
-    this.future = new CompletableFuture<>();
-  }
-
-  /**
-   * Creates a pending set-configuration request; it has no transaction
-   * context and uses {@link RaftServerConstants#INVALID_LOG_INDEX} as index.
-   */
-  PendingRequest(SetConfigurationRequest request) {
-    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
-  }
-
-  long getIndex() {
-    return index;
-  }
-
-  RaftClientRequest getRequest() {
-    return request;
-  }
-
-  /** @return the future that is completed with the reply or an exception. */
-  public CompletableFuture<RaftClientReply> getFuture() {
-    return future;
-  }
-
-  TransactionContext getEntry() {
-    return entry;
-  }
-
-  /** Completes the future exceptionally; {@code e} must not be null. */
-  synchronized void setException(Throwable e) {
-    Preconditions.checkArgument(e != null);
-    future.completeExceptionally(e);
-  }
-
-  /** Completes the future with the given reply; {@code r} must not be null. */
-  synchronized void setReply(RaftClientReply r) {
-    Preconditions.checkArgument(r != null);
-    future.complete(r);
-  }
-
-  /** Completes the future with a reply built from the request and message. */
-  void setSuccessReply(Message message) {
-    setReply(new RaftClientReply(getRequest(), message));
-  }
-
-  @Override
-  public int compareTo(PendingRequest that) {
-    return Long.compare(this.index, that.index);
-  }
-
-  @Override
-  public String toString() {
-    // the closing parenthesis was missing in the original output
-    return getClass().getSimpleName() + "(index=" + index
-        + ", request=" + request + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
deleted file mode 100644
index 32f127e..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.*;
-import org.apache.raft.statemachine.TransactionContext;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-
-class PendingRequests {
-  private static final Logger LOG = RaftServerImpl.LOG;
-
-  private PendingRequest pendingSetConf;
-  private final RaftServerImpl server;
-  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new 
ConcurrentHashMap<>();
-  private PendingRequest last = null;
-
-  PendingRequests(RaftServerImpl server) {
-    this.server = server;
-  }
-
-  PendingRequest addPendingRequest(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    // externally synced for now
-    Preconditions.checkArgument(!request.isReadOnly());
-    Preconditions.checkState(last == null || index == last.getIndex() + 1);
-    return add(index, request, entry);
-  }
-
-  private PendingRequest add(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    final PendingRequest pending = new PendingRequest(index, request, entry);
-    pendingRequests.put(index, pending);
-    last = pending;
-    return pending;
-  }
-
-  PendingRequest addConfRequest(SetConfigurationRequest request) {
-    Preconditions.checkState(pendingSetConf == null);
-    pendingSetConf = new PendingRequest(request);
-    return pendingSetConf;
-  }
-
-  void replySetConfiguration() {
-    // we allow the pendingRequest to be null in case that the new leader
-    // commits the new configuration while it has not received the retry
-    // request from the client
-    if (pendingSetConf != null) {
-      // for setConfiguration we do not need to wait for statemachine. send 
back
-      // reply after it's committed.
-      pendingSetConf.setSuccessReply(null);
-      pendingSetConf = null;
-    }
-  }
-
-  void failSetConfiguration(RaftException e) {
-    Preconditions.checkState(pendingSetConf != null);
-    pendingSetConf.setException(e);
-    pendingSetConf = null;
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    PendingRequest pendingRequest = pendingRequests.get(index);
-    // it is possible that the pendingRequest is null if this peer just becomes
-    // the new leader and commits transactions received by the previous leader
-    return pendingRequest != null ? pendingRequest.getEntry() : null;
-  }
-
-  void replyPendingRequest(long index, CompletableFuture<Message> 
messageFuture) {
-    final PendingRequest pending = pendingRequests.get(index);
-    if (pending != null) {
-      Preconditions.checkState(pending.getIndex() == index);
-
-      messageFuture.whenComplete((reply, exception) -> {
-        if (exception == null) {
-          pending.setSuccessReply(reply);
-        } else {
-          pending.setException(exception);
-        }
-      });
-    }
-  }
-
-  /**
-   * The leader state is stopped. Send NotLeaderException to all the pending
-   * requests since they have not got applied to the state machine yet.
-   */
-  void sendNotLeaderResponses() throws IOException {
-    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
-        server.getId());
-
-    Collection<TransactionContext> pendingEntries = 
pendingRequests.values().stream()
-        .map(PendingRequest::getEntry).collect(Collectors.toList());
-    // notify the state machine about stepping down
-    server.getStateMachine().notifyNotLeader(pendingEntries);
-    pendingRequests.values().forEach(this::setNotLeaderException);
-    if (pendingSetConf != null) {
-      setNotLeaderException(pendingSetConf);
-    }
-  }
-
-  private void setNotLeaderException(PendingRequest pending) {
-    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
-        server.generateNotLeaderException());
-    pending.setReply(reply);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java
deleted file mode 100644
index 4879314..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.RaftPeer;
-
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * The configuration of the raft cluster.
- *
- * The configuration is stable if there is no on-going peer change. Otherwise,
- * the configuration is transitional, i.e. in the middle of a peer change.
- *
- * The objects of this class are immutable.
- */
-public class RaftConfiguration {
-  /** Create a {@link Builder}. */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /** To build {@link RaftConfiguration} objects. */
-  public static class Builder {
-    private PeerConfiguration oldConf;
-    private PeerConfiguration conf;
-    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;
-
-    private boolean forceStable = false;
-    private boolean forceTransitional = false;
-
-    private Builder() {}
-
-    public Builder setConf(PeerConfiguration conf) {
-      Preconditions.checkNotNull(conf);
-      Preconditions.checkState(this.conf == null, "conf is already set.");
-      this.conf = conf;
-      return this;
-    }
-
-    public Builder setConf(Iterable<RaftPeer> peers) {
-      return setConf(new PeerConfiguration(peers));
-    }
-
-    public Builder setConf(RaftPeer[] peers) {
-      return setConf(Arrays.asList(peers));
-    }
-
-    Builder setConf(RaftConfiguration transitionalConf) {
-      Preconditions.checkNotNull(transitionalConf);
-      Preconditions.checkState(transitionalConf.isTransitional());
-
-      Preconditions.checkState(!forceTransitional);
-      forceStable = true;
-      return setConf(transitionalConf.conf);
-    }
-
-
-    public Builder setOldConf(PeerConfiguration oldConf) {
-      Preconditions.checkNotNull(oldConf);
-      Preconditions.checkState(this.oldConf == null, "oldConf is already 
set.");
-      this.oldConf = oldConf;
-      return this;
-    }
-
-    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
-      return setOldConf(new PeerConfiguration(oldPeers));
-    }
-
-    public Builder setOldConf(RaftPeer[] oldPeers) {
-      return setOldConf(Arrays.asList(oldPeers));
-    }
-
-    Builder setOldConf(RaftConfiguration stableConf) {
-      Preconditions.checkNotNull(stableConf);
-      Preconditions.checkState(stableConf.isStable());
-
-      Preconditions.checkState(!forceStable);
-      forceTransitional = true;
-      return setOldConf(stableConf.conf);
-    }
-
-    public Builder setLogEntryIndex(long logEntryIndex) {
-      Preconditions.checkArgument(
-          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
-      Preconditions.checkState(
-          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
-          "logEntryIndex is already set.");
-      this.logEntryIndex = logEntryIndex;
-      return this;
-    }
-
-    /** Build a {@link RaftConfiguration}. */
-    public RaftConfiguration build() {
-      if (forceTransitional) {
-        Preconditions.checkState(oldConf != null);
-      }
-      if (forceStable) {
-        Preconditions.checkState(oldConf == null);
-      }
-      return new RaftConfiguration(conf, oldConf, logEntryIndex);
-    }
-  }
-
-  /** Non-null only if this configuration is transitional. */
-  private final PeerConfiguration oldConf;
-  /**
-   * The current peer configuration while this configuration is stable;
-   * or the new peer configuration while this configuration is transitional.
-   */
-  private final PeerConfiguration conf;
-
-  /** The index of the corresponding log entry for this configuration. */
-  private final long logEntryIndex;
-
-  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
-      long logEntryIndex) {
-    Preconditions.checkNotNull(conf);
-    this.conf = conf;
-    this.oldConf = oldConf;
-    this.logEntryIndex = logEntryIndex;
-  }
-
-  /** Is this configuration transitional, i.e. in the middle of a peer change? 
*/
-  boolean isTransitional() {
-    return oldConf != null;
-  }
-
-  /** Is this configuration stable, i.e. no on-going peer change? */
-  boolean isStable() {
-    return oldConf == null;
-  }
-
-  boolean containsInConf(String peerId) {
-    return conf.contains(peerId);
-  }
-
-  boolean containsInOldConf(String peerId) {
-    return oldConf != null && oldConf.contains(peerId);
-  }
-
-  boolean contains(String peerId) {
-    return containsInConf(peerId) && (oldConf == null || 
containsInOldConf(peerId));
-  }
-
-  /**
-   * @return the peer corresponding to the given id;
-   *         or return null if the peer is not in this configuration.
-   */
-  public RaftPeer getPeer(String id) {
-    if (id == null) {
-      return null;
-    }
-    RaftPeer peer = conf.getPeer(id);
-    if (peer != null) {
-      return peer;
-    } else if (oldConf != null) {
-      return oldConf.getPeer(id);
-    }
-    return null;
-  }
-
-  /** @return all the peers from the conf, and the old conf if it exists. */
-  public Collection<RaftPeer> getPeers() {
-    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
-    if (oldConf != null) {
-      oldConf.getPeers().stream().filter(p -> !peers.contains(p))
-          .forEach(peers::add);
-    }
-    return peers;
-  }
-
-  /**
-   * @return all the peers other than the given self id from the conf,
-   *         and the old conf if it exists.
-   */
-  public Collection<RaftPeer> getOtherPeers(String selfId) {
-    Collection<RaftPeer> others = conf.getOtherPeers(selfId);
-    if (oldConf != null) {
-      oldConf.getOtherPeers(selfId).stream()
-          .filter(p -> !others.contains(p))
-          .forEach(others::add);
-    }
-    return others;
-  }
-
-  /** @return true if the self id together with the others are in the 
majority. */
-  boolean hasMajority(Collection<String> others, String selfId) {
-    Preconditions.checkArgument(!others.contains(selfId));
-    return conf.hasMajority(others, selfId) &&
-        (oldConf == null || oldConf.hasMajority(others, selfId));
-  }
-
-  @Override
-  public String toString() {
-    return conf + (oldConf != null ? "old:" + oldConf : "");
-  }
-
-  @VisibleForTesting
-  boolean hasNoChange(RaftPeer[] newMembers) {
-    if (!isStable() || conf.size() != newMembers.length) {
-      return false;
-    }
-    for (RaftPeer peer : newMembers) {
-      if (!conf.contains(peer.getId())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  long getLogEntryIndex() {
-    return logEntryIndex;
-  }
-
-  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
-      RaftConfiguration old) {
-    List<RaftPeer> peers = new ArrayList<>();
-    for (RaftPeer p : newMembers) {
-      if (!old.containsInConf(p.getId())) {
-        peers.add(p);
-      }
-    }
-    return peers;
-  }
-
-  RaftPeer getRandomPeer(String exclusiveId) {
-    final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
-    if (peers.isEmpty()) {
-      return null;
-    }
-    final int index = ThreadLocalRandom.current().nextInt(peers.size());
-    return peers.get(index);
-  }
-
-  Collection<RaftPeer> getPeersInOldConf() {
-    return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
-  }
-
-  Collection<RaftPeer> getPeersInConf() {
-    return conf.getPeers();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
deleted file mode 100644
index 6634152..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import org.apache.raft.client.RaftClient;
-
-/** Constants and the startup options of the raft server. */
-public interface RaftServerConstants {
-  long INVALID_LOG_INDEX = -1;
-  byte LOG_TERMINATE_BYTE = 0;
-  long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
-
-  /** The startup options, each identified by a string. */
-  enum StartupOption {
-    FORMAT("format"),
-    REGULAR("regular");
-
-    private final String option;
-
-    StartupOption(String arg) {
-      this.option = arg;
-    }
-
-    /**
-     * @return the option whose string equals the given argument;
-     *         or {@link #REGULAR} if there is no such option.
-     */
-    public static StartupOption getOption(String arg) {
-      final StartupOption[] all = StartupOption.values();
-      for (int i = 0; i < all.length; i++) {
-        if (all[i].option.equals(arg)) {
-          return all[i];
-        }
-      }
-      return REGULAR;
-    }
-  }
-}

Reply via email to