http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
new file mode 100644
index 0000000..6778683
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
@@ -0,0 +1,749 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.raft.conf.RaftProperties;
+import org.apache.raft.protocol.*;
+import org.apache.raft.server.RaftServerConfigKeys;
+import org.apache.raft.server.protocol.RaftServerProtocol;
+import org.apache.raft.server.protocol.TermIndex;
+import org.apache.raft.server.storage.FileInfo;
+import org.apache.raft.shaded.proto.RaftProtos.*;
+import org.apache.raft.statemachine.SnapshotInfo;
+import org.apache.raft.statemachine.StateMachine;
+import org.apache.raft.statemachine.TransactionContext;
+import org.apache.raft.util.CodeInjectionForTesting;
+import org.apache.raft.util.LifeCycle;
+import org.apache.raft.util.ProtoUtils;
+import org.apache.raft.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
+import static org.apache.raft.util.LifeCycle.State.*;
+
+public class RaftServerImpl implements RaftServerProtocol, Closeable {
+  public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerImpl.class);
+
+  private static final String CLASS_NAME = 
RaftServerImpl.class.getSimpleName();
+  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
+  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
+  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
+
+
+  private final int minTimeoutMs;
+  private final int maxTimeoutMs;
+
+  private final LifeCycle lifeCycle;
+  private final ServerState state;
+  private final StateMachine stateMachine;
+  private final RaftProperties properties;
+  private volatile Role role;
+
+  /** used when the peer is follower, to monitor election timeout */
+  private volatile FollowerState heartbeatMonitor;
+
+  /** used when the peer is candidate, to request votes from other peers */
+  private volatile LeaderElection electionDaemon;
+
+  /** used when the peer is leader */
+  private volatile LeaderState leaderState;
+
+  private RaftServerRpc serverRpc;
+
+  private final LogAppenderFactory appenderFactory;
+
+  /**
+   * Creates a server for the peer {@code id}.
+   *
+   * The minimum and maximum RPC timeouts are read from {@code properties};
+   * the maximum must be strictly greater than the minimum. The
+   * {@link ServerState} and the {@link LogAppenderFactory} are created last.
+   *
+   * @throws IllegalArgumentException if the max timeout is not greater than
+   *                                  the min timeout.
+   * @throws IOException declared because the {@link ServerState} constructor
+   *                     declares it.
+   */
+  public RaftServerImpl(String id, RaftConfiguration raftConf,
+      RaftProperties properties, StateMachine stateMachine)
+      throws IOException {
+    this.lifeCycle = new LifeCycle(id);
+
+    final int minMs = properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
+    final int maxMs = properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
+    Preconditions.checkArgument(maxMs > minMs,
+        "max timeout: %s, min timeout: %s", maxMs, minMs);
+    this.minTimeoutMs = minMs;
+    this.maxTimeoutMs = maxMs;
+
+    this.properties = properties;
+    this.stateMachine = stateMachine;
+    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
+    // initAppenderFactory() reads this.properties, so it has to come after
+    // the assignment above.
+    this.appenderFactory = initAppenderFactory();
+  }
+
+  /** @return the minimum RPC timeout in milliseconds read at construction. */
+  public int getMinTimeoutMs() {
+    return this.minTimeoutMs;
+  }
+
+  /** @return the maximum RPC timeout in milliseconds read at construction. */
+  public int getMaxTimeoutMs() {
+    return this.maxTimeoutMs;
+  }
+
+  /**
+   * @return a timeout in milliseconds chosen by
+   *         {@link RaftUtils#getRandomBetween} from the configured min and
+   *         max timeouts.
+   */
+  public int getRandomTimeoutMs() {
+    final int lowerMs = minTimeoutMs;
+    final int upperMs = maxTimeoutMs;
+    return RaftUtils.getRandomBetween(lowerMs, upperMs);
+  }
+
+  /** @return the state machine passed to the constructor. */
+  public StateMachine getStateMachine() {
+    return stateMachine;
+  }
+
+  /** @return the log appender factory instantiated at construction. */
+  public LogAppenderFactory getLogAppenderFactory() {
+    return this.appenderFactory;
+  }
+
+  /**
+   * Looks up the configured {@link LogAppenderFactory} implementation class
+   * in {@link #properties} (falling back to the default from
+   * {@link RaftServerConfigKeys}) and instantiates it through
+   * {@link RaftUtils#newInstance}.
+   */
+  private LogAppenderFactory initAppenderFactory() {
+    final Class<? extends LogAppenderFactory> configuredClass =
+        properties.getClass(
+            RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+            RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
+            LogAppenderFactory.class);
+    return RaftUtils.newInstance(configuredClass);
+  }
+
+  /**
+   * Test hook: replaces the initial raft configuration held by the
+   * {@link ServerState}, so that tests can supply a configuration with the
+   * correct port bindings.
+   */
+  @VisibleForTesting
+  public void setInitialConf(RaftConfiguration conf) {
+    state.setInitialConf(conf);
+  }
+
+  /**
+   * Installs the RPC service used by this server and, when a raft
+   * configuration is already known, registers its peers with that service.
+   */
+  public void setServerRpc(RaftServerRpc serverRpc) {
+    this.serverRpc = serverRpc;
+
+    final RaftConfiguration currentConf = getRaftConf();
+    if (currentConf == null) {
+      // nothing to register yet
+      return;
+    }
+    addPeersToRPC(currentConf.getPeers());
+  }
+
+  /** @return the RPC service set by {@link #setServerRpc}, possibly null. */
+  public RaftServerRpc getServerRpc() {
+    return this.serverRpc;
+  }
+
+  /**
+   * Moves the life cycle to STARTING, starts the {@link ServerState} and
+   * then starts either as a regular follower (this peer is part of the
+   * current configuration) or in the initializing mode (it is not).
+   */
+  public void start() {
+    lifeCycle.transition(STARTING);
+    state.start();
+
+    final RaftConfiguration conf = getRaftConf();
+    final boolean inCurrentConf = conf != null && conf.contains(getId());
+    if (!inCurrentConf) {
+      LOG.debug("{} starts with initializing state", getId());
+      startInitializing();
+      return;
+    }
+    LOG.debug("{} starts as a follower", getId());
+    startAsFollower();
+  }
+
+  /**
+   * Start path for a peer that belongs to the current configuration:
+   * become a follower, start the election-timeout monitor, start the RPC
+   * service and finally move the life cycle to RUNNING.
+   */
+  private void startAsFollower() {
+    role = Role.FOLLOWER;
+
+    final FollowerState monitor = new FollowerState(this);
+    heartbeatMonitor = monitor;
+    monitor.start();
+
+    serverRpc.start();
+    lifeCycle.transition(RUNNING);
+  }
+
+  /**
+   * Start path for a peer that is not (yet) part of any configuration; it
+   * may be added to one later. The role is still FOLLOWER, but the peer
+   * will neither vote nor start an election: no heartbeat monitor is
+   * started and the life cycle is not moved to RUNNING here.
+   */
+  private void startInitializing() {
+    this.role = Role.FOLLOWER;
+    // deliberately no heartbeat monitoring in this mode
+    this.serverRpc.start();
+  }
+
+  /** @return the {@link ServerState} owned by this server. */
+  public ServerState getState() {
+    return state;
+  }
+
+  /** @return the id of this peer, as stored in the {@link ServerState}. */
+  public String getId() {
+    final ServerState serverState = getState();
+    return serverState.getSelfId();
+  }
+
+  /** @return the raft configuration reported by the {@link ServerState}. */
+  public RaftConfiguration getRaftConf() {
+    final ServerState serverState = getState();
+    return serverState.getRaftConf();
+  }
+
+  /**
+   * Shuts this server down through {@link LifeCycle#checkStateAndClose}:
+   * stops the heartbeat monitor, the election daemon and the leader state
+   * (whichever exist), then the RPC service, then the {@link ServerState}.
+   *
+   * Shutdown is best effort: a failure in any step is logged together with
+   * its stack trace and is not propagated to the caller.
+   */
+  @Override
+  public void close() {
+    lifeCycle.checkStateAndClose(() -> {
+      try {
+        shutdownHeartbeatMonitor();
+        shutdownElectionDaemon();
+        shutdownLeaderState();
+
+        serverRpc.shutdown();
+        state.close();
+      } catch (Exception e) {
+        // Not ignored: the trailing throwable is logged with its stack trace.
+        LOG.warn("Failed to kill {}", state.getSelfId(), e);
+      }
+    });
+  }
+
+  /** @return true unless the life cycle state is CLOSING or CLOSED. */
+  public boolean isAlive() {
+    final boolean closingOrClosed =
+        lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
+    return !closingOrClosed;
+  }
+
+  /** @return true if the current role is {@link Role#FOLLOWER}. */
+  public boolean isFollower() {
+    return Role.FOLLOWER == role;
+  }
+
+  /** @return true if the current role is {@link Role#CANDIDATE}. */
+  public boolean isCandidate() {
+    return Role.CANDIDATE == role;
+  }
+
+  /** @return true if the current role is {@link Role#LEADER}. */
+  public boolean isLeader() {
+    return Role.LEADER == role;
+  }
+
+  /** @return the current role; null until one of the start paths ran. */
+  Role getRole() {
+    return this.role;
+  }
+
+  /**
+   * Switches this server to the follower role, whatever its previous role.
+   *
+   * If {@code newTerm} is greater than the current term, the term is
+   * advanced and leader/votedFor are reset. A previous leader state or
+   * election daemon is shut down, and a new heartbeat monitor is started
+   * whenever the previous role was not follower.
+   *
+   * @param newTerm The new term.
+   * @param sync If true and the term was advanced, the metadata is persisted
+   *             through {@link ServerState#persistMetadata()} before
+   *             returning; otherwise persisting is left to the caller.
+   * @return true if the term (and therefore leader/votedFor) was updated.
+   * @throws IOException if term/votedFor persistence failed.
+   */
+  synchronized boolean changeToFollower(long newTerm, boolean sync)
+      throws IOException {
+    final Role previousRole = role;
+    role = Role.FOLLOWER;
+
+    final boolean termAdvanced = newTerm > state.getCurrentTerm();
+    if (termAdvanced) {
+      state.setCurrentTerm(newTerm);
+      state.resetLeaderAndVotedFor();
+    }
+
+    if (previousRole != Role.FOLLOWER) {
+      // stop whatever was running for the previous role ...
+      if (previousRole == Role.LEADER) {
+        assert leaderState != null;
+        shutdownLeaderState();
+      } else if (previousRole == Role.CANDIDATE) {
+        shutdownElectionDaemon();
+      }
+      // ... and start monitoring the election timeout as a follower
+      heartbeatMonitor = new FollowerState(this);
+      heartbeatMonitor.start();
+    }
+
+    if (sync && termAdvanced) {
+      state.persistMetadata();
+    }
+    return termAdvanced;
+  }
+
+  /** Stops the leader state, if there is one, and clears the field. */
+  private synchronized void shutdownLeaderState() {
+    final LeaderState current = this.leaderState;
+    if (current != null) {
+      current.stop();
+    }
+    this.leaderState = null;
+    // TODO: make sure that StateMachineUpdater has applied all transactions
+    // that have context
+  }
+
+  /**
+   * Asks the election daemon, if there is one, to stop running and clears
+   * the field. The election thread is not interrupted.
+   */
+  private void shutdownElectionDaemon() {
+    final LeaderElection current = this.electionDaemon;
+    if (current != null) {
+      // stopRunning() is enough; no need to interrupt the election thread
+      current.stopRunning();
+    }
+    this.electionDaemon = null;
+  }
+
+  /**
+   * Promotes this candidate to leader: stops the election daemon, updates
+   * the {@link ServerState} and starts a new {@link LeaderState}.
+   *
+   * @throws IllegalStateException if this server is not a candidate.
+   */
+  synchronized void changeToLeader() {
+    Preconditions.checkState(isCandidate());
+    shutdownElectionDaemon();
+
+    role = Role.LEADER;
+    state.becomeLeader();
+
+    // the leader state starts sending AppendEntries RPC to followers
+    final LeaderState newLeaderState = new LeaderState(this, properties);
+    leaderState = newLeaderState;
+    newLeaderState.start();
+  }
+
+  /**
+   * Stops and interrupts the heartbeat monitor, if there is one, and clears
+   * the field.
+   */
+  private void shutdownHeartbeatMonitor() {
+    final FollowerState monitor = this.heartbeatMonitor;
+    if (monitor != null) {
+      monitor.stopRunning();
+      monitor.interrupt();
+    }
+    this.heartbeatMonitor = null;
+  }
+
+  /**
+   * Turns this follower into a candidate: stops the heartbeat monitor and
+   * starts a new {@link LeaderElection}.
+   *
+   * @throws IllegalStateException if this server is not a follower.
+   */
+  synchronized void changeToCandidate() {
+    Preconditions.checkState(isFollower());
+    shutdownHeartbeatMonitor();
+
+    role = Role.CANDIDATE;
+
+    // kick off the election
+    final LeaderElection election = new LeaderElection(this);
+    electionDaemon = election;
+    election.start();
+  }
+
+  /** @return "role state lifeCycleState", separated by single spaces. */
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append(role).append(" ")
+        .append(state).append(" ")
+        .append(lifeCycle.getCurrentState())
+        .toString();
+  }
+
+  /**
+   * Checks whether this server can serve {@code request} as the leader.
+   *
+   * @return null if the server is in leader state; otherwise an already
+   *         completed future whose reply carries a
+   *         {@link NotLeaderException}.
+   */
+  CompletableFuture<RaftClientReply> checkLeaderState(
+      RaftClientRequest request) {
+    if (isLeader()) {
+      return null;
+    }
+    final NotLeaderException notLeader = generateNotLeaderException();
+    return CompletableFuture.completedFuture(
+        new RaftClientReply(request, notLeader));
+  }
+
+  /**
+   * Builds the exception returned to clients that contacted a non-leader.
+   *
+   * When the server is not RUNNING, no leader hint and no peer list are
+   * attached. Otherwise the known leader is suggested; if no leader is
+   * known, or the recorded leader is this very peer, a random peer chosen
+   * by the configuration is suggested instead (possibly none).
+   */
+  NotLeaderException generateNotLeaderException() {
+    if (lifeCycle.getCurrentState() != RUNNING) {
+      return new NotLeaderException(getId(), null, null);
+    }
+
+    String suggestedId = state.getLeaderId();
+    final boolean unknownOrSelf =
+        suggestedId == null || suggestedId.equals(state.getSelfId());
+    if (unknownOrSelf) {
+      // Either nobody is known to lead, or this peer is recorded as the
+      // leader but is about to step down: pick some other peer.
+      final RaftPeer randomPeer =
+          state.getRaftConf().getRandomPeer(state.getSelfId());
+      if (randomPeer == null) {
+        suggestedId = null;
+      } else {
+        suggestedId = randomPeer.getId();
+      }
+    }
+
+    final RaftConfiguration conf = getRaftConf();
+    final Collection<RaftPeer> allPeers = conf.getPeers();
+    return new NotLeaderException(getId(), conf.getPeer(suggestedId),
+        allPeers.toArray(new RaftPeer[allPeers.size()]));
+  }
+
+  /**
+   * Handles a normal update request from a client.
+   *
+   * Under the server lock: verifies leadership, appends the transaction to
+   * the local log, registers a pending request for the new log index and
+   * notifies the senders.
+   *
+   * @return the future of the pending request, or an already completed
+   *         not-leader reply when this server is not the leader.
+   * @throws RaftException wrapping the {@link IOException} raised while
+   *                       appending to the local log.
+   */
+  public CompletableFuture<RaftClientReply> appendTransaction(
+      RaftClientRequest request, TransactionContext entry)
+      throws RaftException {
+    LOG.debug("{}: receive client request({})", getId(), request);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    final PendingRequest pending;
+    synchronized (this) {
+      final CompletableFuture<RaftClientReply> notLeaderReply =
+          checkLeaderState(request);
+      if (notLeaderReply != null) {
+        return notLeaderReply;
+      }
+
+      // write the transaction into the local log first
+      final long entryIndex;
+      try {
+        entryIndex = state.applyLog(entry);
+      } catch (IOException e) {
+        throw new RaftException(e);
+      }
+
+      // then queue the request until the entry is handled by the leader
+      pending = leaderState.addPendingRequest(entryIndex, request, entry);
+      leaderState.notifySenders();
+    }
+    return pending.getFuture();
+  }
+
+  /**
+   * Handles a raft configuration change request from a client.
+   *
+   * Leadership is checked once without the lock and again under it. A
+   * request is rejected while another reconfiguration is in progress; a
+   * request that does not change the configuration is answered through
+   * {@link LeaderState#returnNoConfChange}.
+   *
+   * @throws ReconfigurationInProgressException if the current configuration
+   *         is not stable, the leader is in staging state, or the current
+   *         configuration has not been committed.
+   */
+  public CompletableFuture<RaftClientReply> setConfiguration(
+      SetConfigurationRequest request) throws IOException {
+    LOG.debug("{}: receive setConfiguration({})", getId(), request);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    // first leadership check, outside the lock
+    final CompletableFuture<RaftClientReply> earlyReply =
+        checkLeaderState(request);
+    if (earlyReply != null) {
+      return earlyReply;
+    }
+
+    final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
+    final PendingRequest pending;
+    synchronized (this) {
+      // second leadership check, now that the lock is held
+      final CompletableFuture<RaftClientReply> lockedReply =
+          checkLeaderState(request);
+      if (lockedReply != null) {
+        return lockedReply;
+      }
+
+      final RaftConfiguration current = getRaftConf();
+      // only one raft reconfiguration may be in progress at a time
+      final boolean reconfInProgress = !current.isStable()
+          || leaderState.inStagingState()
+          || !state.isCurrentConfCommitted();
+      if (reconfInProgress) {
+        throw new ReconfigurationInProgressException(
+            "Reconfiguration is already in progress: " + current);
+      }
+
+      // the requested configuration equals the current one: reply at once
+      if (current.hasNoChange(peersInNewConf)) {
+        return leaderState.returnNoConfChange(request).getFuture();
+      }
+
+      // make the new peers known to the rpc service ...
+      addPeersToRPC(Arrays.asList(peersInNewConf));
+      // ... and let the leader state enter the staging state
+      pending = leaderState.startSetConfiguration(request);
+    }
+    return pending.getFuture();
+  }
+
+  /**
+   * @return true if this server is the leader, or if it is a follower that
+   *         has a leader and whose heartbeat monitor says votes should be
+   *         withheld.
+   */
+  private boolean shouldWithholdVotes() {
+    if (isLeader()) {
+      return true;
+    }
+    return isFollower()
+        && state.hasLeader()
+        && heartbeatMonitor.shouldWithholdVotes();
+  }
+
+  /**
+   * Decides whether the remote candidate should be told to shut down
+   * because it is not included in the current conf. That is the case only
+   * if all of the following hold:
+   * 1. this is a leader
+   * 2. current conf is stable and has been committed
+   * 3. candidate id is not included in conf
+   * 4. candidate's last entry's index is smaller than conf's index
+   * 5. the candidate is not a peer the leader is currently bootstrapping
+   */
+  private boolean shouldSendShutdown(String candidateId,
+      TermIndex candidateLastEntry) {
+    if (!isLeader()) {
+      return false;
+    }
+    if (!getRaftConf().isStable() || !getState().isConfCommitted()) {
+      return false;
+    }
+    if (getRaftConf().containsInConf(candidateId)) {
+      return false;
+    }
+    if (candidateLastEntry.getIndex() >= getRaftConf().getLogEntryIndex()) {
+      return false;
+    }
+    return !leaderState.isBootStrappingPeer(candidateId);
+  }
+
+  /**
+   * RPC entry point for a vote request: unpacks the candidate id, the
+   * candidate term and the candidate's last log entry from the proto and
+   * delegates to the private overload.
+   */
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
+      throws IOException {
+    final String candidateId = r.getServerRequest().getRequestorId();
+    final long candidateTerm = r.getCandidateTerm();
+    final TermIndex candidateLastEntry =
+        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry());
+    return requestVote(candidateId, candidateTerm, candidateLastEntry);
+  }
+
+  /**
+   * Decides whether to grant a vote to {@code candidateId}.
+   *
+   * The vote is granted only if votes are not being withheld, the
+   * {@link ServerState} recognizes the candidate for {@code candidateTerm}
+   * and the candidate's log is up to date. Term/votedFor changes are
+   * persisted before the reply is built. If the vote is not granted, the
+   * reply may additionally tell the candidate to shut down (see
+   * {@link #shouldSendShutdown}).
+   */
+  private RequestVoteReplyProto requestVote(String candidateId,
+      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
+        candidateId, candidateTerm, candidateLastEntry);
+    LOG.debug("{}: receive requestVote({}, {}, {})",
+        getId(), candidateId, candidateTerm, candidateLastEntry);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    synchronized (this) {
+      boolean granted = false;
+      if (shouldWithholdVotes()) {
+        LOG.info("{} Withhold vote from server {} with term {}. " +
+            "This server:{}, last rpc time from leader {} is {}", getId(),
+            candidateId, candidateTerm, this, this.getState().getLeaderId(),
+            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
+      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
+        // persisting is postponed so that term and vote are synced together
+        final boolean termUpdated = changeToFollower(candidateTerm, false);
+        // election restriction, see Section 5.4.1
+        if (state.isLogUpToDate(candidateLastEntry)) {
+          heartbeatMonitor.updateLastRpcTime(false);
+          state.grantVote(candidateId);
+          granted = true;
+        }
+        if (termUpdated || granted) {
+          state.persistMetadata(); // sync metafile
+        }
+      }
+
+      final boolean tellCandidateToShutdown =
+          !granted && shouldSendShutdown(candidateId, candidateLastEntry);
+      final RequestVoteReplyProto reply =
+          ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
+              granted, state.getCurrentTerm(), tellCandidateToShutdown);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
+            getId(), ProtoUtils.toString(reply), state);
+      }
+      return reply;
+    }
+  }
+
+  /**
+   * Sanity-checks the entries of an AppendEntries request. Nothing is
+   * checked when {@code entries} is null or empty.
+   *
+   * Checks performed:
+   * - without a previous entry (null, or term 0) the first index must be 0;
+   *   otherwise it must directly follow {@code previous};
+   * - no entry may have a term greater than {@code expectedTerm};
+   * - the indices must be consecutive.
+   *
+   * @throws IllegalArgumentException if any check fails.
+   */
+  private void validateEntries(long expectedTerm, TermIndex previous,
+      LogEntryProto... entries) {
+    if (entries == null || entries.length == 0) {
+      return;
+    }
+
+    final long firstIndex = entries[0].getIndex();
+    final boolean noPrevious = previous == null || previous.getTerm() == 0;
+    if (noPrevious) {
+      Preconditions.checkArgument(firstIndex == 0,
+          "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
+          0, firstIndex);
+    } else {
+      Preconditions.checkArgument(previous.getIndex() == firstIndex - 1,
+          "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
+          previous, 0, firstIndex);
+    }
+
+    int position = 0;
+    for (LogEntryProto entry : entries) {
+      final long term = entry.getTerm();
+      Preconditions.checkArgument(expectedTerm >= term,
+          "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
+          position, term, expectedTerm);
+
+      final long index = entry.getIndex();
+      Preconditions.checkArgument(index == firstIndex + position,
+          "Unexpected Index: entries[%s].getIndex()=%s but "
+              + "entries[0].getIndex()=%s",
+          position, index, firstIndex);
+      position++;
+    }
+  }
+
+  /**
+   * RPC entry point for AppendEntries: unpacks the proto (the previous
+   * entry is null when the request carries none) and delegates to the
+   * private overload.
+   */
+  @Override
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
+      throws IOException {
+    // TODO avoid converting list to array
+    final List<LogEntryProto> entryList = r.getEntriesList();
+    final LogEntryProto[] entries =
+        entryList.toArray(new LogEntryProto[r.getEntriesCount()]);
+
+    TermIndex previous = null;
+    if (r.hasPreviousLog()) {
+      previous = ServerProtoUtils.toTermIndex(r.getPreviousLog());
+    }
+
+    final String leaderId = r.getServerRequest().getRequestorId();
+    return appendEntries(leaderId, r.getLeaderTerm(), previous,
+        r.getLeaderCommit(), r.getInitializing(), entries);
+  }
+
+  /**
+   * Handles an AppendEntries request sent by {@code leaderId}.
+   *
+   * Steps:
+   * 1. validate the entries (a validation failure is rethrown as
+   *    {@link IOException});
+   * 2. under the server lock: reply NOT_LEADER if the sender is not
+   *    recognized as leader for {@code leaderTerm}; otherwise become/stay
+   *    follower, record the leader, reply INCONSISTENCY if {@code previous}
+   *    is not known locally, else append the entries and update
+   *    configuration and state machine;
+   * 3. outside the lock: wait for the log to be synced when entries were
+   *    appended;
+   * 4. reset the election timer and reply SUCCESS.
+   *
+   * @param initializing if false and the server is still STARTING, the
+   *                     server moves to RUNNING and starts its heartbeat
+   *                     monitor.
+   * @throws InterruptedIOException if waiting for the log sync was
+   *         interrupted; the interrupt flag of the thread is restored and
+   *         the original exception is attached as the cause.
+   */
+  private AppendEntriesReplyProto appendEntries(String leaderId,
+      long leaderTerm, TermIndex previous, long leaderCommit,
+      boolean initializing, LogEntryProto... entries) throws IOException {
+    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
+        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
+          leaderId, leaderTerm, previous, leaderCommit, initializing,
+          ServerProtoUtils.toString(entries));
+    }
+    lifeCycle.assertCurrentState(STARTING, RUNNING);
+
+    try {
+      validateEntries(leaderTerm, previous, entries);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    }
+
+    final long currentTerm;
+    long nextIndex = state.getLog().getNextIndex();
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final AppendEntriesReplyProto reply =
+            ServerProtoUtils.toAppendEntriesReplyProto(
+                leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: do not recognize leader. Reply: {}",
+              getId(), ProtoUtils.toString(reply));
+        }
+        return reply;
+      }
+      changeToFollower(leaderTerm, true);
+      state.setLeader(leaderId);
+
+      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
+        heartbeatMonitor = new FollowerState(this);
+        heartbeatMonitor.start();
+      }
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(true);
+      }
+
+      // We need to check if "previous" is in the local peer. Note that it is
+      // possible that "previous" is covered by the latest snapshot: e.g.,
+      // it's possible there's no log entries outside of the latest snapshot.
+      // However, it is not possible that "previous" index is smaller than the
+      // last index included in snapshot. This is because indices <= snapshot's
+      // last index should have been committed.
+      if (previous != null && !containPrevious(previous)) {
+        final AppendEntriesReplyProto reply =
+            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
+                currentTerm, Math.min(nextIndex, previous.getIndex()),
+                INCONSISTENCY);
+        // guarded like the other debug logs: the reply string is only
+        // built when debug logging is enabled
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
+              getId(), previous, ServerProtoUtils.toString(reply));
+        }
+        return reply;
+      }
+
+      state.getLog().append(entries);
+      state.updateConfiguration(entries);
+      state.updateStatemachine(leaderCommit, currentTerm);
+    }
+    if (entries != null && entries.length > 0) {
+      try {
+        state.getLog().logSync();
+      } catch (InterruptedException e) {
+        // keep the interrupt visible to the caller's thread and keep the
+        // original exception as the cause
+        Thread.currentThread().interrupt();
+        final InterruptedIOException interrupted =
+            new InterruptedIOException("logSync got interrupted");
+        interrupted.initCause(e);
+        throw interrupted;
+      }
+      nextIndex = entries[entries.length - 1].getIndex() + 1;
+    }
+    synchronized (this) {
+      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
+          && getState().getCurrentTerm() == currentTerm) {
+        // reset election timer to avoid punishing the leader for our own
+        // long disk writes
+        heartbeatMonitor.updateLastRpcTime(false);
+      }
+    }
+    final AppendEntriesReplyProto reply =
+        ServerProtoUtils.toAppendEntriesReplyProto(
+            leaderId, getId(), currentTerm, nextIndex, SUCCESS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
+          ServerProtoUtils.toString(reply));
+    }
+    return reply;
+  }
+
+  /**
+   * @return true if {@code previous} is contained in the local log, or
+   *         equals the term-index of the latest snapshot, or equals the
+   *         latest installed snapshot.
+   */
+  private boolean containPrevious(TermIndex previous) {
+    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
+        getId(), previous, state.getLatestSnapshot(),
+        state.getLatestInstalledSnapshot());
+
+    if (state.getLog().contains(previous)) {
+      return true;
+    }
+    if (state.getLatestSnapshot() != null
+        && state.getLatestSnapshot().getTermIndex().equals(previous)) {
+      return true;
+    }
+    return state.getLatestInstalledSnapshot() != null
+        && state.getLatestInstalledSnapshot().equals(previous);
+  }
+
+  /**
+   * Handles one InstallSnapshot request (one chunk of a snapshot) from the
+   * leader.
+   *
+   * Under the server lock: replies NOT_LEADER if the sender is not
+   * recognized as leader for its term; otherwise becomes/stays follower,
+   * records the leader, hands the request to the {@link ServerState} and,
+   * when the request is marked done, reloads the state machine.
+   *
+   * @throws IllegalStateException if the local log's next index is already
+   *         greater than the last index included in the snapshot.
+   */
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    final String leaderId = request.getServerRequest().getRequestorId();
+    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId,
+        request);
+    LOG.debug("{}: receive installSnapshot({})", getId(), request);
+
+    lifeCycle.assertCurrentState(STARTING, RUNNING);
+
+    final long leaderTerm = request.getLeaderTerm();
+    final TermIndex snapshotTermIndex =
+        ServerProtoUtils.toTermIndex(request.getTermIndex());
+    final long lastIncludedIndex = snapshotTermIndex.getIndex();
+
+    final long currentTerm;
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto notLeaderReply =
+            ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
+                currentTerm, request.getRequestIndex(),
+                InstallSnapshotResult.NOT_LEADER);
+        LOG.debug("{}: do not recognize leader for installing snapshot." +
+            " Reply: {}", getId(), notLeaderReply);
+        return notLeaderReply;
+      }
+      changeToFollower(leaderTerm, true);
+      state.setLeader(leaderId);
+
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(true);
+      }
+
+      // The chunk is checked and appended while holding the lock; this is
+      // kept simple on the assumption that a follower needing a snapshot
+      // installation does not have a lot of requests.
+      Preconditions.checkState(
+          state.getLog().getNextIndex() <= lastIncludedIndex,
+          "%s log's next id is %s, last included index in snapshot is %s",
+          getId(), state.getLog().getNextIndex(), lastIncludedIndex);
+
+      // TODO: We should only update State with installed snapshot once the
+      // request is done.
+      state.installSnapshot(request);
+
+      // on the last chunk, update the committed index by re-loading the
+      // state machine
+      if (request.getDone()) {
+        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+      }
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(false);
+      }
+    }
+
+    if (request.getDone()) {
+      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
+          lastIncludedIndex);
+    }
+    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
+        currentTerm, request.getRequestIndex(),
+        InstallSnapshotResult.SUCCESS);
+  }
+
+  /**
+   * Builds an AppendEntries request from this server to {@code targetId}.
+   * The commit index sent along is the last committed index of the local
+   * log.
+   */
+  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
+      String targetId, TermIndex previous, List<LogEntryProto> entries,
+      boolean initializing) {
+    return ServerProtoUtils.toAppendEntriesRequestProto(
+        getId(),
+        targetId,
+        leaderTerm,
+        entries,
+        state.getLog().getLastCommittedIndex(),
+        initializing,
+        previous);
+  }
+
+  /**
+   * Builds an InstallSnapshot request from this server to {@code targetId}
+   * for the given chunks. The total size sent along is the sum of the sizes
+   * of all files of {@code snapshot}; the snapshot is expected to contain
+   * at least one file.
+   */
+  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
+      String targetId, String requestId, int requestIndex,
+      SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
+    final OptionalLong sizeOfAllFiles = snapshot.getFiles().stream()
+        .mapToLong(file -> file.getFileSize())
+        .reduce((left, right) -> Long.sum(left, right));
+    assert sizeOfAllFiles.isPresent();
+    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
+        requestId, requestIndex, state.getCurrentTerm(),
+        snapshot.getTermIndex(), chunks, sizeOfAllFiles.getAsLong(), done);
+  }
+
+  /**
+   * Builds a RequestVote request from this server to {@code targetId} for
+   * the given term and last log entry.
+   */
+  synchronized RequestVoteRequestProto createRequestVoteRequest(
+      String targetId, long term, TermIndex lastEntry) {
+    final String selfId = getId();
+    return ServerProtoUtils.toRequestVoteRequestProto(
+        selfId, targetId, term, lastEntry);
+  }
+
+  /**
+   * Submits an UPDATE_COMMIT_EVENT to the leader state. Does nothing unless
+   * this server is the leader and has a leader state.
+   */
+  public synchronized void submitLocalSyncEvent() {
+    if (!isLeader() || leaderState == null) {
+      return;
+    }
+    leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
+  }
+
+  /** Registers the given peers with the RPC service of this server. */
+  public void addPeersToRPC(Iterable<RaftPeer> peers) {
+    this.serverRpc.addPeers(peers);
+  }
+
+  /**
+   * Forwards the result for the request pending at {@code logIndex} to the
+   * leader state. Does nothing unless this server is the leader and has a
+   * leader state.
+   */
+  synchronized void replyPendingRequest(long logIndex,
+      CompletableFuture<Message> message) {
+    final boolean leaderAndRunning = isLeader() && leaderState != null;
+    if (!leaderAndRunning) {
+      return;
+    }
+    leaderState.replyPendingRequest(logIndex, message);
+  }
+
+  /**
+   * Looks up the transaction context the leader state keeps for the log
+   * entry at {@code index}.
+   *
+   * @return the context reported by the leader state, or null when this
+   *         server currently has no leader state.
+   */
+  TransactionContext getTransactionContext(long index) {
+    // This method is not synchronized while shutdownLeaderState() may set
+    // the volatile field to null at any time, so read the field only once.
+    final LeaderState leader = leaderState;
+    if (leader != null) { // is leader and is running
+      return leader.getTransactionContext(index);
+    }
+    return null;
+  }
+
+  /** @return the properties passed to the constructor. */
+  public RaftProperties getProperties() {
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
index cc8651d..b897afd 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java
@@ -44,10 +44,10 @@ import java.util.concurrent.ExecutionException;
 public class RequestDispatcher implements RaftClientProtocol, 
RaftServerProtocol {
   static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class);
 
-  private final RaftServer server;
+  private final RaftServerImpl server;
   private final StateMachine stateMachine;
 
-  public RequestDispatcher(RaftServer server) {
+  public RequestDispatcher(RaftServerImpl server) {
     this.server = server;
     this.stateMachine = server.getStateMachine();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
index 6680175..c91968c 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
@@ -40,7 +40,7 @@ import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY
  */
 public class ServerState implements Closeable {
   private final String selfId;
-  private final RaftServer server;
+  private final RaftServerImpl server;
   /** Raft log */
   private final RaftLog log;
   /** Raft configuration */
@@ -74,7 +74,7 @@ public class ServerState implements Closeable {
   private TermIndex latestInstalledSnapshot;
 
   ServerState(String id, RaftConfiguration conf, RaftProperties prop,
-      RaftServer server, StateMachine stateMachine) throws IOException {
+              RaftServerImpl server, StateMachine stateMachine) throws 
IOException {
     this.selfId = id;
     this.server = server;
     configurationManager = new ConfigurationManager(conf);
@@ -128,7 +128,7 @@ public class ServerState implements Closeable {
    * note we do not apply log entries to the state machine here since we do not
    * know whether they have been committed.
    */
-  private RaftLog initLog(String id, RaftProperties prop, RaftServer server,
+  private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl 
server,
       long lastIndexInSnapshot) throws IOException {
     final RaftLog log;
     if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY,
@@ -273,7 +273,7 @@ public class ServerState implements Closeable {
 
   public void setRaftConf(long logIndex, RaftConfiguration conf) {
     configurationManager.addConfiguration(logIndex, conf);
-    RaftServer.LOG.info("{}: successfully update the configuration {}",
+    RaftServerImpl.LOG.info("{}: successfully update the configuration {}",
         getSelfId(), conf);
   }
 
@@ -306,7 +306,7 @@ public class ServerState implements Closeable {
   @Override
   public void close() throws IOException {
     stateMachineUpdater.stop();
-    RaftServer.LOG.info("{} closes. The last applied log index is {}",
+    RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
         getSelfId(), getLastAppliedIndex());
     storage.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
 
b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
index 06fa221..f85639b 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
@@ -57,7 +57,7 @@ class StateMachineUpdater implements Runnable {
 
   private final RaftProperties properties;
   private final StateMachine stateMachine;
-  private final RaftServer server;
+  private final RaftServerImpl server;
   private final RaftLog raftLog;
 
   private volatile long lastAppliedIndex;
@@ -69,7 +69,7 @@ class StateMachineUpdater implements Runnable {
   private final Thread updater;
   private volatile State state = State.RUNNING;
 
-  StateMachineUpdater(StateMachine stateMachine, RaftServer server,
+  StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
       RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
     this.properties = properties;
     this.stateMachine = stateMachine;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
index 1837e94..6cef212 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
@@ -20,7 +20,7 @@ package org.apache.raft.server.storage;
 import com.google.common.base.Preconditions;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.io.nativeio.NativeIO;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
 import org.apache.raft.server.storage.RaftLogCache.TruncationSegments;
@@ -56,7 +56,7 @@ class RaftLogWorker implements Runnable {
 
   private final RaftStorage storage;
   private LogOutputStream out;
-  private final RaftServer raftServer;
+  private final RaftServerImpl raftServer;
 
   /**
    * The number of entries that have been written into the LogOutputStream but
@@ -72,8 +72,8 @@ class RaftLogWorker implements Runnable {
 
   private final  RaftProperties properties;
 
-  RaftLogWorker(RaftServer raftServer, RaftStorage storage,
-      RaftProperties properties) {
+  RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage,
+                RaftProperties properties) {
     this.raftServer = raftServer;
     this.storage = storage;
     this.properties = properties;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
 
b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
index 9c55491..293e1a4 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.Charsets;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
@@ -100,8 +100,8 @@ public class SegmentedRaftLog extends RaftLog {
   private final RaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
 
-  public SegmentedRaftLog(String selfId, RaftServer server, RaftStorage storage,
-      long lastIndexInSnapshot, RaftProperties properties) throws IOException {
+  public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage,
+                          long lastIndexInSnapshot, RaftProperties properties) throws IOException {
     super(selfId);
     this.storage = storage;
     this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java 
b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
index 4f0871f..45cec15 100644
--- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
+++ b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
@@ -26,7 +26,7 @@ import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.server.impl.DelayLocalExecutionInjection;
 import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerRpc;
 import org.apache.raft.server.storage.MemoryRaftLog;
 import org.apache.raft.server.storage.RaftLog;
@@ -72,7 +72,7 @@ public abstract class MiniRaftCluster {
       super(ids, properties, formatted);
     }
 
-    protected abstract RaftServer setPeerRpc(RaftPeer peer) throws IOException;
+    protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException;
 
     @Override
     protected void setPeerRpc() throws IOException {
@@ -133,7 +133,7 @@ public abstract class MiniRaftCluster {
   protected RaftConfiguration conf;
   protected final RaftProperties properties;
   private final String testBaseDir;
-  protected final Map<String, RaftServer> servers =
+  protected final Map<String, RaftServerImpl> servers =
       Collections.synchronizedMap(new LinkedHashMap<>());
 
   public MiniRaftCluster(String[] ids, RaftProperties properties,
@@ -152,7 +152,7 @@ public abstract class MiniRaftCluster {
     LOG.info("peers = " + peers.keySet());
     conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build();
     for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) {
-      final RaftServer server = servers.get(entry.getKey().getId());
+      final RaftServerImpl server = servers.get(entry.getKey().getId());
       server.setInitialConf(conf);
       server.setServerRpc(entry.getValue());
     }
@@ -160,7 +160,7 @@ public abstract class MiniRaftCluster {
 
   public void start() {
     LOG.info("Starting " + getClass().getSimpleName());
-    servers.values().forEach(RaftServer::start);
+    servers.values().forEach(RaftServerImpl::start);
   }
 
   /**
@@ -173,8 +173,8 @@ public abstract class MiniRaftCluster {
   }
 
   public final void restart(boolean format) throws IOException {
-    servers.values().stream().filter(RaftServer::isAlive)
-        .forEach(RaftServer::close);
+    servers.values().stream().filter(RaftServerImpl::isAlive)
+        .forEach(RaftServerImpl::close);
     List<String> idList = new ArrayList<>(servers.keySet());
     for (String id : idList) {
       servers.remove(id);
@@ -197,16 +197,16 @@ public abstract class MiniRaftCluster {
     return conf;
   }
 
-  private RaftServer newRaftServer(String id, RaftConfiguration conf,
-      boolean format) {
-    final RaftServer s;
+  private RaftServerImpl newRaftServer(String id, RaftConfiguration conf,
+                                       boolean format) {
+    final RaftServerImpl s;
     try {
       final String dirStr = testBaseDir + id;
       if (format) {
         formatDir(dirStr);
       }
       properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
-      s = new RaftServer(id, conf, properties, getStateMachine4Test(properties));
+      s = new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -224,20 +224,20 @@ public abstract class MiniRaftCluster {
   public abstract RaftClientRequestSender getRaftClientRequestSender();
 
   protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers(
-      Map<RaftPeer, RPC> newPeers, Collection<RaftServer> newServers,
+      Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers,
       boolean startService) throws IOException {
     for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) {
-      RaftServer server = servers.get(entry.getKey().getId());
+      RaftServerImpl server = servers.get(entry.getKey().getId());
       server.setServerRpc(entry.getValue());
     }
     if (startService) {
-      newServers.forEach(RaftServer::start);
+      newServers.forEach(RaftServerImpl::start);
     }
     return new ArrayList<>(newPeers.keySet());
   }
 
   protected abstract Collection<RaftPeer> addNewPeers(
-      Collection<RaftPeer> newPeers, Collection<RaftServer> newServers,
+      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
       boolean startService) throws IOException;
 
   public PeerChanges addNewPeers(int number, boolean startNewPeer)
@@ -254,9 +254,9 @@ public abstract class MiniRaftCluster {
     }
 
     // create and add new RaftServers
-    final List<RaftServer> newServers = new ArrayList<>(ids.length);
+    final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
     for (RaftPeer p : newPeers) {
-      RaftServer newServer = newRaftServer(p.getId(), conf, true);
+      RaftServerImpl newServer = newRaftServer(p.getId(), conf, true);
       Preconditions.checkArgument(!servers.containsKey(p.getId()));
       servers.put(p.getId(), newServer);
       newServers.add(newServer);
@@ -273,12 +273,12 @@ public abstract class MiniRaftCluster {
   }
 
   public void startServer(String id) {
-    RaftServer server = servers.get(id);
+    RaftServerImpl server = servers.get(id);
     assert server != null;
     server.start();
   }
 
-  private RaftPeer getPeer(RaftServer s) {
+  private RaftPeer getPeer(RaftServerImpl s) {
     return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
   }
 
@@ -295,7 +295,7 @@ public abstract class MiniRaftCluster {
       peers.remove(leader);
       removedPeers.add(leader);
     }
-    List<RaftServer> followers = getFollowers();
+    List<RaftServerImpl> followers = getFollowers();
     for (int i = 0, removed = 0; i < followers.size() &&
         removed < (removeLeader ? number - 1 : number); i++) {
       RaftPeer toRemove = getPeer(followers.get(i));
@@ -317,7 +317,7 @@ public abstract class MiniRaftCluster {
 
   public String printServers() {
     StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
-    for (RaftServer s : servers.values()) {
+    for (RaftServerImpl s : servers.values()) {
       b.append("  ");
       b.append(s).append("\n");
     }
@@ -326,7 +326,7 @@ public abstract class MiniRaftCluster {
 
   public String printAllLogs() {
     StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
-    for (RaftServer s : servers.values()) {
+    for (RaftServerImpl s : servers.values()) {
       b.append("  ");
       b.append(s).append("\n");
 
@@ -339,8 +339,8 @@ public abstract class MiniRaftCluster {
     return b.toString();
   }
 
-  public RaftServer getLeader() {
-    final List<RaftServer> leaders = new ArrayList<>();
+  public RaftServerImpl getLeader() {
+    final List<RaftServerImpl> leaders = new ArrayList<>();
     servers.values().stream()
         .filter(s -> s.isAlive() && s.isLeader())
         .forEach(s -> {
@@ -367,21 +367,21 @@ public abstract class MiniRaftCluster {
   }
 
   public boolean isLeader(String leaderId) throws InterruptedException {
-    final RaftServer leader = getLeader();
+    final RaftServerImpl leader = getLeader();
     return leader != null && leader.getId().equals(leaderId);
   }
 
-  public List<RaftServer> getFollowers() {
+  public List<RaftServerImpl> getFollowers() {
     return servers.values().stream()
         .filter(s -> s.isAlive() && s.isFollower())
         .collect(Collectors.toList());
   }
 
-  public Collection<RaftServer> getServers() {
+  public Collection<RaftServerImpl> getServers() {
     return servers.values();
   }
 
-  public RaftServer getServer(String id) {
+  public RaftServerImpl getServer(String id) {
     return servers.get(id);
   }
 
@@ -398,8 +398,8 @@ public abstract class MiniRaftCluster {
 
   public void shutdown() {
     LOG.info("Stopping " + getClass().getSimpleName());
-    servers.values().stream().filter(RaftServer::isAlive)
-        .forEach(RaftServer::close);
+    servers.values().stream().filter(RaftServerImpl::isAlive)
+        .forEach(RaftServerImpl::close);
 
     if (ExitUtils.isTerminated()) {
       LOG.error("Test resulted in an unexpected exit",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java 
b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
index 921e063..ed40bde 100644
--- a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
+++ b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
@@ -20,7 +20,7 @@ package org.apache.raft;
 import org.apache.raft.RaftTestUtil.SimpleMessage;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.junit.*;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
@@ -80,7 +80,7 @@ public abstract class RaftBasicTests {
   public void testBasicAppendEntries() throws Exception {
     LOG.info("Running testBasicAppendEntries");
     final MiniRaftCluster cluster = getCluster();
-    RaftServer leader = waitForLeader(cluster);
+    RaftServerImpl leader = waitForLeader(cluster);
     final long term = leader.getState().getCurrentTerm();
     final String killed = cluster.getFollowers().get(3).getId();
     cluster.killServer(killed);
@@ -96,7 +96,7 @@ public abstract class RaftBasicTests {
     Thread.sleep(cluster.getMaxTimeout() + 100);
     LOG.info(cluster.printAllLogs());
 
-    cluster.getServers().stream().filter(RaftServer::isAlive)
+    cluster.getServers().stream().filter(RaftServerImpl::isAlive)
         .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
         .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
   }
@@ -174,7 +174,7 @@ public abstract class RaftBasicTests {
       lastStep = n;
       count++;
 
-      RaftServer leader = cluster.getLeader();
+      RaftServerImpl leader = cluster.getLeader();
       if (leader != null) {
         final String oldLeader = leader.getId();
         LOG.info("Block all requests sent by leader " + oldLeader);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java 
b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
index 8a249e9..195cbec 100644
--- 
a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
+++ 
b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
@@ -25,7 +25,7 @@ import org.apache.raft.client.impl.RaftClientImpl;
 import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftClientRequest;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.server.storage.RaftLog;
 import org.apache.raft.util.RaftUtils;
@@ -42,7 +42,7 @@ import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 public abstract class RaftNotLeaderExceptionBaseTest {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java 
b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java
index 92bf5c4..461dd15 100644
--- a/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java
+++ b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java
@@ -23,7 +23,7 @@ import org.apache.raft.protocol.Message;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.server.impl.BlockRequestHandlingInjection;
 import org.apache.raft.server.impl.DelayLocalExecutionInjection;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.shaded.com.google.protobuf.ByteString;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
@@ -46,11 +46,11 @@ import static org.apache.raft.util.ProtoUtils.toByteString;
 public class RaftTestUtil {
   static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
 
-  public static RaftServer waitForLeader(MiniRaftCluster cluster)
+  public static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
       throws InterruptedException {
     final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1;
     LOG.info(cluster.printServers());
-    RaftServer leader = null;
+    RaftServerImpl leader = null;
     for(int i = 0; leader == null && i < 10; i++) {
       Thread.sleep(sleepTime);
       leader = cluster.getLeader();
@@ -59,11 +59,11 @@ public class RaftTestUtil {
     return leader;
   }
 
-  public static RaftServer waitForLeader(MiniRaftCluster cluster,
-      final String leaderId) throws InterruptedException {
+  public static RaftServerImpl waitForLeader(MiniRaftCluster cluster,
+                                             final String leaderId) throws InterruptedException {
     LOG.info(cluster.printServers());
     for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) {
-      RaftServer currLeader = cluster.getLeader();
+      RaftServerImpl currLeader = cluster.getLeader();
       if (LOG.isDebugEnabled()) {
         LOG.debug("try enforcing leader to " + leaderId + " but "
             + (currLeader == null? "no leader for this round"
@@ -72,14 +72,14 @@ public class RaftTestUtil {
     }
     LOG.info(cluster.printServers());
 
-    final RaftServer leader = cluster.getLeader();
+    final RaftServerImpl leader = cluster.getLeader();
     Assert.assertEquals(leaderId, leader.getId());
     return leader;
   }
 
   public static String waitAndKillLeader(MiniRaftCluster cluster,
       boolean expectLeader) throws InterruptedException {
-    final RaftServer leader = waitForLeader(cluster);
+    final RaftServerImpl leader = waitForLeader(cluster);
     if (!expectLeader) {
       Assert.assertNull(leader);
     } else {
@@ -105,11 +105,11 @@ public class RaftTestUtil {
     return idxExpected == expectedMessages.length;
   }
 
-  public static void assertLogEntries(Collection<RaftServer> servers,
+  public static void assertLogEntries(Collection<RaftServerImpl> servers,
                                       SimpleMessage... expectedMessages) {
     final int size = servers.size();
     final long count = servers.stream()
-        .filter(RaftServer::isAlive)
+        .filter(RaftServerImpl::isAlive)
         .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE))
         .filter(e -> logEntriesContains(e, expectedMessages))
         .count();
@@ -269,7 +269,7 @@ public class RaftTestUtil {
     return newLeader;
   }
 
-  public static void blockQueueAndSetDelay(Collection<RaftServer> servers,
+  public static void blockQueueAndSetDelay(Collection<RaftServerImpl> servers,
       DelayLocalExecutionInjection injection, String leaderId, int delayMs,
       long maxTimeout) throws InterruptedException {
     // block reqeusts sent to leader if delayMs > 0

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java
 
b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java
index 0980e93..7f7de9a 100644
--- 
a/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java
+++ 
b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java
@@ -29,9 +29,9 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co
       new BlockRequestHandlingInjection();
 
   static {
-    CodeInjectionForTesting.put(RaftServer.REQUEST_VOTE, INSTANCE);
-    CodeInjectionForTesting.put(RaftServer.APPEND_ENTRIES, INSTANCE);
-    CodeInjectionForTesting.put(RaftServer.INSTALL_SNAPSHOT, INSTANCE);
+    CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE);
+    CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE);
+    CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE);
   }
 
   public static BlockRequestHandlingInjection getInstance() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java
 
b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java
index 30f1e15..8a5af69 100644
--- 
a/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java
@@ -54,7 +54,7 @@ import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBody
 
 public abstract class RaftReconfigurationBaseTest {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
@@ -221,7 +221,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // check configuration manager's internal state
       // each reconf will generate two configurations: (old, new) and (new)
-      cluster.getServers().stream().filter(RaftServer::isAlive)
+      cluster.getServers().stream().filter(RaftServerImpl::isAlive)
           .forEach(server -> {
         ConfigurationManager confManager =
             (ConfigurationManager) Whitebox.getInternalState(server.getState(),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java 
b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java
index b30ddc9..5103fca 100644
--- 
a/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java
+++ 
b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java
@@ -46,7 +46,7 @@ public class RaftServerTestUtil {
     int deadIncluded = 0;
     final RaftConfiguration current = RaftConfiguration.newBuilder()
         .setConf(peers).setLogEntryIndex(0).build();
-    for (RaftServer server : cluster.getServers()) {
+    for (RaftServerImpl server : cluster.getServers()) {
       if (deadPeers != null && deadPeers.contains(server.getId())) {
         if (current.containsInConf(server.getId())) {
           deadIncluded++;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java
 
b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 360fe1e..7414872 100644
--- 
a/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ 
b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -21,7 +21,7 @@ import org.apache.raft.MiniRaftCluster;
 import org.apache.raft.client.RaftClientRequestSender;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +70,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
     setRpcServers(getServers());
   }
 
-  private void setRpcServers(Collection<RaftServer> newServers) {
+  private void setRpcServers(Collection<RaftServerImpl> newServers) {
     newServers.forEach(s -> s.setServerRpc(
         new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply)));
   }
@@ -88,7 +88,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
   @Override
   public void restartServer(String id, boolean format) throws IOException {
     super.restartServer(id, format);
-    RaftServer s = getServer(id);
+    RaftServerImpl s = getServer(id);
     addPeersToRpc(Collections.singletonList(conf.getPeer(id)));
     s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply,
         client2serverRequestReply));
@@ -97,11 +97,11 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
 
   @Override
   public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-      Collection<RaftServer> newServers, boolean startService) {
+                                          Collection<RaftServerImpl> newServers, boolean startService) {
     addPeersToRpc(newPeers);
     setRpcServers(newServers);
     if (startService) {
-      newServers.forEach(RaftServer::start);
+      newServers.forEach(RaftServerImpl::start);
     }
     return newPeers;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
 
b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
index ed522d4..93e3f5c 100644
--- 
a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
+++ 
b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
@@ -22,7 +22,7 @@ import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftClientRequest;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerRpc;
 import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.proto.RaftProtos.*;
@@ -39,14 +39,14 @@ import java.util.concurrent.TimeUnit;
 public class SimulatedServerRpc implements RaftServerRpc {
   static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
 
-  private final RaftServer server;
+  private final RaftServerImpl server;
   private final RequestDispatcher dispatcher;
   private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler;
   private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler;
   private final ExecutorService executor = Executors.newFixedThreadPool(3,
       new ThreadFactoryBuilder().setDaemon(true).build());
 
-  public SimulatedServerRpc(RaftServer server,
+  public SimulatedServerRpc(RaftServerImpl server,
       SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
       SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) {
     this.server = server;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
 
b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
index 669226a..faa9dd8 100644
--- 
a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
+++ 
b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Level;
 import org.apache.raft.RaftBasicTests;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.util.RaftUtils;
 
 import java.io.IOException;
@@ -29,7 +29,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 public class TestRaftWithSimulatedRpc extends RaftBasicTests {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
 
b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
index 41ae9af..721d12c 100644
--- 
a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
+++ 
b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
@@ -25,7 +25,7 @@ import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerTestUtil;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.server.storage.RaftLog;
@@ -52,7 +52,7 @@ import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 public abstract class RaftSnapshotBaseTest {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java 
b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
index c9dd99c..5892c65 100644
--- 
a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
+++ 
b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
@@ -25,7 +25,7 @@ import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.protocol.Message;
 import org.apache.raft.protocol.RaftClientRequest;
 import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc;
 import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.raft.util.RaftUtils;
@@ -50,7 +50,7 @@ import static org.junit.Assert.*;
  */
 public class TestStateMachine {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
@@ -161,14 +161,14 @@ public class TestStateMachine {
     // TODO: there eshould be a better way to ensure all data is replicated and applied
     Thread.sleep(cluster.getMaxTimeout() + 100);
 
-    for (RaftServer raftServer : cluster.getServers()) {
+    for (RaftServerImpl raftServer : cluster.getServers()) {
       SMTransactionContext sm = ((SMTransactionContext)raftServer.getStateMachine());
       sm.rethrowIfException();
       assertEquals(numTrx, sm.numApplied.get());
     }
 
     // check leader
-    RaftServer raftServer = cluster.getLeader();
+    RaftServerImpl raftServer = cluster.getLeader();
     // assert every transaction has obtained context in leader
     SMTransactionContext sm = ((SMTransactionContext)raftServer.getStateMachine());
     List<Long> ll = sm.applied.stream().collect(Collectors.toList());

Reply via email to