Repository: incubator-ratis
Updated Branches:
  refs/heads/master c3845bc3f -> 9b84d79cf


RATIS-337. In RaftServerImpl, leaderState/heartbeatMonitor may be accessed 
without proper null check.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9b84d79c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9b84d79c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9b84d79c

Branch: refs/heads/master
Commit: 9b84d79cf9b305aff99b65c782ad340671a58c87
Parents: c3845bc
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Mon Oct 8 14:48:30 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Mon Oct 8 14:48:30 2018 +0800

----------------------------------------------------------------------
 .../ratis/server/impl/RaftServerImpl.java       | 200 ++++++-------------
 .../org/apache/ratis/server/impl/RoleInfo.java  |  82 +++++++-
 .../ratis/server/impl/ServerProtoUtils.java     |  10 +
 .../apache/ratis/server/impl/ServerState.java   |   9 +-
 .../ratis/server/impl/RaftServerTestUtil.java   |   2 +-
 5 files changed, 162 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5e7bd89..2d7f85a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -45,7 +45,6 @@ import java.util.concurrent.*;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static 
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
 import static 
org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
@@ -73,15 +72,6 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() -> new 
RaftPeer(getId(), getServerRpc().getInetSocketAddress()));
   private final RoleInfo role;
 
-  /** used when the peer is follower, to monitor election timeout */
-  private volatile FollowerState heartbeatMonitor;
-
-  /** used when the peer is candidate, to request votes from other peers */
-  private volatile LeaderElection electionDaemon;
-
-  /** used when the peer is leader */
-  private volatile LeaderState leaderState;
-
   private final RetryCache retryCache;
   private final CommitInfoCache commitInfoCache = new CommitInfoCache();
 
@@ -93,7 +83,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     this.groupId = group.getGroupId();
     this.lifeCycle = new LifeCycle(id);
     this.stateMachine = stateMachine;
-    this.role = new RoleInfo();
+    this.role = new RoleInfo(id);
 
     final RaftProperties properties = proxy.getProperties();
     minTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS);
@@ -199,7 +189,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
    */
   private void startAsFollower() {
     setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
-    startHeartbeatMonitor();
+    role.startFollowerState(this);
     lifeCycle.transition(RUNNING);
   }
 
@@ -210,23 +200,19 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
    */
   private void startInitializing() {
     setRole(RaftPeerRole.FOLLOWER, "startInitializing");
-    // do not start heartbeatMonitoring
+    // do not start FollowerState
   }
 
   public ServerState getState() {
     return state;
   }
 
-  LeaderState getLeaderState() {
-    return leaderState;
-  }
-
   public RaftPeerId getId() {
     return getState().getSelfId();
   }
 
-  RaftPeerRole getRole() {
-    return role.getCurrentRole();
+  RoleInfo getRole() {
+    return role;
   }
 
   RaftConfiguration getRaftConf() {
@@ -246,19 +232,19 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         LOG.warn("Failed to un-register RaftServer JMX bean for " + getId(), 
ignored);
       }
       try {
-        shutdownHeartbeatMonitor();
+        role.shutdownFollowerState();
       } catch (Exception ignored) {
-        LOG.warn("Failed to shutdown heartbeat monitor for " + getId(), 
ignored);
+        LOG.warn("Failed to shutdown FollowerState for " + getId(), ignored);
       }
       try{
-        shutdownElectionDaemon();
+        role.shutdownLeaderElection();
       } catch (Exception ignored) {
-        LOG.warn("Failed to shutdown election daemon for " + getId(), ignored);
+        LOG.warn("Failed to shutdown LeaderElection for " + getId(), ignored);
       }
       try{
-        shutdownLeaderState(true);
+        role.shutdownLeaderState(true);
       } catch (Exception ignored) {
-        LOG.warn("Failed to shutdown leader state monitor for " + getId(), 
ignored);
+        LOG.warn("Failed to shutdown LeaderState monitor for " + getId(), 
ignored);
       }
       try{
         state.close();
@@ -305,11 +291,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     if (old != RaftPeerRole.FOLLOWER) {
       setRole(RaftPeerRole.FOLLOWER, "changeToFollower");
       if (old == RaftPeerRole.LEADER) {
-        shutdownLeaderState(false);
+        role.shutdownLeaderState(false);
       } else if (old == RaftPeerRole.CANDIDATE) {
-        shutdownElectionDaemon();
+        role.shutdownLeaderElection();
       }
-      startHeartbeatMonitor();
+      role.startFollowerState(this);
     }
     return metadataUpdated;
   }
@@ -320,55 +306,17 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     }
   }
 
-  private synchronized void shutdownLeaderState(boolean allowNull) {
-    if (leaderState == null) {
-      if (!allowNull) {
-        throw new NullPointerException("leaderState == null");
-      }
-    } else {
-      leaderState.stop();
-      leaderState = null;
-    }
-    // TODO: make sure that StateMachineUpdater has applied all transactions 
that have context
-  }
-
-  private void shutdownElectionDaemon() {
-    final LeaderElection election = electionDaemon;
-    if (election != null) {
-      election.stopRunning();
-      // no need to interrupt the election thread
-    }
-    electionDaemon = null;
-  }
-
   synchronized void changeToLeader() {
     Preconditions.assertTrue(isCandidate());
-    shutdownElectionDaemon();
+    role.shutdownLeaderElection();
     setRole(RaftPeerRole.LEADER, "changeToLeader");
     state.becomeLeader();
 
     // start sending AppendEntries RPC to followers
-    leaderState = new LeaderState(this, getProxy().getProperties());
-    final LogEntryProto e = leaderState.start();
+    final LogEntryProto e = role.startLeaderState(this, 
getProxy().getProperties());
     getState().setRaftConf(e.getIndex(), 
ServerProtoUtils.toRaftConfiguration(e));
   }
 
-  private void startHeartbeatMonitor() {
-    Preconditions.assertTrue(heartbeatMonitor == null, "heartbeatMonitor != 
null");
-    LOG.debug("{} starts heartbeatMonitor", getId());
-    heartbeatMonitor = new FollowerState(this);
-    heartbeatMonitor.start();
-  }
-
-  private void shutdownHeartbeatMonitor() {
-    final FollowerState hm = heartbeatMonitor;
-    if (hm != null) {
-      hm.stopRunning();
-      hm.interrupt();
-    }
-    heartbeatMonitor = null;
-  }
-
   Collection<CommitInfoProto> getCommitInfos() {
     final List<CommitInfoProto> infos = new ArrayList<>();
     // add the commit info of this server
@@ -376,7 +324,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     // add the commit infos of other servers
     if (isLeader()) {
-      Optional.ofNullable(leaderState).ifPresent(
+      role.getLeaderState().ifPresent(
           leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
     } else {
       getRaftConf().getPeers().stream()
@@ -408,22 +356,23 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       break;
 
     case FOLLOWER:
-      FollowerInfoProto.Builder follower = FollowerInfoProto.newBuilder()
-          .setLeaderInfo(getServerRpcProto(
-              getRaftConf().getPeer(state.getLeaderId()),
-              heartbeatMonitor.getLastRpcTime().elapsedTimeMs()))
-          .setInLogSync(heartbeatMonitor.isInLogSync());
-      roleInfo.setFollowerInfo(follower);
+      role.getFollowerState().ifPresent(fs -> {
+        final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
+            getRaftConf().getPeer(state.getLeaderId()), 
fs.getLastRpcTime().elapsedTimeMs());
+        roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
+            .setLeaderInfo(leaderInfo)
+            .setInLogSync(fs.isInLogSync()));
+      });
       break;
 
     case LEADER:
-      LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
-      Stream<LogAppender> stream = getLeaderState().getLogAppenders();
-      stream.forEach(appender ->
-          leader.addFollowerInfo(getServerRpcProto(
-              appender.getFollower().getPeer(),
-              
appender.getFollower().getLastRpcResponseTime().elapsedTimeMs())));
-      roleInfo.setLeaderInfo(leader);
+      role.getLeaderState().ifPresent(ls -> {
+        final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
+        ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
+            leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto(
+                f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
+        roleInfo.setLeaderInfo(leader);
+      });
       break;
 
     default:
@@ -432,27 +381,15 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     return roleInfo.build();
   }
 
-  private ServerRpcProto getServerRpcProto(RaftPeer peer, long delay) {
-    if (peer == null) {
-      // if no peer information return empty
-      return ServerRpcProto.getDefaultInstance();
-    }
-    return ServerRpcProto.newBuilder()
-        .setId(ProtoUtils.toRaftPeerProto(peer))
-        .setLastRpcElapsedTimeMs(delay)
-        .build();
-  }
-
   synchronized void changeToCandidate() {
     Preconditions.assertTrue(isFollower());
-    shutdownHeartbeatMonitor();
+    role.shutdownFollowerState();
     setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
     if (state.checkForExtendedNoLeader()) {
       stateMachine.notifyExtendedNoLeader(getGroup(), getRoleInfoProto());
     }
     // start election
-    electionDaemon = new LeaderElection(this);
-    electionDaemon.start();
+    role.startLeaderElection(this);
   }
 
   @Override
@@ -476,7 +413,9 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       NotLeaderException exception = generateNotLeaderException();
       final RaftClientReply reply = new RaftClientReply(request, exception, 
getCommitInfos());
       return RetryCache.failWithReply(reply, entry);
-    } else if (leaderState == null || !leaderState.isReady()) {
+    }
+    final LeaderState leaderState = role.getLeaderState().orElse(null);
+    if (leaderState == null || !leaderState.isReady()) {
       RetryCache.CacheEntry cacheEntry = retryCache.get(request.getClientId(), 
request.getCallId());
       if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
         return cacheEntry.getReplyFuture();
@@ -534,6 +473,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       }
 
       // append the message to its local log
+      final LeaderState leaderState = role.getLeaderStateNonNull();
       final long entryIndex;
       try {
         entryIndex = state.applyLog(context, request.getClientId(),
@@ -696,6 +636,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       }
 
       final RaftConfiguration current = getRaftConf();
+      final LeaderState leaderState = role.getLeaderStateNonNull();
       // make sure there is no other raft reconfiguration in progress
       if (!current.isStable() || leaderState.inStagingState() || 
!state.isConfCommitted()) {
         throw new ReconfigurationInProgressException(
@@ -723,7 +664,9 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     } else if (isLeader()) {
       return true;
     } else {
-      return isFollower() && state.hasLeader() && 
heartbeatMonitor.shouldWithholdVotes();
+      // following a leader and not yet timeout
+      return isFollower() && state.hasLeader()
+          && 
role.getFollowerState().map(FollowerState::shouldWithholdVotes).orElse(false);
     }
   }
 
@@ -742,7 +685,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
         && getState().isConfCommitted()
         && !getRaftConf().containsInConf(candidateId)
         && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
-        && !leaderState.isBootStrappingPeer(candidateId);
+        && role.getLeaderState().map(ls -> 
!ls.isBootStrappingPeer(candidateId)).orElse(false);
   }
 
   @Override
@@ -769,15 +712,16 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     boolean shouldShutdown = false;
     final RequestVoteReplyProto reply;
     synchronized (this) {
+      final FollowerState fs = role.getFollowerState().orElse(null);
       if (shouldWithholdVotes(candidateTerm)) {
         LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: 
leader={}, term={}, lastRpcElapsed={}",
             getId(), role, candidateId, candidateTerm, state.getLeaderId(), 
state.getCurrentTerm(),
-            isFollower()? heartbeatMonitor.getLastRpcTime().elapsedTimeMs() + 
"ms": null);
+            fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
       } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
         final boolean termUpdated = changeToFollower(candidateTerm);
         // see Section 5.4.1 Election restriction
-        if (state.isLogUpToDate(candidateLastEntry)) {
-          heartbeatMonitor.updateLastRpcTime(false);
+        if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
+          fs.updateLastRpcTime(false);
           state.grantVote(candidateId);
           voteGranted = true;
         }
@@ -864,6 +808,12 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     }
   }
 
+  private void updateLastRpcTime(boolean inLogSync) {
+    if (lifeCycle.getCurrentState() == RUNNING) {
+      role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(inLogSync));
+    }
+  }
+
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
       TermIndex previous, long leaderCommit, long callId, boolean initializing,
@@ -912,11 +862,9 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       state.setLeader(leaderId, "appendEntries");
 
       if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
-        startHeartbeatMonitor();
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
+        role.startFollowerState(this);
       }
+      updateLastRpcTime(true);
 
       // We need to check if "previous" is in the local peer. Note that it is
       // possible that "previous" is covered by the latest snapshot: e.g.,
@@ -950,7 +898,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
         if (lifeCycle.getCurrentState() == RUNNING && isFollower()
             && getState().getCurrentTerm() == currentTerm) {
           // reset election timer to avoid punishing the leader for our own 
long disk writes
-          heartbeatMonitor.updateLastRpcTime(false);
+          updateLastRpcTime(false);
         }
         state.updateStatemachine(leaderCommit, currentTerm);
         reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), 
groupId, currentTerm,
@@ -1006,9 +954,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       changeToFollowerAndPersistMetadata(leaderTerm);
       state.setLeader(leaderId, "installSnapshot");
 
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
+      updateLastRpcTime(true);
 
       // Check and append the snapshot chunk. We simply put this in lock
       // considering a follower peer requiring a snapshot installation does not
@@ -1026,9 +972,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       if (request.getDone()) {
         state.reloadStateMachine(lastIncludedIndex, leaderTerm);
       }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
+      updateLastRpcTime(false);
     }
     if (request.getDone()) {
       LOG.info("{}: successfully install the whole snapshot-{}", getId(),
@@ -1056,7 +1000,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   }
 
   public void submitUpdateCommitEvent() {
-    
Optional.ofNullable(leaderState).ifPresent(LeaderState::submitUpdateCommitEvent);
+    role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent);
   }
 
   /**
@@ -1092,6 +1036,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       // update pending request
       boolean updateCache = true;  // always update cache for follower
       synchronized (RaftServerImpl.this) {
+        final LeaderState leaderState = role.getLeaderState().orElse(null);
         if (isLeader() && leaderState != null) { // is leader and is running
           // For leader, update cache unless the reply is delayed.
           // When a reply is delayed, the cache will be updated in 
DelayedReply.getReply().
@@ -1104,19 +1049,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     });
   }
 
-  private TransactionContext getTransactionContext(long index) {
-    if (leaderState != null) { // is leader and is running
-      return leaderState.getTransactionContext(index);
-    }
-    return null;
-  }
-
   public long[] getFollowerNextIndices() {
-    LeaderState s = this.leaderState;
-    if (s == null || !isLeader()) {
+    if (!isLeader()) {
       return null;
     }
-    return s.getFollowerNextIndices();
+    return 
role.getLeaderState().map(LeaderState::getFollowerNextIndices).orElse(null);
   }
 
   CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
@@ -1127,10 +1064,9 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       
stateMachine.setRaftConfiguration(ServerProtoUtils.toRaftConfiguration(next));
     } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
       // check whether there is a TransactionContext because we are the leader.
-      TransactionContext trx = getTransactionContext(next.getIndex());
-      if (trx == null) {
-        trx = new TransactionContextImpl(getRole(), stateMachine, next);
-      }
+      TransactionContext trx = role.getLeaderState()
+          .map(leader -> 
leader.getTransactionContext(next.getIndex())).orElseGet(
+              () -> new TransactionContextImpl(role.getCurrentRole(), 
stateMachine, next));
 
       // Let the StateMachine inject logic for committed transactions in 
sequential order.
       trx = stateMachine.applyTransactionSerial(trx);
@@ -1189,12 +1125,8 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
 
     @Override
     public List<String> getFollowers() {
-      return Optional.ofNullable(leaderState)
-          .map(leader ->
-              leader.getFollowers().stream()
-                  .map(RaftPeer::toString)
-                  .collect(Collectors.toList()))
-          .orElse(Collections.emptyList());
+      return 
role.getLeaderState().map(LeaderState::getFollowers).orElse(Collections.emptyList())
+          .stream().map(RaftPeer::toString).collect(Collectors.toList());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 9ee2aa1..eb25c71 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -18,20 +18,38 @@
 
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Maintain the Role of a Raft Peer.
  */
 public class RoleInfo {
+  public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);
 
+  private final RaftPeerId id;
   private volatile RaftPeerRole role;
+  /** Used when the peer is leader */
+  private final AtomicReference<LeaderState> leaderState = new 
AtomicReference<>();
+  /** Used when the peer is follower, to monitor election timeout */
+  private final AtomicReference<FollowerState> followerState = new 
AtomicReference<>();
+  /** Used when the peer is candidate, to request votes from other peers */
+  private final AtomicReference<LeaderElection> leaderElection = new 
AtomicReference<>();
+
   private final AtomicReference<Timestamp> transitionTime;
 
-  RoleInfo() {
+  RoleInfo(RaftPeerId id) {
+    this.id = id;
     this.transitionTime = new AtomicReference<>(new Timestamp());
   }
 
@@ -60,6 +78,68 @@ public class RoleInfo {
     return role == RaftPeerRole.LEADER;
   }
 
+  Optional<LeaderState> getLeaderState() {
+    return Optional.ofNullable(leaderState.get());
+  }
+
+  LeaderState getLeaderStateNonNull() {
+    return Objects.requireNonNull(leaderState.get(), "leaderState is null");
+  }
+
+  LogEntryProto startLeaderState(RaftServerImpl server, RaftProperties 
properties) {
+    return updateAndGet(leaderState, new LeaderState(server, 
properties)).start();
+  }
+
+  void shutdownLeaderState(boolean allowNull) {
+    final LeaderState leader = leaderState.getAndSet(null);
+    if (leader == null) {
+      if (!allowNull) {
+        throw new NullPointerException("leaderState == null");
+      }
+    } else {
+      LOG.info("{}: shutdown {}", id, leader.getClass().getSimpleName());
+      leader.stop();
+    }
+    // TODO: make sure that StateMachineUpdater has applied all transactions 
that have context
+  }
+
+  Optional<FollowerState> getFollowerState() {
+    return Optional.ofNullable(followerState.get());
+  }
+
+  void startFollowerState(RaftServerImpl server) {
+    updateAndGet(followerState, new FollowerState(server)).start();
+  }
+
+  void shutdownFollowerState() {
+    final FollowerState follower = followerState.getAndSet(null);
+    if (follower != null) {
+      LOG.info("{}: shutdown {}", id, follower.getClass().getSimpleName());
+      follower.stopRunning();
+      follower.interrupt();
+    }
+  }
+
+  void startLeaderElection(RaftServerImpl server) {
+    updateAndGet(leaderElection, new LeaderElection(server)).start();
+  }
+
+  void shutdownLeaderElection() {
+    final LeaderElection election = leaderElection.getAndSet(null);
+    if (election != null) {
+      LOG.info("{}: shutdown {}", id, election.getClass().getSimpleName());
+      election.stopRunning();
+      // no need to interrupt the election thread
+    }
+  }
+
+  private <T> T updateAndGet(AtomicReference<T> ref, T current) {
+    final T updated = ref.updateAndGet(previous -> previous != null? previous: 
current);
+    Preconditions.assertTrue(updated == current, "previous != null");
+    LOG.info("{}: start {}", id, current.getClass().getSimpleName());
+    return updated;
+  }
+
   @Override
   public String toString() {
     return "" + role;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 53df265..6fbd43a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -211,4 +211,14 @@ public class ServerProtoUtils {
     return b.build();
   }
 
+  static ServerRpcProto toServerRpcProto(RaftPeer peer, long delay) {
+    if (peer == null) {
+      // if no peer information return empty
+      return ServerRpcProto.getDefaultInstance();
+    }
+    return ServerRpcProto.newBuilder()
+        .setId(ProtoUtils.toRaftPeerProto(peer))
+        .setLastRpcElapsedTimeMs(delay)
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 86b6b00..10afbfd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -35,7 +35,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -63,23 +62,23 @@ public class ServerState implements Closeable {
    * Latest term server has seen. initialized to 0 on first boot, increases
    * monotonically.
    */
-  private long currentTerm;
+  private volatile long currentTerm;
   /**
    * The server ID of the leader for this term. Null means either there is
    * no leader for this term yet or this server does not know who it is yet.
    */
-  private RaftPeerId leaderId;
+  private volatile RaftPeerId leaderId;
   /**
    * Candidate that this peer granted vote for in current term (or null if 
none)
    */
-  private RaftPeerId votedFor;
+  private volatile RaftPeerId votedFor;
 
   /**
    * Latest installed snapshot for this server. This maybe different than 
StateMachine's latest
    * snapshot. Once we successfully install a snapshot, the SM may not pick it 
up immediately.
    * Further, this will not get updated when SM does snapshots itself.
    */
-  private TermIndex latestInstalledSnapshot;
+  private volatile TermIndex latestInstalledSnapshot;
 
   ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
               RaftServerImpl server, StateMachine stateMachine)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index a4ec715..bcfaf01 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -83,7 +83,7 @@ public class RaftServerTestUtil {
   }
 
   public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
-    return server.getLeaderState().getLogAppenders();
+    return 
server.getRole().getLeaderState().map(LeaderState::getLogAppenders).orElse(null);
   }
 
   public static Logger getStateMachineUpdaterLog() {

Reply via email to