http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
new file mode 100644
index 0000000..ad6ecef
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.raft.protocol.*;
+import org.apache.raft.statemachine.TransactionContext;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+class PendingRequests {
+  private static final Logger LOG = RaftServer.LOG;
+
+  private PendingRequest pendingSetConf;
+  private final RaftServer server;
+  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new 
ConcurrentHashMap<>();
+  private PendingRequest last = null;
+
+  PendingRequests(RaftServer server) {
+    this.server = server;
+  }
+
+  PendingRequest addPendingRequest(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    // externally synced for now
+    Preconditions.checkArgument(!request.isReadOnly());
+    Preconditions.checkState(last == null || index == last.getIndex() + 1);
+    return add(index, request, entry);
+  }
+
+  private PendingRequest add(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    final PendingRequest pending = new PendingRequest(index, request, entry);
+    pendingRequests.put(index, pending);
+    last = pending;
+    return pending;
+  }
+
+  PendingRequest addConfRequest(SetConfigurationRequest request) {
+    Preconditions.checkState(pendingSetConf == null);
+    pendingSetConf = new PendingRequest(request);
+    return pendingSetConf;
+  }
+
+  void replySetConfiguration() {
+    // we allow the pendingRequest to be null in case that the new leader
+    // commits the new configuration while it has not received the retry
+    // request from the client
+    if (pendingSetConf != null) {
+      // for setConfiguration we do not need to wait for statemachine. send 
back
+      // reply after it's committed.
+      pendingSetConf.setSuccessReply(null);
+      pendingSetConf = null;
+    }
+  }
+
+  void failSetConfiguration(RaftException e) {
+    Preconditions.checkState(pendingSetConf != null);
+    pendingSetConf.setException(e);
+    pendingSetConf = null;
+  }
+
+  TransactionContext getTransactionContext(long index) {
+    PendingRequest pendingRequest = pendingRequests.get(index);
+    // it is possible that the pendingRequest is null if this peer just becomes
+    // the new leader and commits transactions received by the previous leader
+    return pendingRequest != null ? pendingRequest.getEntry() : null;
+  }
+
+  void replyPendingRequest(long index, CompletableFuture<Message> 
messageFuture) {
+    final PendingRequest pending = pendingRequests.get(index);
+    if (pending != null) {
+      Preconditions.checkState(pending.getIndex() == index);
+
+      messageFuture.whenComplete((reply, exception) -> {
+        if (exception == null) {
+          pending.setSuccessReply(reply);
+        } else {
+          pending.setException(exception);
+        }
+      });
+    }
+  }
+
+  /**
+   * The leader state is stopped. Send NotLeaderException to all the pending
+   * requests since they have not got applied to the state machine yet.
+   */
+  void sendNotLeaderResponses() throws IOException {
+    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
+        server.getId());
+
+    Collection<TransactionContext> pendingEntries = 
pendingRequests.values().stream()
+        .map(PendingRequest::getEntry).collect(Collectors.toList());
+    // notify the state machine about stepping down
+    server.getStateMachine().notifyNotLeader(pendingEntries);
+    pendingRequests.values().forEach(this::setNotLeaderException);
+    if (pendingSetConf != null) {
+      setNotLeaderException(pendingSetConf);
+    }
+  }
+
+  private void setNotLeaderException(PendingRequest pending) {
+    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
+        server.generateNotLeaderException());
+    pending.setReply(reply);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java
new file mode 100644
index 0000000..28ff330
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.raft.protocol.RaftPeer;
+
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * The configuration of the raft cluster.
+ *
+ * The configuration is stable if there is no on-going peer change. Otherwise,
+ * the configuration is transitional, i.e. in the middle of a peer change.
+ *
+ * The objects of this class are immutable.
+ */
+public class RaftConfiguration {
+  /** Create a {@link Builder}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /** To build {@link RaftConfiguration} objects. */
+  public static class Builder {
+    private PeerConfiguration oldConf;
+    private PeerConfiguration conf;
+    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;
+
+    private boolean forceStable = false;
+    private boolean forceTransitional = false;
+
+    private Builder() {}
+
+    public Builder setConf(PeerConfiguration conf) {
+      Preconditions.checkNotNull(conf);
+      Preconditions.checkState(this.conf == null, "conf is already set.");
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder setConf(Iterable<RaftPeer> peers) {
+      return setConf(new PeerConfiguration(peers));
+    }
+
+    public Builder setConf(RaftPeer[] peers) {
+      return setConf(Arrays.asList(peers));
+    }
+
+    Builder setConf(RaftConfiguration transitionalConf) {
+      Preconditions.checkNotNull(transitionalConf);
+      Preconditions.checkState(transitionalConf.isTransitional());
+
+      Preconditions.checkState(!forceTransitional);
+      forceStable = true;
+      return setConf(transitionalConf.conf);
+    }
+
+
+    public Builder setOldConf(PeerConfiguration oldConf) {
+      Preconditions.checkNotNull(oldConf);
+      Preconditions.checkState(this.oldConf == null, "oldConf is already 
set.");
+      this.oldConf = oldConf;
+      return this;
+    }
+
+    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
+      return setOldConf(new PeerConfiguration(oldPeers));
+    }
+
+    public Builder setOldConf(RaftPeer[] oldPeers) {
+      return setOldConf(Arrays.asList(oldPeers));
+    }
+
+    Builder setOldConf(RaftConfiguration stableConf) {
+      Preconditions.checkNotNull(stableConf);
+      Preconditions.checkState(stableConf.isStable());
+
+      Preconditions.checkState(!forceStable);
+      forceTransitional = true;
+      return setOldConf(stableConf.conf);
+    }
+
+    public Builder setLogEntryIndex(long logEntryIndex) {
+      Preconditions.checkArgument(
+          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
+      Preconditions.checkState(
+          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
+          "logEntryIndex is already set.");
+      this.logEntryIndex = logEntryIndex;
+      return this;
+    }
+
+    /** Build a {@link RaftConfiguration}. */
+    public RaftConfiguration build() {
+      if (forceTransitional) {
+        Preconditions.checkState(oldConf != null);
+      }
+      if (forceStable) {
+        Preconditions.checkState(oldConf == null);
+      }
+      return new RaftConfiguration(conf, oldConf, logEntryIndex);
+    }
+  }
+
+  /** Non-null only if this configuration is transitional. */
+  private final PeerConfiguration oldConf;
+  /**
+   * The current peer configuration while this configuration is stable;
+   * or the new peer configuration while this configuration is transitional.
+   */
+  private final PeerConfiguration conf;
+
+  /** The index of the corresponding log entry for this configuration. */
+  private final long logEntryIndex;
+
+  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
+      long logEntryIndex) {
+    Preconditions.checkNotNull(conf);
+    this.conf = conf;
+    this.oldConf = oldConf;
+    this.logEntryIndex = logEntryIndex;
+  }
+
+  /** Is this configuration transitional, i.e. in the middle of a peer change? 
*/
+  public boolean isTransitional() {
+    return oldConf != null;
+  }
+
+  /** Is this configuration stable, i.e. no on-going peer change? */
+  public boolean isStable() {
+    return oldConf == null;
+  }
+
+  boolean containsInConf(String peerId) {
+    return conf.contains(peerId);
+  }
+
+  boolean containsInOldConf(String peerId) {
+    return oldConf != null && oldConf.contains(peerId);
+  }
+
+  public boolean contains(String peerId) {
+    return containsInConf(peerId) && (oldConf == null || 
containsInOldConf(peerId));
+  }
+
+  /**
+   * @return the peer corresponding to the given id;
+   *         or return null if the peer is not in this configuration.
+   */
+  public RaftPeer getPeer(String id) {
+    if (id == null) {
+      return null;
+    }
+    RaftPeer peer = conf.getPeer(id);
+    if (peer != null) {
+      return peer;
+    } else if (oldConf != null) {
+      return oldConf.getPeer(id);
+    }
+    return null;
+  }
+
+  /** @return all the peers from the conf, and the old conf if it exists. */
+  public Collection<RaftPeer> getPeers() {
+    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+    if (oldConf != null) {
+      oldConf.getPeers().stream().filter(p -> !peers.contains(p))
+          .forEach(peers::add);
+    }
+    return peers;
+  }
+
+  /**
+   * @return all the peers other than the given self id from the conf,
+   *         and the old conf if it exists.
+   */
+  public Collection<RaftPeer> getOtherPeers(String selfId) {
+    Collection<RaftPeer> others = conf.getOtherPeers(selfId);
+    if (oldConf != null) {
+      oldConf.getOtherPeers(selfId).stream()
+          .filter(p -> !others.contains(p))
+          .forEach(others::add);
+    }
+    return others;
+  }
+
+  /** @return true if the self id together with the others are in the 
majority. */
+  public boolean hasMajority(Collection<String> others, String selfId) {
+    Preconditions.checkArgument(!others.contains(selfId));
+    return conf.hasMajority(others, selfId) &&
+        (oldConf == null || oldConf.hasMajority(others, selfId));
+  }
+
+  @Override
+  public String toString() {
+    return conf + (oldConf != null ? "old:" + oldConf : "");
+  }
+
+  @VisibleForTesting
+  boolean hasNoChange(RaftPeer[] newMembers) {
+    if (!isStable() || conf.size() != newMembers.length) {
+      return false;
+    }
+    for (RaftPeer peer : newMembers) {
+      if (!conf.contains(peer.getId())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  long getLogEntryIndex() {
+    return logEntryIndex;
+  }
+
+  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
+      RaftConfiguration old) {
+    List<RaftPeer> peers = new ArrayList<>();
+    for (RaftPeer p : newMembers) {
+      if (!old.containsInConf(p.getId())) {
+        peers.add(p);
+      }
+    }
+    return peers;
+  }
+
+  RaftPeer getRandomPeer(String exclusiveId) {
+    final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
+    if (peers.isEmpty()) {
+      return null;
+    }
+    final int index = ThreadLocalRandom.current().nextInt(peers.size());
+    return peers.get(index);
+  }
+
+  public Collection<RaftPeer> getPeersInOldConf() {
+    return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
+  }
+
+  public Collection<RaftPeer> getPeersInConf() {
+    return conf.getPeers();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java
new file mode 100644
index 0000000..c1bf4a9
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java
@@ -0,0 +1,749 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.raft.conf.RaftProperties;
+import org.apache.raft.protocol.*;
+import org.apache.raft.server.RaftServerConfigKeys;
+import org.apache.raft.server.protocol.RaftServerProtocol;
+import org.apache.raft.server.protocol.TermIndex;
+import org.apache.raft.server.storage.FileInfo;
+import org.apache.raft.shaded.proto.RaftProtos.*;
+import org.apache.raft.statemachine.SnapshotInfo;
+import org.apache.raft.statemachine.StateMachine;
+import org.apache.raft.statemachine.TransactionContext;
+import org.apache.raft.util.CodeInjectionForTesting;
+import org.apache.raft.util.LifeCycle;
+import org.apache.raft.util.ProtoUtils;
+import org.apache.raft.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
+import static org.apache.raft.util.LifeCycle.State.*;
+
+public class RaftServer implements RaftServerProtocol, Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class);
+
+  private static final String CLASS_NAME = RaftServer.class.getSimpleName();
+  // Keys handed to CodeInjectionForTesting.execute(..) by the RPC handlers
+  // (REQUEST_VOTE and APPEND_ENTRIES are used that way in this class;
+  // INSTALL_SNAPSHOT presumably likewise -- its use is not shown here).
+  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
+  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
+  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
+
+
+  // Bounds, in milliseconds, of the rpc timeout range; both are read from the
+  // RaftProperties in the constructor, which requires max > min.
+  private final int minTimeoutMs;
+  private final int maxTimeoutMs;
+
+  private final LifeCycle lifeCycle;
+  private final ServerState state;
+  private final StateMachine stateMachine;
+  private final RaftProperties properties;
+  // Current role of this peer; written by the start*/changeTo* methods.
+  private volatile Role role;
+
+  /** used when the peer is follower, to monitor election timeout */
+  private volatile FollowerState heartbeatMonitor;
+
+  /** used when the peer is candidate, to request votes from other peers */
+  private volatile LeaderElection electionDaemon;
+
+  /** used when the peer is leader */
+  private volatile LeaderState leaderState;
+
+  // Not final: assigned through setServerRpc(..) after construction.
+  private RaftServerRpc serverRpc;
+
+  // Created in the constructor by initAppenderFactory().
+  private final LogAppenderFactory appenderFactory;
+
+  /**
+   * Create a server with the given id and initial configuration.
+   *
+   * The min/max rpc timeouts are read from the given properties and the max
+   * must be strictly larger than the min.
+   *
+   * @throws IllegalArgumentException if max timeout is not larger than min.
+   */
+  public RaftServer(String id, RaftConfiguration raftConf,
+      RaftProperties properties, StateMachine stateMachine)
+      throws IOException {
+    this.lifeCycle = new LifeCycle(id);
+    this.minTimeoutMs = properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
+    this.maxTimeoutMs = properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
+    Preconditions.checkArgument(this.maxTimeoutMs > this.minTimeoutMs,
+        "max timeout: %s, min timeout: %s",
+        this.maxTimeoutMs, this.minTimeoutMs);
+    // properties and stateMachine are assigned before ServerState receives
+    // "this" and before initAppenderFactory() reads the properties field
+    this.properties = properties;
+    this.stateMachine = stateMachine;
+    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
+    this.appenderFactory = initAppenderFactory();
+  }
+
+  /** @return the configured minimum rpc timeout in milliseconds. */
+  public int getMinTimeoutMs() {
+    return this.minTimeoutMs;
+  }
+
+  /** @return the configured maximum rpc timeout in milliseconds. */
+  public int getMaxTimeoutMs() {
+    return this.maxTimeoutMs;
+  }
+
+  /**
+   * @return a timeout in milliseconds picked by
+   *         {@link RaftUtils#getRandomBetween} from the min and max timeouts.
+   */
+  public int getRandomTimeoutMs() {
+    final int lower = minTimeoutMs;
+    final int upper = maxTimeoutMs;
+    return RaftUtils.getRandomBetween(lower, upper);
+  }
+
+  /** @return the state machine passed to the constructor. */
+  public StateMachine getStateMachine() {
+    return stateMachine;
+  }
+
+  /** @return the log appender factory created at construction time. */
+  public LogAppenderFactory getLogAppenderFactory() {
+    return this.appenderFactory;
+  }
+
+  private LogAppenderFactory initAppenderFactory() {
+    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
+        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
+        LogAppenderFactory.class);
+    return RaftUtils.newInstance(factoryClass);
+  }
+
+  /**
+   * Used by tests to set initial raft configuration with correct port
+   * bindings. The call is forwarded to the server state.
+   */
+  @VisibleForTesting
+  public void setInitialConf(RaftConfiguration conf) {
+    state.setInitialConf(conf);
+  }
+
+  /**
+   * Install the rpc service and, if a configuration is already known,
+   * register all of its peers with that service.
+   */
+  public void setServerRpc(RaftServerRpc serverRpc) {
+    this.serverRpc = serverRpc;
+    final RaftConfiguration current = getRaftConf();
+    if (current == null) {
+      // no configuration yet, hence no peers to add
+      return;
+    }
+    // add peers into rpc service
+    addPeersToRPC(current.getPeers());
+  }
+
+  /** @return the rpc service set by {@link #setServerRpc}; may be null. */
+  public RaftServerRpc getServerRpc() {
+    return this.serverRpc;
+  }
+
+  /**
+   * Start this server: as a follower if it is contained in the current
+   * configuration, otherwise in the initializing state.
+   */
+  public void start() {
+    lifeCycle.transition(STARTING);
+    state.start();
+    final RaftConfiguration current = getRaftConf();
+    final boolean inConf = current != null && current.contains(getId());
+    if (!inConf) {
+      LOG.debug("{} starts with initializing state", getId());
+      startInitializing();
+      return;
+    }
+    LOG.debug("{} starts as a follower", getId());
+    startAsFollower();
+  }
+
+  /**
+   * The peer belongs to the current configuration, should start as a follower
+   */
+  private void startAsFollower() {
+    role = Role.FOLLOWER;
+    heartbeatMonitor = new FollowerState(this);
+    heartbeatMonitor.start();
+
+    serverRpc.start();
+    lifeCycle.transition(RUNNING);
+  }
+
+  /**
+   * The peer does not have any configuration (maybe it will later be included
+   * in some configuration). Start still as a follower but will not vote or
+   * start election: only the rpc service is started, and the life cycle is
+   * not moved to RUNNING here.
+   */
+  private void startInitializing() {
+    this.role = Role.FOLLOWER;
+    // the heartbeat monitor is deliberately not started
+    this.serverRpc.start();
+  }
+
+  /** @return the {@link ServerState} created in the constructor. */
+  public ServerState getState() {
+    return state;
+  }
+
+  /** @return the id of this server, as recorded in its state. */
+  public String getId() {
+    final ServerState current = getState();
+    return current.getSelfId();
+  }
+
+  /** @return the raft configuration currently held by the server state. */
+  public RaftConfiguration getRaftConf() {
+    final ServerState current = getState();
+    return current.getRaftConf();
+  }
+
+  /**
+   * Shut down the role-specific daemons, the rpc service and the state.
+   * Any exception raised on the way is logged and not rethrown.
+   */
+  @Override
+  public void close() {
+    lifeCycle.checkStateAndClose(() -> {
+      try {
+        shutdownHeartbeatMonitor();
+        shutdownElectionDaemon();
+        shutdownLeaderState();
+
+        serverRpc.shutdown();
+        state.close();
+      } catch (Exception e) {
+        // best effort: report the failure but let close() finish
+        LOG.warn("Failed to kill " + state.getSelfId(), e);
+      }
+    });
+  }
+
+  /** @return true unless the life cycle is in CLOSING or CLOSED. */
+  public boolean isAlive() {
+    final boolean closingOrClosed =
+        lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
+    return !closingOrClosed;
+  }
+
+  /** @return true iff the current role is FOLLOWER. */
+  public boolean isFollower() {
+    return Role.FOLLOWER == role;
+  }
+
+  /** @return true iff the current role is CANDIDATE. */
+  public boolean isCandidate() {
+    return Role.CANDIDATE == role;
+  }
+
+  /** @return true iff the current role is LEADER. */
+  public boolean isLeader() {
+    return Role.LEADER == role;
+  }
+
+  /** @return the current role; null until one of the start methods ran. */
+  Role getRole() {
+    return this.role;
+  }
+
+  /**
+   * Become a follower, shutting down whatever the previous role was running.
+   *
+   * @param newTerm The new term; adopted only if larger than the current one.
+   * @param sync We will call {@link ServerState#persistMetadata()} if this is
+   *             set to true and term/votedFor get updated.
+   * @return true iff the term was advanced (and leader/votedFor were reset),
+   *         i.e. the metadata changed and has to be persisted.
+   * @throws IOException if term/votedFor persistence failed.
+   */
+  synchronized boolean changeToFollower(long newTerm, boolean sync)
+      throws IOException {
+    final Role previous = role;
+    role = Role.FOLLOWER;
+
+    final boolean termAdvanced = newTerm > state.getCurrentTerm();
+    if (termAdvanced) {
+      state.setCurrentTerm(newTerm);
+      state.resetLeaderAndVotedFor();
+    }
+
+    // stop the daemon that belonged to the previous role
+    if (previous == Role.LEADER) {
+      assert leaderState != null;
+      shutdownLeaderState();
+    } else if (previous == Role.CANDIDATE) {
+      shutdownElectionDaemon();
+    }
+
+    // a peer that was already a follower keeps its running monitor
+    if (previous != Role.FOLLOWER) {
+      heartbeatMonitor = new FollowerState(this);
+      heartbeatMonitor.start();
+    }
+
+    if (termAdvanced && sync) {
+      state.persistMetadata();
+    }
+    return termAdvanced;
+  }
+
+  /** Stop the leader state, if there is one, and clear the field. */
+  private synchronized void shutdownLeaderState() {
+    final LeaderState leader = leaderState;
+    if (leader != null) {
+      leader.stop();
+    }
+    leaderState = null;
+    // TODO: make sure that StateMachineUpdater has applied all transactions
+    // that have context
+  }
+
+  /** Ask the election daemon, if there is one, to stop; then clear the field. */
+  private void shutdownElectionDaemon() {
+    final LeaderElection daemon = electionDaemon;
+    if (daemon != null) {
+      // stopRunning() is enough: the election thread is not interrupted
+      daemon.stopRunning();
+    }
+    electionDaemon = null;
+  }
+
+  synchronized void changeToLeader() {
+    Preconditions.checkState(isCandidate());
+    shutdownElectionDaemon();
+    role = Role.LEADER;
+    state.becomeLeader();
+    // start sending AppendEntries RPC to followers
+    leaderState = new LeaderState(this, properties);
+    leaderState.start();
+  }
+
+  /**
+   * Stop and interrupt the heartbeat monitor, if there is one; then clear
+   * the field.
+   */
+  private void shutdownHeartbeatMonitor() {
+    final FollowerState monitor = heartbeatMonitor;
+    if (monitor != null) {
+      monitor.stopRunning();
+      monitor.interrupt();
+    }
+    heartbeatMonitor = null;
+  }
+
+  synchronized void changeToCandidate() {
+    Preconditions.checkState(isFollower());
+    shutdownHeartbeatMonitor();
+    role = Role.CANDIDATE;
+    // start election
+    electionDaemon = new LeaderElection(this);
+    electionDaemon.start();
+  }
+
+  /** @return the role, the state and the life cycle state, space separated. */
+  @Override
+  public String toString() {
+    return String.join(" ",
+        String.valueOf(role),
+        String.valueOf(state),
+        String.valueOf(lifeCycle.getCurrentState()));
+  }
+
+  /**
+   * @return null if the server is in leader state.
+   */
+  CompletableFuture<RaftClientReply> checkLeaderState(
+      RaftClientRequest request) {
+    if (!isLeader()) {
+      NotLeaderException exception = generateNotLeaderException();
+      CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
+      future.complete(new RaftClientReply(request, exception));
+      return future;
+    }
+    return null;
+  }
+
+  NotLeaderException generateNotLeaderException() {
+    if (lifeCycle.getCurrentState() != RUNNING) {
+      return new NotLeaderException(getId(), null, null);
+    }
+    String leaderId = state.getLeaderId();
+    if (leaderId == null || leaderId.equals(state.getSelfId())) {
+      // No idea about who is the current leader. Or the peer is the current
+      // leader, but it is about to step down
+      RaftPeer suggestedLeader = state.getRaftConf()
+          .getRandomPeer(state.getSelfId());
+      leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
+    }
+    RaftConfiguration conf = getRaftConf();
+    Collection<RaftPeer> peers = conf.getPeers();
+    return new NotLeaderException(getId(), conf.getPeer(leaderId),
+        peers.toArray(new RaftPeer[peers.size()]));
+  }
+
+  /**
+   * Handle a normal update request from client: append the entry to the
+   * local log, register the request as pending and wake up the senders.
+   *
+   * @return a future of the reply; already completed with a
+   *         NotLeaderException reply if this server is not the leader.
+   * @throws RaftException wrapping the IOException if the append failed.
+   */
+  public CompletableFuture<RaftClientReply> appendTransaction(
+      RaftClientRequest request, TransactionContext entry)
+      throws RaftException {
+    LOG.debug("{}: receive client request({})", getId(), request);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    final PendingRequest pending;
+    synchronized (this) {
+      final CompletableFuture<RaftClientReply> notLeaderReply =
+          checkLeaderState(request);
+      if (notLeaderReply != null) {
+        return notLeaderReply;
+      }
+
+      // append the message to its local log
+      final long entryIndex;
+      try {
+        entryIndex = state.applyLog(entry);
+      } catch (IOException ioe) {
+        throw new RaftException(ioe);
+      }
+
+      // put the request into the pending queue
+      pending = leaderState.addPendingRequest(entryIndex, request, entry);
+      leaderState.notifySenders();
+    }
+    return pending.getFuture();
+  }
+
+  /**
+   * Handle a raft configuration change request from client.
+   *
+   * @return a future of the reply; already completed with a
+   *         NotLeaderException reply if this server is not the leader.
+   * @throws ReconfigurationInProgressException if the current configuration
+   *         is not stable, the leader is in staging state, or the current
+   *         configuration is not yet committed.
+   */
+  public CompletableFuture<RaftClientReply> setConfiguration(
+      SetConfigurationRequest request) throws IOException {
+    LOG.debug("{}: receive setConfiguration({})", getId(), request);
+    lifeCycle.assertCurrentState(RUNNING);
+    // first check without the lock
+    final CompletableFuture<RaftClientReply> early = checkLeaderState(request);
+    if (early != null) {
+      return early;
+    }
+
+    final RaftPeer[] newPeers = request.getPeersInNewConf();
+    final PendingRequest pending;
+    synchronized (this) {
+      // check again now that the role cannot change underneath us
+      final CompletableFuture<RaftClientReply> notLeaderReply =
+          checkLeaderState(request);
+      if (notLeaderReply != null) {
+        return notLeaderReply;
+      }
+
+      final RaftConfiguration current = getRaftConf();
+      // make sure there is no other raft reconfiguration in progress
+      final boolean reconfInProgress = !current.isStable()
+          || leaderState.inStagingState()
+          || !state.isCurrentConfCommitted();
+      if (reconfInProgress) {
+        throw new ReconfigurationInProgressException(
+            "Reconfiguration is already in progress: " + current);
+      }
+
+      // return true if the new configuration is the same with the current one
+      if (current.hasNoChange(newPeers)) {
+        return leaderState.returnNoConfChange(request).getFuture();
+      }
+
+      // add new peers into the rpc service
+      addPeersToRPC(Arrays.asList(newPeers));
+      // add staging state into the leaderState
+      pending = leaderState.startSetConfiguration(request);
+    }
+    return pending.getFuture();
+  }
+
+  /**
+   * @return true if this server is the leader, or if it is a follower that
+   *         has a leader and whose heartbeat monitor says to withhold votes.
+   */
+  private boolean shouldWithholdVotes() {
+    if (isLeader()) {
+      return true;
+    }
+    return isFollower() && state.hasLeader()
+        && heartbeatMonitor.shouldWithholdVotes();
+  }
+
+  /**
+   * check if the remote peer is not included in the current conf
+   * and should shutdown. should shutdown if all the following stands:
+   * 1. this is a leader
+   * 2. current conf is stable and has been committed
+   * 3. candidate id is not included in conf
+   * 4. candidate's last entry's index < conf's index
+   * 5. the candidate is not a bootstrapping peer of the leader state
+   */
+  private boolean shouldSendShutdown(String candidateId,
+      TermIndex candidateLastEntry) {
+    if (!isLeader()) {
+      return false;
+    }
+    if (!getRaftConf().isStable() || !getState().isConfCommitted()) {
+      return false;
+    }
+    if (getRaftConf().containsInConf(candidateId)) {
+      return false;
+    }
+    if (candidateLastEntry.getIndex() >= getRaftConf().getLogEntryIndex()) {
+      return false;
+    }
+    return !leaderState.isBootStrappingPeer(candidateId);
+  }
+
+  /** Unpack the request proto and delegate to the private requestVote. */
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
+      throws IOException {
+    final String candidateId = r.getServerRequest().getRequestorId();
+    final long candidateTerm = r.getCandidateTerm();
+    final TermIndex candidateLastEntry =
+        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry());
+    return requestVote(candidateId, candidateTerm, candidateLastEntry);
+  }
+
+  private RequestVoteReplyProto requestVote(String candidateId,
+      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
+        candidateId, candidateTerm, candidateLastEntry);
+    LOG.debug("{}: receive requestVote({}, {}, {})",
+        getId(), candidateId, candidateTerm, candidateLastEntry);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    boolean voteGranted = false;
+    boolean shouldShutdown = false;
+    final RequestVoteReplyProto reply;
+    synchronized (this) {
+      if (shouldWithholdVotes()) {
+        LOG.info("{} Withhold vote from server {} with term {}. " +
+            "This server:{}, last rpc time from leader {} is {}", getId(),
+            candidateId, candidateTerm, this, this.getState().getLeaderId(),
+            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
+      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
+        boolean termUpdated = changeToFollower(candidateTerm, false);
+        // see Section 5.4.1 Election restriction
+        if (state.isLogUpToDate(candidateLastEntry)) {
+          heartbeatMonitor.updateLastRpcTime(false);
+          state.grantVote(candidateId);
+          voteGranted = true;
+        }
+        if (termUpdated || voteGranted) {
+          state.persistMetadata(); // sync metafile
+        }
+      }
+      if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) 
{
+        shouldShutdown = true;
+      }
+      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
+          voteGranted, state.getCurrentTerm(), shouldShutdown);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
+            getId(), ProtoUtils.toString(reply), state);
+      }
+    }
+    return reply;
+  }
+
+  /**
+   * Sanity-check the entries of an appendEntries request.
+   *
+   * Nothing is checked when entries is null or empty. Otherwise:
+   * the first entry must have index 0 if previous is null or has term 0,
+   * else it must directly follow previous; no entry may have a term larger
+   * than expectedTerm; and the entry indices must be consecutive.
+   *
+   * @throws IllegalArgumentException if any of the checks fails.
+   */
+  private void validateEntries(long expectedTerm, TermIndex previous,
+      LogEntryProto... entries) {
+    if (entries != null && entries.length > 0) {
+      final long index0 = entries[0].getIndex();
+
+      if (previous == null || previous.getTerm() == 0) {
+        Preconditions.checkArgument(index0 == 0,
+            "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
+            0, index0);
+      } else {
+        Preconditions.checkArgument(previous.getIndex() == index0 - 1,
+            "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
+            previous, 0, index0);
+      }
+
+      for (int i = 0; i < entries.length; i++) {
+        final long t = entries[i].getTerm();
+        Preconditions.checkArgument(expectedTerm >= t,
+            "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
+            i, t, expectedTerm);
+
+        final long indexi = entries[i].getIndex();
+        Preconditions.checkArgument(indexi == index0 + i,
+            "Unexpected Index: entries[%s].getIndex()=%s but "
+                + "entries[0].getIndex()=%s",
+            i, indexi, index0);
+      }
+    }
+  }
+
+  /**
+   * Unpacks the fields of the protobuf request and delegates to the internal
+   * appendEntries implementation.
+   */
+  @Override
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
+      throws IOException {
+    // TODO avoid converting list to array
+    final LogEntryProto[] entryArray =
+        r.getEntriesList().toArray(new LogEntryProto[r.getEntriesCount()]);
+
+    // "previous" stays null when the request carries no previous log entry
+    TermIndex previous = null;
+    if (r.hasPreviousLog()) {
+      previous = ServerProtoUtils.toTermIndex(r.getPreviousLog());
+    }
+
+    final String leaderId = r.getServerRequest().getRequestorId();
+    return appendEntries(leaderId, r.getLeaderTerm(), previous,
+        r.getLeaderCommit(), r.getInitializing(), entryArray);
+  }
+
+  /**
+   * Handles an appendEntries call from the server identified by
+   * {@code leaderId}.
+   * <p>
+   * The reply is NOT_LEADER when the sender is not recognized as leader for
+   * its term, INCONSISTENCY when {@code previous} is found neither in the
+   * local log nor in a snapshot, and SUCCESS otherwise.
+   *
+   * @throws IOException if the entries fail validation or the wait for the
+   *         log sync is interrupted
+   */
+  private AppendEntriesReplyProto appendEntries(String leaderId,
+      long leaderTerm, TermIndex previous, long leaderCommit,
+      boolean initializing, LogEntryProto... entries) throws IOException {
+    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
+        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
+          leaderId, leaderTerm, previous, leaderCommit, initializing,
+          ServerProtoUtils.toString(entries));
+    }
+    lifeCycle.assertCurrentState(STARTING, RUNNING);
+
+    // A malformed batch is reported to the caller as an IOException.
+    try {
+      validateEntries(leaderTerm, previous, entries);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    }
+
+    final long currentTerm;
+    long nextIndex = state.getLog().getNextIndex();
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final AppendEntriesReplyProto reply =
+            ServerProtoUtils.toAppendEntriesReplyProto(
+                leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: do not recognize leader. Reply: {}",
+              getId(), ProtoUtils.toString(reply));
+        }
+        return reply;
+      }
+      changeToFollower(leaderTerm, true);
+      state.setLeader(leaderId);
+
+      // The first non-initializing request moves this server from STARTING
+      // to RUNNING and starts the follower heartbeat monitor.
+      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
+        heartbeatMonitor = new FollowerState(this);
+        heartbeatMonitor.start();
+      }
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(true);
+      }
+
+      // We need to check if "previous" is in the local peer. Note that it is
+      // possible that "previous" is covered by the latest snapshot: e.g.,
+      // it's possible there's no log entries outside of the latest snapshot.
+      // However, it is not possible that "previous" index is smaller than the
+      // last index included in snapshot. This is because indices <= snapshot's
+      // last index should have been committed.
+      if (previous != null && !containPrevious(previous)) {
+        final AppendEntriesReplyProto reply =
+            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
+                currentTerm, Math.min(nextIndex, previous.getIndex()),
+                INCONSISTENCY);
+        // Guarded like the other debug logs in this method so the reply is
+        // only rendered to a string when debug logging is on.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
+              getId(), previous, ServerProtoUtils.toString(reply));
+        }
+        return reply;
+      }
+
+      state.getLog().append(entries);
+      state.updateConfiguration(entries);
+      state.updateStatemachine(leaderCommit, currentTerm);
+    }
+    // Wait for the log sync outside of the lock.
+    if (entries != null && entries.length > 0) {
+      try {
+        state.getLog().logSync();
+      } catch (InterruptedException e) {
+        // Keep the original interruption as the cause for diagnosis.
+        final InterruptedIOException interrupted =
+            new InterruptedIOException("logSync got interrupted");
+        interrupted.initCause(e);
+        throw interrupted;
+      }
+      nextIndex = entries[entries.length - 1].getIndex() + 1;
+    }
+    synchronized (this) {
+      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
+          && getState().getCurrentTerm() == currentTerm) {
+        // reset election timer to avoid punishing the leader for our own
+        // long disk writes
+        heartbeatMonitor.updateLastRpcTime(false);
+      }
+    }
+    final AppendEntriesReplyProto reply =
+        ServerProtoUtils.toAppendEntriesReplyProto(
+            leaderId, getId(), currentTerm, nextIndex, SUCCESS);
+    // This is the common path: avoid building the reply string unless it is
+    // actually going to be logged.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
+          ServerProtoUtils.toString(reply));
+    }
+    return reply;
+  }
+
+  /**
+   * @return true if {@code previous} is contained in the local log, or equals
+   *         the term-index of the latest snapshot, or equals the latest
+   *         installed snapshot.
+   */
+  private boolean containPrevious(TermIndex previous) {
+    // Read each snapshot reference once so that the null check and the
+    // comparison below operate on the same object.
+    final SnapshotInfo latestSnapshot = state.getLatestSnapshot();
+    final TermIndex latestInstalled = state.getLatestInstalledSnapshot();
+    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
+        getId(), previous, latestSnapshot, latestInstalled);
+    // Each alternative is fully parenthesized; the result no longer relies
+    // on && binding tighter than ||.
+    return state.getLog().contains(previous)
+        || (latestSnapshot != null
+            && latestSnapshot.getTermIndex().equals(previous))
+        || (latestInstalled != null
+            && latestInstalled.equals(previous));
+  }
+
+  /**
+   * Handles an installSnapshot request (one snapshot chunk) sent by the
+   * leader.
+   * <p>
+   * Replies NOT_LEADER when the sender is not recognized as leader for its
+   * term. Otherwise the chunk is handed to the server state and, when the
+   * request is marked done, the state machine is reloaded; the reply is then
+   * SUCCESS.
+   *
+   * @throws IllegalStateException if the local log's next index is already
+   *         beyond the last index included in the snapshot
+   */
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    final String leaderId = request.getServerRequest().getRequestorId();
+    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, 
request);
+    LOG.debug("{}: receive installSnapshot({})", getId(), request);
+
+    lifeCycle.assertCurrentState(STARTING, RUNNING);
+
+    final long currentTerm;
+    final long leaderTerm = request.getLeaderTerm();
+    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
+        request.getTermIndex());
+    final long lastIncludedIndex = lastTermIndex.getIndex();
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto reply = ServerProtoUtils
+            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
+                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+        LOG.debug("{}: do not recognize leader for installing snapshot." +
+            " Reply: {}", getId(), reply);
+        return reply;
+      }
+      changeToFollower(leaderTerm, true);
+      state.setLeader(leaderId);
+
+      // The heartbeat monitor is only touched in RUNNING state; this method
+      // is also allowed while the server is still STARTING.
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(true);
+      }
+
+      // Check and append the snapshot chunk. We simply put this in lock
+      // considering a follower peer requiring a snapshot installation does not
+      // have a lot of requests
+      Preconditions.checkState(
+          state.getLog().getNextIndex() <= lastIncludedIndex,
+          "%s log's next id is %s, last included index in snapshot is %s",
+          getId(),  state.getLog().getNextIndex(), lastIncludedIndex);
+
+      //TODO: We should only update State with installed snapshot once the
+      // request is done.
+      state.installSnapshot(request);
+
+      // update the committed index
+      // re-load the state machine if this is the last chunk
+      if (request.getDone()) {
+        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+      }
+      // Second update of the last-rpc time, after the chunk has been handled.
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(false);
+      }
+    }
+    if (request.getDone()) {
+      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
+          lastIncludedIndex);
+    }
+    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
+        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
+  }
+
+  /**
+   * Builds an appendEntries request from this server to {@code targetId},
+   * carrying the last committed index of the local log.
+   */
+  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
+      String targetId, TermIndex previous, List<LogEntryProto> entries,
+      boolean initializing) {
+    final String selfId = getId();
+    final long committedIndex = state.getLog().getLastCommittedIndex();
+    return ServerProtoUtils.toAppendEntriesRequestProto(selfId, targetId,
+        leaderTerm, entries, committedIndex, initializing, previous);
+  }
+
+  /**
+   * Builds an installSnapshot request for {@code targetId} that carries the
+   * given chunks, the snapshot's term-index, the current term and the summed
+   * size of all files of the snapshot.
+   */
+  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
+      String targetId, String requestId, int requestIndex,
+      SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
+    final OptionalLong snapshotSize = snapshot.getFiles().stream()
+        .mapToLong(FileInfo::getFileSize)
+        .reduce(Long::sum);
+    // the sum is absent only when the snapshot lists no files
+    assert snapshotSize.isPresent();
+    final long totalBytes = snapshotSize.getAsLong();
+    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
+        requestId, requestIndex, state.getCurrentTerm(),
+        snapshot.getTermIndex(), chunks, totalBytes, done);
+  }
+
+  /**
+   * Builds a requestVote request from this server to {@code targetId} for
+   * the given term and last log entry.
+   */
+  synchronized RequestVoteRequestProto createRequestVoteRequest(
+      String targetId, long term, TermIndex lastEntry) {
+    final String candidateId = getId();
+    return ServerProtoUtils.toRequestVoteRequestProto(candidateId, targetId,
+        term, lastEntry);
+  }
+
+  /**
+   * Submits an UPDATE_COMMIT_EVENT to the leader state; does nothing unless
+   * this server is the leader and has a leader state.
+   */
+  public synchronized void submitLocalSyncEvent() {
+    if (!isLeader() || leaderState == null) {
+      return;
+    }
+    leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
+  }
+
+  /** Passes the given peers on to the server RPC. */
+  public void addPeersToRPC(Iterable<RaftPeer> peers) {
+    this.serverRpc.addPeers(peers);
+  }
+
+  /**
+   * Forwards the reply for the request at {@code logIndex} to the leader
+   * state; does nothing unless this server is the leader and is running.
+   */
+  synchronized void replyPendingRequest(long logIndex,
+      CompletableFuture<Message> message) {
+    if (!isLeader() || leaderState == null) {
+      return; // not the leader, or not running
+    }
+    leaderState.replyPendingRequest(logIndex, message);
+  }
+
+  /**
+   * @return the transaction context that the leader state holds for
+   *         {@code index}, or null when there is no leader state.
+   */
+  TransactionContext getTransactionContext(long index) {
+    // a non-null leader state means: is leader and is running
+    return leaderState != null
+        ? leaderState.getTransactionContext(index)
+        : null;
+  }
+
+  /** @return the properties held by this server. */
+  public RaftProperties getProperties() {
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
 
b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
new file mode 100644
index 0000000..6634152
--- /dev/null
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import org.apache.raft.client.RaftClient;
+
+/**
+ * Constants used by the raft server implementation, together with the
+ * startup options.
+ */
+public interface RaftServerConstants {
+  long INVALID_LOG_INDEX = -1;
+  byte LOG_TERMINATE_BYTE = 0;
+  long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
+
+  /** Startup options, each identified by a textual name. */
+  enum StartupOption {
+    FORMAT("format"),
+    REGULAR("regular");
+
+    private final String option;
+
+    StartupOption(String optionName) {
+      option = optionName;
+    }
+
+    /**
+     * @return the option whose name equals {@code arg}, or {@link #REGULAR}
+     *         when no name matches.
+     */
+    public static StartupOption getOption(String arg) {
+      StartupOption matched = REGULAR;
+      for (StartupOption candidate : values()) {
+        if (candidate.option.equals(arg)) {
+          matched = candidate;
+          break;
+        }
+      }
+      return matched;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java
new file mode 100644
index 0000000..fb74a0e
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import org.apache.raft.protocol.RaftPeer;
+import org.apache.raft.shaded.proto.RaftProtos.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * The RPC service of a raft server: start/shutdown, the three
+ * server-to-server calls (appendEntries, installSnapshot, requestVote) and
+ * registration of peer information.
+ */
+public interface RaftServerRpc {
+  /** Start this RPC service. */
+  void start();
+
+  /** Shut down this RPC service. */
+  void shutdown();
+
+  /**
+   * @return the socket address of this RPC service (presumably the address
+   *         it listens on -- confirm with the implementations).
+   */
+  InetSocketAddress getInetSocketAddress();
+
+  /** Send the given appendEntries request and return its reply. */
+  AppendEntriesReplyProto sendAppendEntries(
+      AppendEntriesRequestProto request) throws IOException;
+
+  /** Send the given installSnapshot request and return its reply. */
+  InstallSnapshotReplyProto sendInstallSnapshot(
+      InstallSnapshotRequestProto request) throws IOException;
+
+  /** Send the given requestVote request and return its reply. */
+  RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
+      throws IOException;
+
+  /** add information of the given peers */
+  void addPeers(Iterable<RaftPeer> peers);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
new file mode 100644
index 0000000..cc8651d
--- /dev/null
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import org.apache.raft.protocol.*;
+import org.apache.raft.server.protocol.RaftServerProtocol;
+import org.apache.raft.shaded.proto.RaftProtos.*;
+import org.apache.raft.statemachine.StateMachine;
+import org.apache.raft.statemachine.TransactionContext;
+import org.apache.raft.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Entry point for every RPC request:
+ * 1. A request from another RaftPeer is handed to the RaftServer.
+ *
+ * If the raft peer is the leader, then:
+ *
+ * 2. A read-only request from a client is handled by the state machine.
+ * 3. A write request from a client is first validated by the state machine.
+ * The state machine returns the content of the raft log entry, which is then
+ * passed to the RaftServer for replication.
+ */
+public class RequestDispatcher
+    implements RaftClientProtocol, RaftServerProtocol {
+  static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class);
+
+  private final RaftServer server;
+  private final StateMachine stateMachine;
+
+  public RequestDispatcher(RaftServer server) {
+    this.server = server;
+    this.stateMachine = server.getStateMachine();
+  }
+
+  /**
+   * Dispatches a client request: returns the reply of the leader-state check
+   * if there is one, lets the state machine answer read-only requests, and
+   * otherwise starts a transaction and appends it through the server.
+   */
+  public CompletableFuture<RaftClientReply> handleClientRequest(
+      RaftClientRequest request) throws IOException {
+    // first check the server's leader state
+    final CompletableFuture<RaftClientReply> leaderCheck =
+        server.checkLeaderState(request);
+    if (leaderCheck != null) {
+      return leaderCheck;
+    }
+
+    // let the state machine handle read-only request from client
+    if (request.isReadOnly()) {
+      // TODO: We might not be the leader anymore by the time this completes.
+      // See the RAFT paper, section 8 (last part)
+      return stateMachine.query(request);
+    }
+
+    // TODO: this client request will not be added to pending requests
+    // until later which means that any failure in between will leave partial
+    // state in the state machine. We should call cancelTransaction() for
+    // failed requests
+    final TransactionContext context = stateMachine.startTransaction(request);
+    if (!context.getException().isPresent()) {
+      return server.appendTransaction(request, context);
+    }
+    throw RaftUtils.asIOException(context.getException().get());
+  }
+
+  @Override
+  public RaftClientReply submitClientRequest(RaftClientRequest request)
+      throws IOException {
+    final CompletableFuture<RaftClientReply> pending =
+        handleClientRequest(request);
+    return waitForReply(server.getId(), request, pending);
+  }
+
+  public CompletableFuture<RaftClientReply> setConfigurationAsync(
+      SetConfigurationRequest request) throws IOException {
+    return server.setConfiguration(request);
+  }
+
+  @Override
+  public RaftClientReply setConfiguration(SetConfigurationRequest request)
+      throws IOException {
+    final CompletableFuture<RaftClientReply> pending =
+        setConfigurationAsync(request);
+    return waitForReply(server.getId(), request, pending);
+  }
+
+  /**
+   * Blocks until {@code future} completes. A NotLeaderException cause is
+   * turned into a reply for the client; any other failure is rethrown as an
+   * IOException.
+   */
+  private static RaftClientReply waitForReply(String serverId,
+      RaftClientRequest request, CompletableFuture<RaftClientReply> future)
+      throws IOException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      final String message = serverId
+          + ": Interrupted when waiting for reply, request=" + request;
+      LOG.info(message, e);
+      throw RaftUtils.toInterruptedIOException(message, e);
+    } catch (ExecutionException e) {
+      final Throwable cause = e.getCause();
+      if (cause == null) {
+        throw new IOException(e);
+      }
+      if (!(cause instanceof NotLeaderException)) {
+        throw RaftUtils.asIOException(cause);
+      }
+      return new RaftClientReply(request, (NotLeaderException) cause);
+    }
+  }
+
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
+      throws IOException {
+    return server.requestVote(request);
+  }
+
+  @Override
+  public AppendEntriesReplyProto appendEntries(
+      AppendEntriesRequestProto request) throws IOException {
+    return server.appendEntries(request);
+  }
+
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    return server.installSnapshot(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/Role.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/Role.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/Role.java
new file mode 100644
index 0000000..1413961
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/Role.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+/**
+ * The roles a Raft peer can take.
+ */
+public enum Role {
+  LEADER,
+  CANDIDATE,
+  FOLLOWER
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
index 21fd719..4594666 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
@@ -18,7 +18,6 @@
 package org.apache.raft.server.impl;
 
 import org.apache.raft.client.impl.ClientProtoUtils;
-import org.apache.raft.server.RaftConfiguration;
 import org.apache.raft.server.protocol.TermIndex;
 import org.apache.raft.shaded.proto.RaftProtos.*;
 import org.apache.raft.util.ProtoUtils;
@@ -27,7 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.raft.server.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 import static 
org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
 
 public class ServerProtoUtils {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
new file mode 100644
index 0000000..6680175
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.raft.conf.RaftProperties;
+import org.apache.raft.server.protocol.TermIndex;
+import org.apache.raft.server.storage.*;
+import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.raft.statemachine.SnapshotInfo;
+import org.apache.raft.statemachine.StateMachine;
+import org.apache.raft.statemachine.TransactionContext;
+import org.apache.raft.util.ProtoUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT;
+import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY;
+
+/**
+ * Common states of a raft peer. Protected by RaftServer's lock.
+ */
+public class ServerState implements Closeable {
+  private final String selfId;
+  private final RaftServer server;
+  /** Raft log */
+  private final RaftLog log;
+  /** Raft configuration */
+  private final ConfigurationManager configurationManager;
+  /** The thread that applies committed log entries to the state machine */
+  private final StateMachineUpdater stateMachineUpdater;
+  /** local storage for log and snapshot */
+  private final RaftStorage storage;
+  private final SnapshotManager snapshotManager;
+
+  /**
+   * Latest term server has seen. initialized to 0 on first boot, increases
+   * monotonically.
+   */
+  private long currentTerm;
+  /**
+   * The server ID of the leader for this term. Null means either there is
+   * no leader for this term yet or this server does not know who it is yet.
+   */
+  private String leaderId;
+  /**
+   * Candidate that this peer granted vote for in current term (or null if
+   * none)
+   */
+  private String votedFor;
+
+  /**
+   * Latest installed snapshot for this server. This may be different from
+   * the StateMachine's latest snapshot. Once we successfully install a
+   * snapshot, the SM may not pick it up immediately. Further, this will not
+   * get updated when the SM takes snapshots itself.
+   */
+  private TermIndex latestInstalledSnapshot;
+
+  /**
+   * Creates the storage and snapshot manager, initializes the state machine,
+   * opens the log, restores the term and vote from the log's metadata, and
+   * creates (but does not start) the state machine updater.
+   */
+  ServerState(String id, RaftConfiguration conf, RaftProperties prop,
+      RaftServer server, StateMachine stateMachine) throws IOException {
+    this.selfId = id;
+    this.server = server;
+    configurationManager = new ConfigurationManager(conf);
+    storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
+    snapshotManager = new SnapshotManager(storage, id);
+
+    long lastApplied = initStatemachine(stateMachine, prop);
+
+    leaderId = null;
+    log = initLog(id, prop, server, lastApplied);
+    RaftLog.Metadata metadata = log.loadMetadata();
+    currentTerm = metadata.getTerm();
+    votedFor = metadata.getVotedFor();
+
+    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
+         lastApplied, prop);
+  }
+
+  /**
+   * Used by tests to set initial raft configuration with correct port
+   * bindings.
+   */
+  @VisibleForTesting
+  public void setInitialConf(RaftConfiguration initialConf) {
+    configurationManager.setInitialConf(initialConf);
+  }
+
+  /**
+   * Initializes the state machine and registers its storage.
+   *
+   * @return the index of the state machine's latest snapshot, or
+   *         INVALID_LOG_INDEX when there is no snapshot or its index is
+   *         negative.
+   */
+  private long initStatemachine(StateMachine sm, RaftProperties properties)
+      throws IOException {
+    sm.initialize(selfId, properties, storage);
+    storage.setStateMachineStorage(sm.getStateMachineStorage());
+    SnapshotInfo snapshot = sm.getLatestSnapshot();
+
+    if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+
+    // get the raft configuration from the snapshot
+    RaftConfiguration raftConf = sm.getRaftConfiguration();
+    if (raftConf != null) {
+      configurationManager.addConfiguration(raftConf.getLogEntryIndex(),
+          raftConf);
+    }
+    return snapshot.getIndex();
+  }
+
+  /** Starts the state machine updater. */
+  void start() {
+    stateMachineUpdater.start();
+  }
+
+  /**
+   * Creates and opens the raft log: a memory log or a segmented log,
+   * depending on RAFT_SERVER_USE_MEMORY_LOG_KEY.
+   * <p>
+   * note we do not apply log entries to the state machine here since we do not
+   * know whether they have been committed.
+   */
+  private RaftLog initLog(String id, RaftProperties prop, RaftServer server,
+      long lastIndexInSnapshot) throws IOException {
+    final RaftLog log;
+    if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY,
+        RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) {
+      log = new MemoryRaftLog(id);
+    } else {
+      log = new SegmentedRaftLog(id, server, this.storage,
+          lastIndexInSnapshot, prop);
+    }
+    log.open(configurationManager, lastIndexInSnapshot);
+    return log;
+  }
+
+  /** @return the current configuration of the configuration manager. */
+  public RaftConfiguration getRaftConf() {
+    return configurationManager.getCurrent();
+  }
+
+  @VisibleForTesting
+
+  public String getSelfId() {
+    return this.selfId;
+  }
+
+  public long getCurrentTerm() {
+    return currentTerm;
+  }
+
+  // Only updates the in-memory term; persistMetadata() writes it out.
+  void setCurrentTerm(long term) {
+    currentTerm = term;
+  }
+
+  String getLeaderId() {
+    return leaderId;
+  }
+
+  boolean hasLeader() {
+    return leaderId != null;
+  }
+
+  /**
+   * Become a candidate and start leader election: vote for self, forget the
+   * leader and increment the term.
+   *
+   * @return the new (incremented) term
+   */
+  long initElection() {
+    votedFor = selfId;
+    leaderId = null;
+    return ++currentTerm;
+  }
+
+  /** Writes the current term and vote to the log's metadata. */
+  void persistMetadata() throws IOException {
+    this.log.writeMetadata(currentTerm, votedFor);
+  }
+
+  void resetLeaderAndVotedFor() {
+    votedFor = null;
+    leaderId = null;
+  }
+
+  /**
+   * Vote for a candidate and update the local state.
+   */
+  void grantVote(String candidateId) {
+    votedFor = candidateId;
+    leaderId = null;
+  }
+
+  void setLeader(String leaderId) {
+    this.leaderId = leaderId;
+  }
+
+  void becomeLeader() {
+    leaderId = selfId;
+  }
+
+  public RaftLog getLog() {
+    return log;
+  }
+
+  /**
+   * Appends the given operation to the log under the current term and
+   * returns the value returned by the log's append call.
+   */
+  long applyLog(TransactionContext operation) throws IOException {
+    return log.append(currentTerm, operation);
+  }
+
+  /**
+   * Check whether to accept the leader id and term from the incoming
+   * AppendEntries rpc. This method itself does not modify any state; the
+   * caller updates leader and term afterwards.
+   * @return true if the check passes
+   * @throws IllegalArgumentException if the term equals the current term but
+   *         the given leader differs from the already known leader
+   */
+  boolean recognizeLeader(String leaderId, long leaderTerm) {
+    if (leaderTerm < currentTerm) {
+      return false;
+    } else if (leaderTerm > currentTerm || this.leaderId == null) {
+      // If the request indicates a term that is greater than the current term
+      // or no leader has been set for the current term, make sure to update
+      // leader and term later
+      return true;
+    }
+    Preconditions.checkArgument(this.leaderId.equals(leaderId),
+        "selfId:%s, this.leaderId:%s, received leaderId:%s",
+        selfId, this.leaderId, leaderId);
+    return true;
+  }
+
+  /**
+   * Check if the candidate's term is acceptable: a larger term always is; an
+   * equal term only if this peer has not voted yet or voted for the same
+   * candidate.
+   */
+  boolean recognizeCandidate(String candidateId,
+      long candidateTerm) {
+    if (candidateTerm > currentTerm) {
+      return true;
+    } else if (candidateTerm == currentTerm) {
+      // has not voted yet or this is a retry
+      return votedFor == null || votedFor.equals(candidateId);
+    }
+    return false;
+  }
+
+  /**
+   * Compares the candidate's last entry with the local one, where the local
+   * one is the last log entry or the latest snapshot's term-index.
+   *
+   * @return true if there is neither a local entry nor a snapshot, or if the
+   *         local term-index compares less than or equal to the candidate's;
+   *         false if the candidate has no last entry while local state exists.
+   */
+  boolean isLogUpToDate(TermIndex candidateLastEntry) {
+    LogEntryProto lastEntry = log.getLastEntry();
+    // need to take into account snapshot
+    SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
+     if (lastEntry == null && snapshot == null) {
+      return true;
+    } else if (candidateLastEntry == null) {
+      return false;
+    }
+    // NOTE(review): assumes toTermIndex(null) returns null, so that a missing
+    // last entry falls through to the snapshot -- confirm in ServerProtoUtils.
+    TermIndex local = ServerProtoUtils.toTermIndex(lastEntry);
+    if (local == null || (snapshot != null && snapshot.getIndex() > 
lastEntry.getIndex())) {
+      local = snapshot.getTermIndex();
+    }
+    return local.compareTo(candidateLastEntry) <= 0;
+  }
+
+  @Override
+  public String toString() {
+    return selfId + ":t" + currentTerm + ", leader=" + leaderId
+        + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + 
getRaftConf();
+  }
+
+  /**
+   * @return true if the last committed index has reached the log index of
+   *         the current configuration.
+   */
+  boolean isConfCommitted() {
+    return getLog().getLastCommittedIndex() >=
+        getRaftConf().getLogEntryIndex();
+  }
+
+  /** Adds the configuration at the given log index and logs the update. */
+  public void setRaftConf(long logIndex, RaftConfiguration conf) {
+    configurationManager.addConfiguration(logIndex, conf);
+    RaftServer.LOG.info("{}: successfully update the configuration {}",
+        getSelfId(), conf);
+  }
+
+  /**
+   * Brings the configuration manager in line with the given entries:
+   * removeConfigurations is called with the index of the first entry, then
+   * every configuration entry is added and its peers are passed to the
+   * server's RPC. Does nothing for a null or empty array.
+   */
+  void updateConfiguration(LogEntryProto[] entries) {
+    if (entries != null && entries.length > 0) {
+      configurationManager.removeConfigurations(entries[0].getIndex());
+      for (LogEntryProto entry : entries) {
+        if (ProtoUtils.isConfigurationLogEntry(entry)) {
+          final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
+              entry.getIndex(), entry.getConfigurationEntry());
+          configurationManager.addConfiguration(entry.getIndex(), conf);
+          server.addPeersToRPC(conf.getPeers());
+        }
+      }
+    }
+  }
+
+  /** Updates the log's last committed index and notifies the updater. */
+  void updateStatemachine(long majorityIndex, long currentTerm) {
+    log.updateLastCommitted(majorityIndex, currentTerm);
+    stateMachineUpdater.notifyUpdater();
+  }
+
+  /**
+   * Updates the log's last committed index to the last index in the
+   * snapshot and asks the updater to reload the state machine.
+   */
+  void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
+      throws IOException {
+    log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
+
+    stateMachineUpdater.reloadStateMachine();
+  }
+
+  /** Stops the state machine updater, then closes the storage. */
+  @Override
+  public void close() throws IOException {
+    stateMachineUpdater.stop();
+    RaftServer.LOG.info("{} closes. The last applied log index is {}",
+        getSelfId(), getLastAppliedIndex());
+    storage.close();
+  }
+
+  @VisibleForTesting
+  public RaftStorage getStorage() {
+    return storage;
+  }
+
+  /**
+   * Pauses the state machine, lets the snapshot manager install the request,
+   * syncs the log with the snapshot's index and records the snapshot's
+   * term-index as the latest installed snapshot.
+   */
+  void installSnapshot(InstallSnapshotRequestProto request) throws IOException 
{
+    // TODO: verify that we need to install the snapshot
+    StateMachine sm = server.getStateMachine();
+    sm.pause(); // pause the SM to prepare for install snapshot
+    snapshotManager.installSnapshot(sm, request);
+    log.syncWithSnapshot(request.getTermIndex().getIndex());
+    this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
+        request.getTermIndex());
+  }
+
+  /** @return the latest snapshot known to the state machine's storage. */
+  SnapshotInfo getLatestSnapshot() {
+    return 
server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
+  }
+
+  /**
+   * @return the term-index recorded by the last installSnapshot call, or
+   *         null if none has been recorded.
+   */
+  public TermIndex getLatestInstalledSnapshot() {
+    return latestInstalledSnapshot;
+  }
+
+  @VisibleForTesting
+  public long getLastAppliedIndex() {
+    return stateMachineUpdater.getLastAppliedIndex();
+  }
+
+  // Same comparison as isConfCommitted(), written with the operands swapped.
+  boolean isCurrentConfCommitted() {
+    return getRaftConf().getLogEntryIndex() <= 
getLog().getLastCommittedIndex();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
 
b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
new file mode 100644
index 0000000..06fa221
--- /dev/null
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.raft.conf.RaftProperties;
+import org.apache.raft.protocol.Message;
+import org.apache.raft.server.RaftServerConfigKeys;
+import org.apache.raft.server.storage.RaftLog;
+import org.apache.raft.server.storage.RaftStorage;
+import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.raft.statemachine.SnapshotInfo;
+import org.apache.raft.statemachine.StateMachine;
+import org.apache.raft.statemachine.TransactionContext;
+import org.apache.raft.util.Daemon;
+import org.apache.raft.util.ExitUtils;
+import org.apache.raft.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+import static 
org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
+
+/**
+ * This class tracks the log entries that have been committed in a quorum and
+ * applies them to the state machine. We let a separate thread do this work
+ * asynchronously so that this will not block normal raft protocol.
+ *
+ * If the auto log compaction is enabled, the state machine updater thread will
+ * trigger a snapshot of the state machine by calling
+ * {@link StateMachine#takeSnapshot} once the number of entries applied since
+ * the last snapshot reaches the configured threshold.
+ */
+class StateMachineUpdater implements Runnable {
+  static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
+
+  /**
+   * Updater states: RUNNING is the initial state, STOP ends the run loop and
+   * RELOAD makes the loop reinitialize the state machine before applying.
+   */
+  enum State {
+    RUNNING, STOP, RELOAD
+  }
+
+  private final RaftProperties properties;
+  private final StateMachine stateMachine;
+  private final RaftServer server;
+  private final RaftLog raftLog;
+
+  /** Written by the updater thread; volatile so other threads can read it. */
+  private volatile long lastAppliedIndex;
+
+  private final boolean autoSnapshotEnabled;
+  private final long snapshotThreshold;
+  /** Applied index at the time of the last snapshot taken or loaded. */
+  private long lastSnapshotIndex;
+
+  private final Thread updater;
+  private volatile State state = State.RUNNING;
+
+  /**
+   * Creates the updater together with its (not yet started) daemon thread.
+   *
+   * @param stateMachine the state machine the entries are applied to
+   * @param server the server queried for transaction contexts and replies
+   * @param raftLog the log the committed entries are read from
+   * @param lastAppliedIndex the initial applied index; it is also used as
+   *                         the initial last-snapshot index
+   * @param properties source of the auto-snapshot flag and threshold
+   */
+  StateMachineUpdater(StateMachine stateMachine, RaftServer server,
+      RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
+    this.properties = properties;
+    this.stateMachine = stateMachine;
+    this.server = server;
+    this.raftLog = raftLog;
+
+    this.lastAppliedIndex = lastAppliedIndex;
+    lastSnapshotIndex = lastAppliedIndex;
+
+    autoSnapshotEnabled = properties.getBoolean(
+        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT);
+    snapshotThreshold = properties.getLong(
+        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT);
+    updater = new Daemon(this);
+  }
+
+  /** Starts the updater thread. */
+  void start() {
+    updater.start();
+  }
+
+  /**
+   * Marks the updater as stopped, interrupts its thread and closes the state
+   * machine. A failure to close is logged and not propagated.
+   */
+  void stop() {
+    state = State.STOP;
+    updater.interrupt();
+    try {
+      stateMachine.close();
+    } catch (IOException e) {
+      // Closing is best-effort during shutdown: report the failure instead
+      // of dropping it silently, but do not fail the stop.
+      LOG.warn("{}: failed to close the state machine", this, e);
+    }
+  }
+
+  /** Switches to the RELOAD state and wakes up the updater thread. */
+  void reloadStateMachine() {
+    state = State.RELOAD;
+    notifyUpdater();
+  }
+
+  /** Wakes up the updater thread if it is waiting in {@link #run()}. */
+  synchronized void notifyUpdater() {
+    notifyAll();
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "-" + raftLog.getSelfId();
+  }
+
+  /**
+   * Main loop of the updater thread. Each round waits until the committed
+   * index is ahead of the applied index, reinitializes the state machine if
+   * a reload was requested, applies the committed entries in index order and
+   * finally takes a snapshot if the threshold has been reached.
+   *
+   * An interrupt while the updater is not stopped, or any other Throwable,
+   * is handed to {@link ExitUtils#terminate}.
+   */
+  @Override
+  public void run() {
+    final RaftStorage storage = server.getState().getStorage();
+    while (isRunning()) {
+      try {
+        synchronized (this) {
+          // when the peers just start, the committedIndex is initialized as 0
+          // and will be updated only after the leader contacts other peers.
+          // Thus initially lastAppliedIndex can be greater than lastCommitted.
+          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
+            wait();
+          }
+        }
+
+        final long committedIndex = raftLog.getLastCommittedIndex();
+        Preconditions.checkState(lastAppliedIndex < committedIndex);
+
+        if (state == State.RELOAD) {
+          Preconditions.checkState(
+              stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
+
+          stateMachine.reinitialize(server.getId(), properties, storage);
+
+          SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
+          Preconditions.checkState(
+              snapshot != null && snapshot.getIndex() > lastAppliedIndex,
+              "Snapshot: %s, lastAppliedIndex: %s",
+              snapshot, lastAppliedIndex);
+
+          lastAppliedIndex = snapshot.getIndex();
+          lastSnapshotIndex = snapshot.getIndex();
+          // NOTE(review): this unconditional write may overwrite a STOP set
+          // concurrently by stop() -- confirm whether that window matters.
+          state = State.RUNNING;
+        }
+
+        while (lastAppliedIndex < committedIndex) {
+          final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
+          if (next != null) {
+            if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+              // the reply should have already been set. only need to record
+              // the new conf in the state machine.
+              stateMachine.setRaftConfiguration(
+                  ServerProtoUtils.toRaftConfiguration(next.getIndex(),
+                      next.getConfigurationEntry()));
+            } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
+              // check whether there is a TransactionContext because we are
+              // the leader.
+              TransactionContext trx =
+                  server.getTransactionContext(next.getIndex());
+              if (trx == null) {
+                trx = new TransactionContext(stateMachine, next);
+              }
+
+              // Let the StateMachine inject logic for committed transactions
+              // in sequential order.
+              trx = stateMachine.applyTransactionSerial(trx);
+
+              // TODO: This step can be parallelized
+              CompletableFuture<Message> messageFuture =
+                  stateMachine.applyTransaction(trx);
+              server.replyPendingRequest(next.getIndex(), messageFuture);
+            }
+            lastAppliedIndex++;
+          } else {
+            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. "
+                + "state:{}", this, lastAppliedIndex + 1, state);
+            break;
+          }
+        }
+
+        // check if need to trigger a snapshot
+        if (shouldTakeSnapshot(lastAppliedIndex)) {
+          stateMachine.takeSnapshot();
+          // TODO purge logs, including log cache. but should keep log for
+          // leader's RPCSenders
+          lastSnapshotIndex = lastAppliedIndex;
+        }
+      } catch (InterruptedException e) {
+        if (!isRunning()) {
+          LOG.info("{}: the StateMachineUpdater is interrupted and will exit.",
+              this);
+        } else {
+          final String s = this
+              + ": the StateMachineUpdater is wrongly interrupted";
+          ExitUtils.terminate(1, s, e, LOG);
+        }
+      } catch (Throwable t) {
+        final String s = this + ": the StateMachineUpdater hits Throwable";
+        ExitUtils.terminate(2, s, t, LOG);
+      }
+    }
+  }
+
+  /** @return true unless {@link #stop()} has set the state to STOP. */
+  private boolean isRunning() {
+    return state != State.STOP;
+  }
+
+  /**
+   * @return true if auto snapshot is enabled, no reload is pending, and at
+   *         least {@code snapshotThreshold} entries have been applied since
+   *         the last snapshot index.
+   */
+  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
+    return autoSnapshotEnabled && (state != State.RELOAD) &&
+        (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold);
+  }
+
+  /** @return the index of the last entry applied to the state machine. */
+  long getLastAppliedIndex() {
+    return lastAppliedIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
index e058ea3..95597b2 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
@@ -29,7 +29,7 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 
-import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX;
+import static 
org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
 
 public class LogInputStream implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
index 1ecb5d5..0dc8029 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
@@ -18,7 +18,7 @@
 package org.apache.raft.server.storage;
 
 import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.raft.util.PureJavaCrc32C;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
index 4e37e2a..9523cac 100644
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
+++ b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
@@ -20,7 +20,7 @@ package org.apache.raft.server.storage;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.Charsets;
 import org.apache.raft.protocol.ChecksumException;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
index b2531f0..987cc6c 100644
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
+++ b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
@@ -18,7 +18,7 @@
 package org.apache.raft.server.storage;
 
 import com.google.common.base.Preconditions;
-import org.apache.raft.server.ConfigurationManager;
+import org.apache.raft.server.impl.ConfigurationManager;
 import org.apache.raft.server.impl.ServerProtoUtils;
 import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
index d87b0a4..c12e1aa 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
@@ -18,8 +18,8 @@
 package org.apache.raft.server.storage;
 
 import com.google.common.base.Preconditions;
-import org.apache.raft.server.RaftConfiguration;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftConfiguration;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.impl.ServerProtoUtils;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.raft.util.AutoCloseableLock;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
index 42f1391..acd44b0 100644
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
+++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
@@ -18,9 +18,9 @@
 package org.apache.raft.server.storage;
 
 import com.google.common.base.Preconditions;
-import org.apache.raft.server.ConfigurationManager;
-import org.apache.raft.server.RaftConfiguration;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.ConfigurationManager;
+import org.apache.raft.server.impl.RaftConfiguration;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.impl.ServerProtoUtils;
 import org.apache.raft.server.protocol.TermIndex;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
index bdc0675..d022a91 100644
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
+++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
@@ -19,14 +19,14 @@ package org.apache.raft.server.storage;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.LogSegment.LogRecord;
 import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 
 import java.util.*;
 
-import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX;
+import static 
org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
 
 /**
  * In-memory RaftLog Cache. Currently we provide a simple implementation that

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
index 090be49..1837e94 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
@@ -20,8 +20,8 @@ package org.apache.raft.server.storage;
 import com.google.common.base.Preconditions;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.io.nativeio.NativeIO;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
 import org.apache.raft.server.storage.RaftLogCache.TruncationSegments;
 import org.apache.raft.server.storage.SegmentedRaftLog.Task;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
index 8646b9a..434f505 100644
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
+++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
@@ -19,7 +19,7 @@ package org.apache.raft.server.storage;
 
 import com.google.common.base.Preconditions;
 import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.RaftStorageDirectory.StorageState;
 import org.apache.raft.statemachine.SnapshotInfo;
 import org.apache.raft.statemachine.StateMachineStorage;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
index e47f3a6..662e4ec 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
@@ -40,7 +40,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static java.nio.file.Files.newDirectoryStream;
-import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX;
+import static 
org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
 
 public class RaftStorageDirectory {
   static final Logger LOG = 
LoggerFactory.getLogger(RaftStorageDirectory.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
 
b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
index d87fc0a..9c55491 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
@@ -21,9 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.Charsets;
 import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.ConfigurationManager;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.ConfigurationManager;
+import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.raft.util.AutoCloseableLock;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java 
b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
index 33fff3f..ccc52c7 100644
--- 
a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
+++ 
b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
@@ -22,8 +22,8 @@ import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.protocol.Message;
 import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.RaftConfiguration;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftConfiguration;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.RaftStorage;
 import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.raft.util.LifeCycle;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
 
b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
index ad9fee3..bedb5b0 100644
--- 
a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
+++ 
b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
@@ -21,7 +21,7 @@ package org.apache.raft.statemachine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.raft.io.MD5Hash;
-import org.apache.raft.server.RaftConfiguration;
+import org.apache.raft.server.impl.RaftConfiguration;
 import org.apache.raft.server.protocol.TermIndex;
 import org.apache.raft.server.storage.FileInfo;
 import org.apache.raft.server.storage.RaftStorage;


Reply via email to