This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new eb3feb5  RATIS-605. Change RaftServerImpl and proto to use RaftGroupMemberId.
eb3feb5 is described below

commit eb3feb5af57a13173be58b9ba18c382ce4a5da5b
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jul 5 15:47:15 2019 +0800

    RATIS-605. Change RaftServerImpl and proto to use RaftGroupMemberId.
---
 .../apache/ratis/client/impl/ClientProtoUtils.java |  29 ++---
 .../ratis/protocol/LeaderNotReadyException.java    |  14 +-
 .../apache/ratis/protocol/NotLeaderException.java  |   5 +-
 .../org/apache/ratis/protocol/RaftClientReply.java |   7 +
 .../ratis/protocol/StateMachineException.java      |   7 +-
 .../java/org/apache/ratis/util/ProtoUtils.java     |  16 +++
 ratis-proto/src/main/proto/Raft.proto              |   7 +-
 .../org/apache/ratis/server/impl/LeaderState.java  |   6 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 144 ++++++++++-----------
 .../org/apache/ratis/server/impl/RoleInfo.java     |   2 +-
 .../apache/ratis/server/impl/ServerProtoUtils.java |  34 ++---
 .../org/apache/ratis/server/impl/ServerState.java  |  41 +++---
 .../ratis/server/impl/StateMachineUpdater.java     |   6 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |   4 +
 14 files changed, 163 insertions(+), 159 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 38977bc..6492c23 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -156,7 +156,7 @@ public interface ClientProtoUtils {
       if (reply.getMessage() != null) {
         b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
       }
-      ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> 
b.addCommitInfos(i));
+      ProtoUtils.addCommitInfos(reply.getCommitInfos(), b::addCommitInfos);
 
       final NotLeaderException nle = reply.getNotLeaderException();
       final StateMachineException sme;
@@ -191,7 +191,7 @@ public interface ClientProtoUtils {
       final LeaderNotReadyException lnre = reply.getLeaderNotReadyException();
       if (lnre != null) {
         LeaderNotReadyExceptionProto.Builder lnreBuilder = 
LeaderNotReadyExceptionProto.newBuilder()
-            .setRaftPeerId(lnre.getRaftPeerId().toByteString());
+            
.setServerId(ProtoUtils.toRaftGroupMemberIdProtoBuilder(lnre.getServerId()));
         b.setLeaderNotReadyException(lnreBuilder);
       }
     }
@@ -224,40 +224,38 @@ public interface ClientProtoUtils {
         b.setGroup(ProtoUtils.toRaftGroupProtoBuilder(reply.getGroup()));
         b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy());
         b.setRole(reply.getRoleInfoProto());
-        ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> 
b.addCommitInfos(i));
+        ProtoUtils.addCommitInfos(reply.getCommitInfos(), b::addCommitInfos);
       }
     }
     return b.build();
   }
 
-  static RaftClientReply toRaftClientReply(
-      RaftClientReplyProto replyProto) {
+  static RaftClientReply toRaftClientReply(RaftClientReplyProto replyProto) {
     final RaftRpcReplyProto rp = replyProto.getRpcReply();
+    final RaftGroupMemberId serverMemberId = 
ProtoUtils.toRaftGroupMemberId(rp.getReplyId(), rp.getRaftGroupId());
+
     final RaftException e;
     if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) {
       NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException();
       final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ?
           ProtoUtils.toRaftPeer(nleProto.getSuggestedLeader()) : null;
       final List<RaftPeer> peers = 
ProtoUtils.toRaftPeers(nleProto.getPeersInConfList());
-      e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()), 
suggestedLeader, peers);
+      e = new NotLeaderException(serverMemberId, suggestedLeader, peers);
     } else if (replyProto.getExceptionDetailsCase() == NOTREPLICATEDEXCEPTION) 
{
       final NotReplicatedExceptionProto nre = 
replyProto.getNotReplicatedException();
       e = new NotReplicatedException(nre.getCallId(), nre.getReplication(), 
nre.getLogIndex());
     } else if 
(replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) {
       StateMachineExceptionProto smeProto = 
replyProto.getStateMachineException();
-      e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()),
-          smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
-          smeProto.getStacktrace());
+      e = wrapStateMachineException(serverMemberId,
+          smeProto.getExceptionClassName(), smeProto.getErrorMsg(), 
smeProto.getStacktrace());
     } else if 
(replyProto.getExceptionDetailsCase().equals(LEADERNOTREADYEXCEPTION)) {
       LeaderNotReadyExceptionProto lnreProto = 
replyProto.getLeaderNotReadyException();
-      e = new 
LeaderNotReadyException(RaftPeerId.valueOf(lnreProto.getRaftPeerId()));
+      e = new 
LeaderNotReadyException(ProtoUtils.toRaftGroupMemberId(lnreProto.getServerId()));
     } else {
       e = null;
     }
     ClientId clientId = ClientId.valueOf(rp.getRequestorId());
-    final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
-    return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
-        groupId, rp.getCallId(), rp.getSuccess(),
+    return new RaftClientReply(clientId, serverMemberId, rp.getCallId(), 
rp.getSuccess(),
         toMessage(replyProto.getMessage()), e,
         replyProto.getLogIndex(), replyProto.getCommitInfosList());
   }
@@ -288,8 +286,7 @@ public interface ClientProtoUtils {
   }
 
   static StateMachineException wrapStateMachineException(
-      RaftPeerId serverId, String className, String errorMsg,
-      ByteString stackTraceBytes) {
+      RaftGroupMemberId memberId, String className, String errorMsg, 
ByteString stackTraceBytes) {
     StateMachineException sme;
     if (className == null) {
       sme = new StateMachineException(errorMsg);
@@ -298,7 +295,7 @@ public interface ClientProtoUtils {
         Class<?> clazz = Class.forName(className);
         final Exception e = ReflectionUtils.instantiateException(
             clazz.asSubclass(Exception.class), errorMsg, null);
-        sme = new StateMachineException(serverId, e);
+        sme = new StateMachineException(memberId, e);
       } catch (Exception e) {
         sme = new StateMachineException(className + ": " + errorMsg);
       }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
index 2b81958..efe795f 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,15 +23,15 @@ package org.apache.ratis.protocol;
  * log entry yet. Thus the leader cannot accept any new client requests since
  * it cannot determine whether a request is just a retry.
  */
-public class LeaderNotReadyException extends RaftException {
-  private final RaftPeerId raftPeerId;
+public class LeaderNotReadyException extends ServerNotReadyException {
+  private final RaftGroupMemberId serverId;
 
-  public LeaderNotReadyException(RaftPeerId id) {
+  public LeaderNotReadyException(RaftGroupMemberId id) {
     super(id + " is in LEADER state but not ready yet.");
-    this.raftPeerId = id;
+    this.serverId = id;
   }
 
-  public RaftPeerId getRaftPeerId() {
-    return raftPeerId;
+  public RaftGroupMemberId getServerId() {
+    return serverId;
   }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
index a854d8c..ab291b9 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
@@ -27,9 +27,8 @@ public class NotLeaderException extends RaftException {
   /** the client may need to update its RaftPeer list */
   private final Collection<RaftPeer> peers;
 
-  public NotLeaderException(RaftPeerId id, RaftPeer suggestedLeader, 
Collection<RaftPeer> peers) {
-    super("Server " + id + " is not the leader (" + suggestedLeader
-        + "). Request must be sent to leader.");
+  public NotLeaderException(RaftGroupMemberId memberId, RaftPeer 
suggestedLeader, Collection<RaftPeer> peers) {
+    super("Server " + memberId + " is not the leader" + (suggestedLeader != 
null? " " + suggestedLeader: ""));
     this.suggestedLeader = suggestedLeader;
     this.peers = peers != null? Collections.unmodifiableCollection(peers): 
Collections.emptyList();
     Preconditions.assertUnique(this.peers);
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 33bbcd0..0b80ae2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -51,6 +51,13 @@ public class RaftClientReply extends RaftClientMessage {
   /** The commit information when the reply is created. */
   private final Collection<CommitInfoProto> commitInfos;
 
+  public RaftClientReply(ClientId clientId, RaftGroupMemberId serverId,
+      long callId, boolean success, Message message, RaftException exception,
+      long logIndex, Collection<CommitInfoProto> commitInfos) {
+    this(clientId, serverId.getPeerId(), serverId.getGroupId(),
+        callId, success, message, exception, logIndex, commitInfos);
+  }
+
   public RaftClientReply(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, boolean success, Message message, RaftException exception,
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index a9710b2..2641b4a 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,11 @@
 package org.apache.ratis.protocol;
 
 public class StateMachineException extends RaftException {
+  public StateMachineException(RaftGroupMemberId serverId, Throwable cause) {
+    this(serverId.getPeerId(), cause);
+  }
+
+  // TODO: remove this constructor in RATIS-609
   public StateMachineException(RaftPeerId serverId, Throwable cause) {
     // cause.getMessage is added to this exception message as the exception 
received through
     // RPC call contains similar message but Simulated RPC doesn't. Adding the 
message
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2b0b5aa..7422637 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.util;
 
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
+import org.apache.ratis.proto.RaftProtos.RaftGroupMemberIdProto;
 import org.apache.ratis.proto.RaftProtos.RaftGroupProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
 import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
@@ -26,6 +27,7 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -112,6 +114,20 @@ public interface ProtoUtils {
         .addAllPeers(toRaftPeerProtos(group.getPeers()));
   }
 
+  static RaftGroupMemberId toRaftGroupMemberId(ByteString peerId, 
RaftGroupIdProto groupId) {
+    return RaftGroupMemberId.valueOf(RaftPeerId.valueOf(peerId), 
ProtoUtils.toRaftGroupId(groupId));
+  }
+
+  static RaftGroupMemberId toRaftGroupMemberId(RaftGroupMemberIdProto 
memberId) {
+    return toRaftGroupMemberId(memberId.getPeerId(), memberId.getGroupId());
+  }
+
+  static RaftGroupMemberIdProto.Builder 
toRaftGroupMemberIdProtoBuilder(RaftGroupMemberId memberId) {
+    return RaftGroupMemberIdProto.newBuilder()
+        .setPeerId(memberId.getPeerId().toByteString())
+        .setGroupId(toRaftGroupIdProtoBuilder(memberId.getGroupId()));
+  }
+
   static CommitInfoProto toCommitInfoProto(RaftPeer peer, long commitIndex) {
     return CommitInfoProto.newBuilder()
         .setServer(peer.getRaftPeerProto())
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index 45b7851..b75569e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -35,6 +35,11 @@ message RaftGroupProto {
   repeated RaftPeerProto peers = 2;
 }
 
+message RaftGroupMemberIdProto {
+  bytes peerId = 1;
+  RaftGroupIdProto groupId = 2;
+}
+
 message RaftConfigurationProto {
   repeated RaftPeerProto peers = 1; // the peers in the current or new conf
   repeated RaftPeerProto oldPeers = 2; // the peers in the old conf
@@ -270,7 +275,7 @@ message NotLeaderExceptionProto {
 }
 
 message LeaderNotReadyExceptionProto {
-  bytes raftPeerId = 1; // id of the peer
+  RaftGroupMemberIdProto serverId = 1; // id of the leader
 }
 
 message NotReplicatedExceptionProto {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 2271908..e61c7e4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -208,7 +208,7 @@ public class LeaderState {
     this.watchRequests = new WatchRequests(server.getId(), properties);
 
     final RaftConfiguration conf = server.getRaftConf();
-    Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
+    Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
     placeHolderIndex = raftLog.getNextIndex();
 
     senders = new SenderList();
@@ -360,8 +360,8 @@ public class LeaderState {
   AppendEntriesRequestProto newAppendEntriesRequestProto(RaftPeerId targetId,
       TermIndex previous, List<LogEntryProto> entries, boolean initializing,
       long callId) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(server.getId(), 
targetId,
-        server.getGroupId(), currentTerm, entries, 
raftLog.getLastCommittedIndex(),
+    return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), 
targetId,
+        currentTerm, entries, raftLog.getLastCommittedIndex(),
         initializing, previous, server.getCommitInfos(), callId);
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b819e29..d93bcec 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -77,8 +77,6 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
   private final LifeCycle lifeCycle;
   private final ServerState state;
-  private final RaftGroupId groupId;
-  private final RaftGroupMemberId memberId; // TODO: move it to ServerState; 
see RATIS-605
   private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() -> new 
RaftPeer(getId(), getServerRpc().getInetSocketAddress()));
   private final RoleInfo role;
 
@@ -92,8 +90,6 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy 
proxy) throws IOException {
     final RaftPeerId id = proxy.getId();
     LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
-    this.groupId = group.getGroupId();
-    this.memberId = RaftGroupMemberId.valueOf(id, groupId);
     this.lifeCycle = new LifeCycle(id);
     this.stateMachine = stateMachine;
     this.role = new RoleInfo(id);
@@ -149,7 +145,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   }
 
   public RaftGroupId getGroupId() {
-    return groupId;
+    return getMemberId().getGroupId();
   }
 
   public StateMachine getStateMachine() {
@@ -170,8 +166,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   }
 
   private void setRole(RaftPeerRole newRole, Object reason) {
-    LOG.info("{}:{} changes role from {} to {} at term {} for {}",
-        getId(), getGroupId(), this.role, newRole, state.getCurrentTerm(), 
reason);
+    LOG.info("{}: changes role from {} to {} at term {} for {}",
+        getMemberId(), this.role, newRole, state.getCurrentTerm(), reason);
     this.role.transitionRole(newRole);
   }
 
@@ -179,13 +175,12 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
       return false;
     }
-    LOG.info("{}: start {}", getId(), groupId);
     RaftConfiguration conf = getRaftConf();
     if (conf != null && conf.contains(getId())) {
-      LOG.debug("{} starts as a follower, conf={}", getId(), conf);
+      LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
       startAsFollower();
     } else {
-      LOG.debug("{} starts with initializing state, conf={}", getId(), conf);
+      LOG.info("{}: start with initializing state, conf={}", getMemberId(), 
conf);
       startInitializing();
     }
 
@@ -227,11 +222,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   }
 
   public RaftGroupMemberId getMemberId() {
-    return memberId;
+    return getState().getMemberId();
   }
 
   public RaftPeerId getId() {
-    return getState().getSelfId();
+    return getMemberId().getPeerId();
   }
 
   RoleInfo getRole() {
@@ -243,43 +238,43 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   }
 
   RaftGroup getGroup() {
-    return RaftGroup.valueOf(groupId, getRaftConf().getPeers());
+    return RaftGroup.valueOf(getGroupId(), getRaftConf().getPeers());
   }
 
   public void shutdown(boolean deleteDirectory) {
     lifeCycle.checkStateAndClose(() -> {
-      LOG.info("{}: shutdown {}", getId(), groupId);
+      LOG.info("{}: shutdown", getMemberId());
       try {
         jmxAdapter.unregister();
       } catch (Exception ignored) {
-        LOG.warn("Failed to un-register RaftServer JMX bean for " + getId(), 
ignored);
+        LOG.warn("{}: Failed to un-register RaftServer JMX bean", 
getMemberId(), ignored);
       }
       try {
         role.shutdownFollowerState();
       } catch (Exception ignored) {
-        LOG.warn("Failed to shutdown FollowerState for " + getId(), ignored);
+        LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), 
ignored);
       }
       try{
         role.shutdownLeaderElection();
       } catch (Exception ignored) {
-        LOG.warn("Failed to shutdown LeaderElection for " + getId(), ignored);
+        LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), 
ignored);
       }
       try{
         role.shutdownLeaderState(true);
       } catch (Exception ignored) {
-        LOG.warn("Failed to shutdown LeaderState monitor for " + getId(), 
ignored);
+        LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), 
ignored);
       }
       try{
         state.close();
       } catch (Exception ignored) {
-        LOG.warn("Failed to close state for " + getId(), ignored);
+        LOG.warn("{}: Failed to close state", getMemberId(), ignored);
       }
       if (deleteDirectory) {
         final RaftStorageDirectory dir = state.getStorage().getStorageDir();
         try {
           FileUtils.deleteFully(dir.getRoot());
         } catch(Exception ignored) {
-          LOG.warn(getId() + ": Failed to remove RaftStorageDirectory " + dir, 
ignored);
+          LOG.warn("{}: Failed to remove RaftStorageDirectory {}", 
getMemberId(), dir, ignored);
         }
       }
     });
@@ -353,10 +348,10 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
           leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
     } else {
       getRaftConf().getPeers().stream()
-          .filter(p -> !p.getId().equals(state.getSelfId()))
           .map(RaftPeer::getId)
+          .filter(id -> !id.equals(getId()))
           .map(commitInfoCache::get)
-          .filter(i -> i != null)
+          .filter(Objects::nonNull)
           .forEach(infos::add);
     }
     return infos;
@@ -419,8 +414,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
   @Override
   public String toString() {
-    return String.format("%8s ", role) + groupId + " " + state
-        + " " + lifeCycle.getCurrentState();
+    return role + " " + state + " " + lifeCycle.getCurrentState();
   }
 
   /**
@@ -445,8 +439,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
         return cacheEntry.getReplyFuture();
       }
-      final RaftClientReply reply = new RaftClientReply(request,
-          new LeaderNotReadyException(getId()), getCommitInfos());
+      final LeaderNotReadyException lnre = new 
LeaderNotReadyException(getMemberId());
+      final RaftClientReply reply = new RaftClientReply(request, lnre, 
getCommitInfos());
       return RetryCache.failWithReply(reply, entry);
     }
     return null;
@@ -454,28 +448,29 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
 
   NotLeaderException generateNotLeaderException() {
     if (lifeCycle.getCurrentState() != RUNNING) {
-      return new NotLeaderException(getId(), null, null);
+      return new NotLeaderException(getMemberId(), null, null);
     }
     RaftPeerId leaderId = state.getLeaderId();
-    if (leaderId == null || leaderId.equals(state.getSelfId())) {
+    if (leaderId == null || leaderId.equals(getId())) {
       // No idea about who is the current leader. Or the peer is the current
       // leader, but it is about to step down. set the suggested leader as 
null.
       leaderId = null;
     }
     RaftConfiguration conf = getRaftConf();
     Collection<RaftPeer> peers = conf.getPeers();
-    return new NotLeaderException(getId(), conf.getPeer(leaderId), peers);
+    return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), 
peers);
   }
 
   private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected) 
throws ServerNotReadyException {
-    return lifeCycle.assertCurrentState((n, c) -> new 
ServerNotReadyException("Server " + n
-        + " is not " + Arrays.toString(expected) + ": current state is " + c),
+    return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
+        getMemberId() + " is not in " + Arrays.toString(expected) + ": current 
state is " + c),
         expected);
   }
 
   void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws 
GroupMismatchException {
+    final RaftGroupId groupId = getGroupId();
     if (!groupId.equals(requestorGroupId)) {
-      throw new GroupMismatchException(getId()
+      throw new GroupMismatchException(getMemberId()
           + ": The group (" + requestorGroupId + ") of " + requestorId
           + " does not match the group (" + groupId + ") of the server " + 
getId());
     }
@@ -502,7 +497,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       final PendingRequests.Permit permit = 
leaderState.tryAcquirePendingRequest();
       if (permit == null) {
         return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
-            "Failed to acquire a pending write request in " + getId() + " for 
" + request));
+            getMemberId() + ": Failed to acquire a pending write request for " 
+ request));
       }
       try {
         state.appendLog(context);
@@ -522,7 +517,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       pending = leaderState.addPendingRequest(permit, request, context);
       if (pending == null) {
         return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
-            "Failed to add a pending write request in " + getId() + " for " + 
request));
+            getMemberId() + ": Failed to add a pending write request for " + 
request));
       }
       leaderState.notifySenders();
     }
@@ -533,7 +528,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
     assertLifeCycleState(RUNNING);
-    LOG.debug("{}: receive client request({})", getId(), request);
+    LOG.debug("{}: receive client request({})", getMemberId(), request);
     if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
       return staleReadAsync(request);
     }
@@ -572,7 +567,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     TransactionContext context = stateMachine.startTransaction(request);
     if (context.getException() != null) {
       RaftClientReply exceptionReply = new RaftClientReply(request,
-          new StateMachineException(getId(), context.getException()), 
getCommitInfos());
+          new StateMachineException(getMemberId(), context.getException()), 
getCommitInfos());
       cacheEntry.failWithReply(exceptionReply);
       return CompletableFuture.completedFuture(exceptionReply);
     }
@@ -589,12 +584,12 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest 
request) {
     final long minIndex = request.getType().getStaleRead().getMinIndex();
     final long commitIndex = state.getLog().getLastCommittedIndex();
-    LOG.debug("{}: minIndex={}, commitIndex={}", getId(), minIndex, 
commitIndex);
+    LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex, 
commitIndex);
     if (commitIndex < minIndex) {
       final StaleReadException e = new StaleReadException(
           "Unable to serve stale-read due to server commit index = " + 
commitIndex + " < min = " + minIndex);
       return CompletableFuture.completedFuture(
-          new RaftClientReply(request, new StateMachineException(getId(), e), 
getCommitInfos()));
+          new RaftClientReply(request, new 
StateMachineException(getMemberId(), e), getCommitInfos()));
     }
     return 
processQueryFuture(getStateMachine().queryStale(request.getMessage(), 
minIndex), request);
   }
@@ -614,17 +609,16 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   @Override
   public RaftClientReply submitClientRequest(RaftClientRequest request)
       throws IOException {
-    return waitForReply(getId(), request, submitClientRequestAsync(request));
+    return waitForReply(request, submitClientRequestAsync(request));
   }
 
-  RaftClientReply waitForReply(RaftPeerId id,
-      RaftClientRequest request, CompletableFuture<RaftClientReply> future)
+  RaftClientReply waitForReply(RaftClientRequest request, 
CompletableFuture<RaftClientReply> future)
       throws IOException {
-    return waitForReply(id, request, future, e -> new RaftClientReply(request, 
e, getCommitInfos()));
+    return waitForReply(getMemberId(), request, future, e -> new 
RaftClientReply(request, e, getCommitInfos()));
   }
 
   static <REPLY extends RaftClientReply> REPLY waitForReply(
-      RaftPeerId id, RaftClientRequest request, CompletableFuture<REPLY> 
future,
+      Object id, RaftClientRequest request, CompletableFuture<REPLY> future,
       Function<RaftException, REPLY> exceptionReply)
       throws IOException {
     try {
@@ -650,18 +644,16 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   }
 
   @Override
-  public RaftClientReply setConfiguration(SetConfigurationRequest request)
-      throws IOException {
-    return waitForReply(getId(), request, setConfigurationAsync(request));
+  public RaftClientReply setConfiguration(SetConfigurationRequest request) 
throws IOException {
+    return waitForReply(request, setConfigurationAsync(request));
   }
 
   /**
    * Handle a raft configuration change request from client.
    */
   @Override
-  public CompletableFuture<RaftClientReply> setConfigurationAsync(
-      SetConfigurationRequest request) throws IOException {
-    LOG.debug("{}: receive setConfiguration({})", getId(), request);
+  public CompletableFuture<RaftClientReply> 
setConfigurationAsync(SetConfigurationRequest request) throws IOException {
+    LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
     assertLifeCycleState(RUNNING);
     assertGroup(request.getRequestorId(), request.getRaftGroupId());
 
@@ -747,7 +739,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
         candidateId, candidateTerm, candidateLastEntry);
     LOG.debug("{}: receive requestVote({}, {}, {}, {})",
-        getId(), candidateId, candidateGroupId, candidateTerm, 
candidateLastEntry);
+        getMemberId(), candidateId, candidateGroupId, candidateTerm, 
candidateLastEntry);
     assertLifeCycleState(RUNNING);
     assertGroup(candidateId, candidateGroupId);
 
@@ -758,7 +750,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       final FollowerState fs = role.getFollowerState().orElse(null);
       if (shouldWithholdVotes(candidateTerm)) {
         LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: 
leader={}, term={}, lastRpcElapsed={}",
-            getId(), role, candidateId, candidateTerm, state.getLeaderId(), 
state.getCurrentTerm(),
+            getMemberId(), role, candidateId, candidateTerm, 
state.getLeaderId(), state.getCurrentTerm(),
             fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
       } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
         final boolean termUpdated = changeToFollower(candidateTerm, true, 
"recognizeCandidate:" + candidateId);
@@ -775,11 +767,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) 
{
         shouldShutdown = true;
       }
-      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
-          groupId, voteGranted, state.getCurrentTerm(), shouldShutdown);
+      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, 
getMemberId(),
+          voteGranted, state.getCurrentTerm(), shouldShutdown);
       if (LOG.isDebugEnabled()) {
         LOG.debug("{} replies to vote request: {}. Peer's state: {}",
-            getId(), ServerProtoUtils.toString(reply), state);
+            getMemberId(), ServerProtoUtils.toString(reply), state);
       }
     }
     return reply;
@@ -841,7 +833,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, 
r.getLeaderCommit(),
           request.getCallId(), r.getInitializing(), r.getCommitInfosList(), 
entries);
     } catch(Throwable t) {
-      LOG.error(getId() + ": Failed appendEntriesAsync " + r, t);
+      LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
       throw t;
     }
   }
@@ -876,7 +868,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     final LifeCycle.State currentState = assertLifeCycleState(STARTING, 
RUNNING);
     if (currentState == STARTING) {
       if (role.getCurrentRole() == null) {
-        throw new ServerNotReadyException("The role of Server " + getId() + " is not yet initialized.");
+        throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
       }
     }
     assertGroup(leaderId, leaderGroupId);
@@ -893,7 +885,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
     final boolean isHeartbeat = entries.length == 0;
     logAppendEntries(isHeartbeat,
-        () -> getId() + ": receive appendEntries(" + leaderId + ", " + 
leaderTerm + ", "
+        () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + 
leaderTerm + ", "
             + previous + ", " + leaderCommit + ", " + initializing
             + ", commits" + ProtoUtils.toString(commitInfos)
             + ", entries: " + ServerProtoUtils.toString(entries));
@@ -907,10 +899,10 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), groupId, currentTerm, followerCommit, 
state.getNextIndex(), NOT_LEADER, callId);
+            leaderId, getMemberId(), currentTerm, followerCommit, 
state.getNextIndex(), NOT_LEADER, callId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
-              getId(), leaderId, leaderTerm, state, ServerProtoUtils.toString(reply));
+              getMemberId(), leaderId, leaderTerm, state, ServerProtoUtils.toString(reply));
         }
         return CompletableFuture.completedFuture(reply);
       }
@@ -957,11 +949,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       synchronized(this) {
         state.updateStatemachine(leaderCommit, currentTerm);
         final long n = isHeartbeat? state.getLog().getNextIndex(): 
entries[entries.length - 1].getIndex() + 1;
-        reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), 
groupId, currentTerm,
+        reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, 
getMemberId(), currentTerm,
             state.getLog().getLastCommittedIndex(), n, SUCCESS, callId);
       }
       logAppendEntries(isHeartbeat, () ->
-          getId() + ": succeeded to handle AppendEntries. Reply: " + 
ServerProtoUtils.toString(reply));
+          getMemberId() + ": succeeded to handle AppendEntries. Reply: " + 
ServerProtoUtils.toString(reply));
       return reply;
     });
   }
@@ -974,8 +966,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     }
 
     final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), groupId, currentTerm, followerCommit, 
replyNextIndex, INCONSISTENCY, callId);
-    LOG.info("{}: inconsistency entries. Reply:{}", getId(), 
ServerProtoUtils.toString(reply));
+        leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, 
INCONSISTENCY, callId);
+    LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), 
ServerProtoUtils.toString(reply));
     return reply;
   }
 
@@ -983,7 +975,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     // Check if a snapshot installation through state machine is in progress.
     final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
     if (installSnapshot != null) {
-      LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in progress", getId(), installSnapshot);
+      LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in progress", getMemberId(), installSnapshot);
       return installSnapshot.getIndex();
     }
 
@@ -994,7 +986,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       final long snapshotIndex = state.getSnapshotIndex();
       if (snapshotIndex > 0 && snapshotIndex >= firstEntryIndex) {
         LOG.info("{}: Failed appendEntries as latest snapshot ({}) already has the append entries (first index: {})",
-            getId(), snapshotIndex, firstEntryIndex);
+            getMemberId(), snapshotIndex, firstEntryIndex);
         return snapshotIndex + 1;
       }
     }
@@ -1002,7 +994,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     // Check if "previous" is contained in current state.
     if (previous != null && !state.containsTermIndex(previous)) {
       final long replyNextIndex = Math.min(state.getNextIndex(), 
previous.getIndex());
-      LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getId(), previous);
+      LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
       return replyNextIndex;
     }
 
@@ -1085,7 +1077,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         Preconditions.assertTrue(
             state.getLog().getNextIndex() <= lastIncludedIndex,
             "%s log's next id is %s, last included index in snapshot is %s",
-            getId(), state.getLog().getNextIndex(), lastIncludedIndex);
+            getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);
 
         //TODO: We should only update State with installed snapshot once the request is done.
         state.installSnapshot(request);
@@ -1197,8 +1189,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
 
   synchronized RequestVoteRequestProto createRequestVoteRequest(
       RaftPeerId targetId, long term, TermIndex lastEntry) {
-    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId,
-        groupId, term, lastEntry);
+    return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId, 
term, lastEntry);
   }
 
   public void submitUpdateCommitEvent() {
@@ -1219,7 +1210,6 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     // update the retry cache
     final ClientId clientId = ClientId.valueOf(smLog.getClientId());
     final long callId = smLog.getCallId();
-    final RaftPeerId serverId = getId();
     final RetryCache.CacheEntry cacheEntry = 
retryCache.getOrCreateEntry(clientId, callId);
     if (cacheEntry.isFailed()) {
       retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
@@ -1229,12 +1219,12 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     return stateMachineFuture.whenComplete((reply, exception) -> {
       final RaftClientReply r;
       if (exception == null) {
-        r = new RaftClientReply(clientId, serverId, groupId, callId, true, 
reply, null, logIndex, getCommitInfos());
+        r = new RaftClientReply(clientId, getMemberId(), callId, true, reply, 
null, logIndex, getCommitInfos());
       } else {
         // the exception is coming from the state machine. wrap it into the
         // reply as a StateMachineException
-        final StateMachineException e = new StateMachineException(getId(), 
exception);
-        r = new RaftClientReply(clientId, serverId, groupId, callId, false, 
null, e, logIndex, getCommitInfos());
+        final StateMachineException e = new 
StateMachineException(getMemberId(), exception);
+        r = new RaftClientReply(clientId, getMemberId(), callId, false, null, 
e, logIndex, getCommitInfos());
       }
 
       // update pending request
@@ -1283,8 +1273,8 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
             stateMachine.applyTransaction(trx);
         return replyPendingRequest(next, stateMachineFuture);
       } catch (Throwable e) {
-        LOG.error("{}: applyTransaction failed for index:{} proto:{}", getId(),
-            next.getIndex(), ServerProtoUtils.toString(next), e.getMessage());
+        LOG.error("{}: applyTransaction failed for index:{} proto:{}",
+            getMemberId(), next.getIndex(), ServerProtoUtils.toString(next), e);
         throw e;
       }
     }
@@ -1298,7 +1288,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       final long callId = smLog.getCallId();
       final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, 
callId);
       if (cacheEntry != null) {
-        final RaftClientReply reply = new RaftClientReply(clientId, getId(), 
getGroupId(),
+        final RaftClientReply reply = new RaftClientReply(clientId, 
getMemberId(),
             callId, false, null, generateNotLeaderException(),
             logEntry.getIndex(), getCommitInfos());
         cacheEntry.failWithReply(reply);
@@ -1309,7 +1299,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   private class RaftServerJmxAdapter extends JmxRegister implements 
RaftServerMXBean {
     @Override
     public String getId() {
-      return getState().getSelfId().toString();
+      return getMemberId().getPeerId().toString();
     }
 
     @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index a27a086..ae2a0df 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -146,6 +146,6 @@ class RoleInfo {
 
   @Override
   public String toString() {
-    return "" + role;
+    return String.format("%9s", role);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 5d94929..e688446 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -22,7 +22,6 @@ import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -290,20 +289,14 @@ public interface ServerProtoUtils {
 
   static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
       RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success) {
-    return toRaftRpcReplyProtoBuilder(requestorId, replyId.getPeerId(), 
replyId.getGroupId(), success);
-  }
-
-  static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
-      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean 
success) {
     return ClientProtoUtils.toRaftRpcReplyProtoBuilder(
-        requestorId.toByteString(), replyId.toByteString(), groupId, 
DEFAULT_CALLID, success);
+        requestorId.toByteString(), replyId.getPeerId().toByteString(), 
replyId.getGroupId(), DEFAULT_CALLID, success);
   }
 
   static RequestVoteReplyProto toRequestVoteReplyProto(
-      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
-      boolean success, long term, boolean shouldShutdown) {
+      RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long 
term, boolean shouldShutdown) {
     return RequestVoteReplyProto.newBuilder()
-        .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, 
groupId, success))
+        .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, 
success))
         .setTerm(term)
         .setShouldShutdown(shouldShutdown)
         .build();
@@ -311,19 +304,14 @@ public interface ServerProtoUtils {
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
       RaftGroupMemberId requestorId, RaftPeerId replyId) {
-    return toRaftRpcRequestProtoBuilder(requestorId.getPeerId(), replyId, 
requestorId.getGroupId());
-  }
-
-  static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) {
     return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
-        requestorId.toByteString(), replyId.toByteString(), groupId, 
DEFAULT_CALLID, null);
+        requestorId.getPeerId().toByteString(), replyId.toByteString(), 
requestorId.getGroupId(), DEFAULT_CALLID, null);
   }
 
   static RequestVoteRequestProto toRequestVoteRequestProto(
-      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long 
term, TermIndex lastEntry) {
+      RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex 
lastEntry) {
     final RequestVoteRequestProto.Builder b = 
RequestVoteRequestProto.newBuilder()
-        .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, 
groupId))
+        .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
         .setCandidateTerm(term);
     if (lastEntry != null) {
       b.setCandidateLastEntry(toTermIndexProto(lastEntry));
@@ -399,10 +387,10 @@ public interface ServerProtoUtils {
   }
 
   static AppendEntriesReplyProto toAppendEntriesReplyProto(
-      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long 
term,
+      RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
       long followerCommit, long nextIndex, AppendResult result, long callId) {
     RaftRpcReplyProto.Builder rpcReply = toRaftRpcReplyProtoBuilder(
-        requestorId, replyId, groupId, result == AppendResult.SUCCESS)
+        requestorId, replyId, result == AppendResult.SUCCESS)
         .setCallId(callId);
     return AppendEntriesReplyProto.newBuilder()
         .setServerReply(rpcReply)
@@ -413,10 +401,10 @@ public interface ServerProtoUtils {
   }
 
   static AppendEntriesRequestProto toAppendEntriesRequestProto(
-      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long 
leaderTerm,
+      RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm,
       List<LogEntryProto> entries, long leaderCommit, boolean initializing,
       TermIndex previous, Collection<CommitInfoProto> commitInfos, long 
callId) {
-    RaftRpcRequestProto.Builder rpcRequest = 
toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId)
+    RaftRpcRequestProto.Builder rpcRequest = 
toRaftRpcRequestProtoBuilder(requestorId, replyId)
         .setCallId(callId);
     final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
         .newBuilder()
@@ -431,7 +419,7 @@ public interface ServerProtoUtils {
     if (previous != null) {
       b.setPreviousLog(toTermIndexProto(previous));
     }
-    ProtoUtils.addCommitInfos(commitInfos, i -> b.addCommitInfos(i));
+    ProtoUtils.addCommitInfos(commitInfos, b::addCommitInfos);
     return b.build();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index b40a131..7fbbc66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -54,8 +54,7 @@ import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
  * Common states of a raft peer. Protected by RaftServer's lock.
  */
 public class ServerState implements Closeable {
-  private final RaftPeerId selfId;
-  private final RaftGroupId groupId;
+  private final RaftGroupMemberId memberId;
   private final RaftServerImpl server;
   /** Raft log */
   private final RaftLog log;
@@ -96,13 +95,12 @@ public class ServerState implements Closeable {
   ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
               RaftServerImpl server, StateMachine stateMachine)
       throws IOException {
-    this.selfId = id;
-    this.groupId = group.getGroupId();
+    this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
     this.server = server;
     RaftConfiguration initialConf = RaftConfiguration.newBuilder()
         .setConf(group.getPeers()).build();
     configurationManager = new ConfigurationManager(initialConf);
-    LOG.info("{}:{} {}", id, groupId, configurationManager);
+    LOG.info("{}: {}", getMemberId(), configurationManager);
 
     // use full uuid string to create a subdirectory
     final File dir = chooseStorageDir(RaftServerConfigKeys.storageDirs(prop),
@@ -126,10 +124,12 @@ public class ServerState implements Closeable {
     currentTerm.set(metadata.getTerm());
     votedFor = metadata.getVotedFor();
 
-    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
-         lastApplied, prop);
+    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this, 
lastApplied, prop);
   }
 
+  RaftGroupMemberId getMemberId() {
+    return memberId;
+  }
 
   static File chooseStorageDir(List<File> volumes, String targetSubDir) throws 
IOException {
     final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
@@ -161,7 +161,7 @@ public class ServerState implements Closeable {
     SnapshotInfo snapshot = sm.getLatestSnapshot();
 
     if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
-      return RaftServerConstants.INVALID_LOG_INDEX;
+      return RaftLog.INVALID_LOG_INDEX;
     }
 
     // get the raft configuration from raft metafile
@@ -201,10 +201,6 @@ public class ServerState implements Closeable {
     return configurationManager.getCurrent();
   }
 
-  public RaftPeerId getSelfId() {
-    return this.selfId;
-  }
-
   public long getCurrentTerm() {
     return currentTerm.get();
   }
@@ -231,7 +227,7 @@ public class ServerState implements Closeable {
    * Become a candidate and start leader election
    */
   long initElection() {
-    votedFor = selfId;
+    votedFor = getMemberId().getPeerId();
     setLeader(null, "initElection");
     return currentTerm.incrementAndGet();
   }
@@ -260,8 +256,8 @@ public class ServerState implements Closeable {
         lastNoLeaderTime = null;
         suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
       }
-      LOG.info("{}:{} change Leader from {} to {} at term {} for {}{}",
-          selfId, groupId, leaderId, newLeaderId, getCurrentTerm(), op, 
suffix);
+      LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
+          getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
       leaderId = newLeaderId;
     }
   }
@@ -276,7 +272,7 @@ public class ServerState implements Closeable {
   }
 
   void becomeLeader() {
-    setLeader(selfId, "becomeLeader");
+    setLeader(getMemberId().getPeerId(), "becomeLeader");
   }
 
   public RaftLog getLog() {
@@ -340,7 +336,7 @@ public class ServerState implements Closeable {
 
   @Override
   public String toString() {
-    return selfId + ":t" + currentTerm + ", leader=" + leaderId
+    return getMemberId() + ":t" + currentTerm + ", leader=" + leaderId
         + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + 
getRaftConf();
   }
 
@@ -358,9 +354,8 @@ public class ServerState implements Closeable {
   void setRaftConf(long logIndex, RaftConfiguration conf) {
     configurationManager.addConfiguration(logIndex, conf);
     server.getServerRpc().addPeers(conf.getPeers());
-    LOG.info("{}:{} set configuration {} at {}", getSelfId(), groupId, conf,
-        logIndex);
-    LOG.trace("{}: {}", getSelfId(), configurationManager);
+    LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
+    LOG.trace("{}: {}", getMemberId(), configurationManager);
   }
 
   void updateConfiguration(LogEntryProto[] entries) {
@@ -388,11 +383,9 @@ public class ServerState implements Closeable {
     try {
       stateMachineUpdater.stopAndJoin();
     } catch (InterruptedException e) {
-      LOG.warn(getSelfId() +
-          ": Interrupted when joining stateMachineUpdater", e);
+      LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e);
     }
-    LOG.info("{}:{} closes. The last applied log index is {}",
-        getSelfId(), groupId, getLastAppliedIndex());
+    LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex());
 
     log.close();
     storage.close();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 699d8e0..f0a4c58 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -75,14 +75,14 @@ class StateMachineUpdater implements Runnable {
   private volatile State state = State.RUNNING;
 
   StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
-      RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
-    this.name = getClass().getSimpleName() + ":" + raftLog.getSelfId() + ":" + server.getGroupId();
+      ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
+    this.name = serverState.getMemberId() + "-" + getClass().getSimpleName();
     this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
     this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
 
     this.stateMachine = stateMachine;
     this.server = server;
-    this.raftLog = raftLog;
+    this.raftLog = serverState.getLog();
 
     this.appliedIndex = new RaftLogIndex("appliedIndex", lastAppliedIndex);
     this.snapshotIndex = new RaftLogIndex("snapshotIndex", lastAppliedIndex);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 353dffb..b209da9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -21,6 +21,8 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -423,6 +425,8 @@ public class TestSegmentedRaftLog extends BaseTest {
     RaftServerImpl server = mock(RaftServerImpl.class);
     RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
     when(server.getRetryCache()).thenReturn(retryCache);
+    final RaftGroupMemberId id = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("s0"), RaftGroupId.randomId());
+    when(server.getMemberId()).thenReturn(id);
     doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, server, storage, -1, properties)) {

Reply via email to