RATIS-89. Add raft group and raft group ID. Contributed by Jing Zhao.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/466fc2c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/466fc2c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/466fc2c3

Branch: refs/heads/master
Commit: 466fc2c3a8fc984b3f94e939045fc4d8a7b0ae0c
Parents: ddb82cd
Author: Jing Zhao <[email protected]>
Authored: Tue Jun 13 19:42:53 2017 -0700
Committer: Jing Zhao <[email protected]>
Committed: Tue Jun 13 19:42:53 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  9 ++-
 .../ratis/client/impl/ClientImplUtils.java      |  5 +-
 .../ratis/client/impl/ClientProtoUtils.java     | 37 +++++++----
 .../ratis/client/impl/RaftClientImpl.java       | 13 ++--
 .../org/apache/ratis/protocol/ClientId.java     | 42 ++----------
 .../ratis/protocol/RaftClientMessage.java       | 13 +++-
 .../apache/ratis/protocol/RaftClientReply.java  | 15 +++--
 .../ratis/protocol/RaftClientRequest.java       |  8 +--
 .../org/apache/ratis/protocol/RaftGroup.java    | 55 ++++++++++++++++
 .../org/apache/ratis/protocol/RaftGroupId.java  | 41 ++++++++++++
 .../java/org/apache/ratis/protocol/RaftId.java  | 67 ++++++++++++++++++++
 .../org/apache/ratis/protocol/RaftPeerId.java   |  3 +-
 .../apache/ratis/protocol/RaftRpcMessage.java   |  2 +
 .../ratis/protocol/ReinitializeRequest.java     | 14 ++--
 .../ratis/protocol/SetConfigurationRequest.java |  4 +-
 .../java/org/apache/ratis/util/ProtoUtils.java  |  1 +
 .../TestRaftStateMachineException.java          |  4 +-
 .../ratis/grpc/client/AppendStreamer.java       | 14 ++--
 .../ratis/grpc/client/RaftOutputStream.java     |  5 +-
 .../ratis/grpc/MiniRaftClusterWithGRpc.java     |  7 +-
 .../org/apache/ratis/grpc/TestRaftStream.java   |  9 ++-
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |  8 +--
 .../ratis/netty/MiniRaftClusterWithNetty.java   |  7 +-
 ratis-proto-shaded/src/main/proto/Raft.proto    | 11 ++++
 .../org/apache/ratis/server/RaftServer.java     |  8 +--
 .../ratis/server/impl/RaftServerImpl.java       | 40 +++++++-----
 .../ratis/server/impl/RaftServerProxy.java      | 22 +++----
 .../ratis/server/impl/ServerImplUtils.java      | 15 +----
 .../ratis/server/impl/ServerProtoUtils.java     | 29 +++++----
 .../apache/ratis/server/impl/ServerState.java   |  7 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  | 67 +++++++++++---------
 .../ratis/RaftNotLeaderExceptionBaseTest.java   |  6 +-
 .../org/apache/ratis/RaftRetryCacheTests.java   |  9 +--
 .../impl/RaftReconfigurationBaseTest.java       | 14 ++--
 .../server/impl/ReinitializationBaseTest.java   | 15 +++--
 .../MiniRaftClusterWithSimulatedRpc.java        |  7 +-
 .../server/simulation/RaftServerReply.java      | 12 ++++
 .../server/simulation/RaftServerRequest.java    | 12 ++++
 .../statemachine/RaftSnapshotBaseTest.java      |  2 +-
 39 files changed, 433 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 956a6de..1a7faf6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Objects;
 
 /** A client who sends requests to a raft service. */
@@ -64,7 +63,7 @@ public interface RaftClient extends Closeable {
   class Builder {
     private ClientId clientId;
     private RaftClientRpc clientRpc;
-    private Collection<RaftPeer> servers;
+    private RaftGroup group;
     private RaftPeerId leaderId;
     private RaftProperties properties;
     private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
@@ -80,7 +79,7 @@ public interface RaftClient extends Closeable {
         retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
       }
       return ClientImplUtils.newRaftClient(clientId,
-          Objects.requireNonNull(servers, "The 'servers' field is not initialized."),
+          Objects.requireNonNull(group, "The 'servers' field is not initialized."),
           leaderId,
           Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
           retryInterval);
@@ -93,8 +92,8 @@ public interface RaftClient extends Closeable {
     }
 
     /** Set servers. */
-    public Builder setServers(Collection<RaftPeer> servers) {
-      this.servers = servers;
+    public Builder setRaftGroup(RaftGroup group) {
+      this.group = group;
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 882aa41..07b07b0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeer;
@@ -29,8 +30,8 @@ import java.util.Collection;
 /** Client utilities for internal use. */
 public class ClientImplUtils {
   public static RaftClient newRaftClient(
-      ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId,
+      ClientId clientId, RaftGroup group, RaftPeerId leaderId,
       RaftClientRpc clientRpc, TimeDuration retryInterval) {
-    return new RaftClientImpl(clientId, peers, leaderId, clientRpc, retryInterval);
+    return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index ff49109..146622b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -30,27 +30,31 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.Exce
 
 public class ClientProtoUtils {
   public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
-      byte[] requestorId, byte[] replyId, long callId, boolean success) {
+      byte[] requestorId, byte[] replyId, byte[] groupId, long callId, boolean success) {
     return RaftRpcReplyProto.newBuilder()
         .setRequestorId(ProtoUtils.toByteString(requestorId))
         .setReplyId(ProtoUtils.toByteString(replyId))
+        .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(groupId)))
         .setCallId(callId)
         .setSuccess(success);
   }
 
   public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      byte[] requesterId, byte[] replyId, long callId) {
+      byte[] requesterId, byte[] replyId, byte[] raftGroupId, long callId) {
     return RaftRpcRequestProto.newBuilder()
         .setRequestorId(ProtoUtils.toByteString(requesterId))
         .setReplyId(ProtoUtils.toByteString(replyId))
+        .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(raftGroupId)))
         .setCallId(callId);
   }
 
   public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
     ClientId clientId = new ClientId(
         p.getRpcRequest().getRequestorId().toByteArray());
+    RaftGroupId groupId =
+        new RaftGroupId(p.getRpcRequest().getRaftGroupId().getId().toByteArray());
     RaftPeerId serverId = RaftPeerId.valueOf(p.getRpcRequest().getReplyId());
-    return new RaftClientRequest(clientId, serverId,
+    return new RaftClientRequest(clientId, serverId, groupId,
         p.getRpcRequest().getCallId(),
         toMessage(p.getMessage()), p.getReadOnly());
   }
@@ -59,18 +63,19 @@ public class ClientProtoUtils {
       RaftClientRequest request) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(),
-            request.getServerId().toBytes(), request.getCallId()))
+            request.getServerId().toBytes(), request.getRaftGroupId().toBytes(),
+            request.getCallId()))
         .setMessage(toClientMessageEntryProto(request.getMessage()))
         .setReadOnly(request.isReadOnly())
         .build();
   }
 
   public static RaftClientRequestProto genRaftClientRequestProto(
-      ClientId clientId, RaftPeerId serverId, long callId, ByteString content,
-      boolean readOnly) {
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
+      ByteString content, boolean readOnly) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(),
-            serverId.toBytes(), callId))
+            serverId.toBytes(), groupId.toBytes(), callId))
         .setMessage(ClientMessageEntryProto.newBuilder().setContent(content))
         .setReadOnly(readOnly)
         .build();
@@ -81,7 +86,8 @@ public class ClientProtoUtils {
     final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder();
     if (reply != null) {
       b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(),
-          reply.getServerId().toBytes(), reply.getCallId(), reply.isSuccess()));
+          reply.getServerId().toBytes(), reply.getRaftGroupId().toBytes(),
+          reply.getCallId(), reply.isSuccess()));
       if (reply.getMessage() != null) {
         b.setMessage(toClientMessageEntryProto(reply.getMessage()));
       }
@@ -128,9 +134,11 @@ public class ClientProtoUtils {
           smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
           smeProto.getStacktrace());
     }
-    return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()),
-        RaftPeerId.valueOf(rp.getReplyId()),
-        rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e);
+    ClientId clientId = new ClientId(rp.getRequestorId().toByteArray());
+    RaftGroupId groupId = new RaftGroupId(rp.getRaftGroupId().getId().toByteArray());
+    return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
+        groupId, rp.getCallId(), rp.getSuccess(),
+        toMessage(replyProto.getMessage()), e);
   }
 
   private static StateMachineException wrapStateMachineException(
@@ -171,6 +179,7 @@ public class ClientProtoUtils {
     return new SetConfigurationRequest(
         new ClientId(m.getRequestorId().toByteArray()),
         RaftPeerId.valueOf(m.getReplyId()),
+        new RaftGroupId(m.getRaftGroupId().getId().toByteArray()),
         p.getRpcRequest().getCallId(), peers);
   }
 
@@ -180,6 +189,7 @@ public class ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
             request.getClientId().toBytes(),
             request.getServerId().toBytes(),
+            request.getRaftGroupId().toBytes(),
             request.getCallId()))
         .addAllPeers(ProtoUtils.toRaftPeerProtos(
             Arrays.asList(request.getPeersInNewConf())))
@@ -193,6 +203,7 @@ public class ClientProtoUtils {
     return new ReinitializeRequest(
         new ClientId(m.getRequestorId().toByteArray()),
         RaftPeerId.valueOf(m.getReplyId()),
+        new RaftGroupId(m.getRaftGroupId().getId().toByteArray()),
         p.getRpcRequest().getCallId(), peers);
   }
 
@@ -202,9 +213,9 @@ public class ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
             request.getClientId().toBytes(),
             request.getServerId().toBytes(),
+            request.getRaftGroupId().toBytes(),
             request.getCallId()))
-        .addAllPeers(ProtoUtils.toRaftPeerProtos(
-            Arrays.asList(request.getPeersInNewConf())))
+        .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers()))
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 75ef2a4..c6571ae 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,16 +44,18 @@ final class RaftClientImpl implements RaftClient {
   private final ClientId clientId;
   private final RaftClientRpc clientRpc;
   private final Collection<RaftPeer> peers;
+  private final RaftGroupId groupId;
   private final TimeDuration retryInterval;
 
   private volatile RaftPeerId leaderId;
 
-  RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers,
+  RaftClientImpl(ClientId clientId, RaftGroup group,
       RaftPeerId leaderId, RaftClientRpc clientRpc,
       TimeDuration retryInterval) {
     this.clientId = clientId;
     this.clientRpc = clientRpc;
-    this.peers = peers;
+    this.peers = new ArrayList<>(group.getPeers());
+    this.groupId = group.getGroupId();
     this.leaderId = leaderId != null? leaderId : peers.iterator().next().getId();
     this.retryInterval = retryInterval;
 
@@ -77,7 +80,7 @@ final class RaftClientImpl implements RaftClient {
   private RaftClientReply send(Message message, boolean readOnly) throws IOException {
     final long callId = nextCallId();
     return sendRequestWithRetry(() -> new RaftClientRequest(
-        clientId, leaderId, callId, message, readOnly));
+        clientId, leaderId, groupId, callId, message, readOnly));
   }
 
   @Override
@@ -87,7 +90,7 @@ final class RaftClientImpl implements RaftClient {
     // also refresh the rpc proxies for these peers
     addServers(peersInNewConf);
     return sendRequestWithRetry(() -> new SetConfigurationRequest(
-        clientId, leaderId, callId, peersInNewConf));
+        clientId, leaderId, groupId, callId, peersInNewConf));
   }
 
   @Override
@@ -96,7 +99,7 @@ final class RaftClientImpl implements RaftClient {
     final long callId = nextCallId();
     addServers(peersInNewConf);
     return sendRequest(new ReinitializeRequest(
-        clientId, server, callId, peersInNewConf));
+        clientId, server, groupId, callId, peersInNewConf));
   }
 
   private void addServers(RaftPeer[] peersInNewConf) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
index 310e207..2af6558 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
@@ -17,60 +17,28 @@
  */
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.util.Preconditions;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.UUID;
 
 /**
  * Id of Raft client. Should be globally unique so that raft peers can use it
  * to correctly identify retry requests from the same client.
  */
-public class ClientId {
-  public static final int BYTE_LENGTH = 16;
-
+public class ClientId extends RaftId {
   public static ClientId createId() {
     UUID uuid = UUID.randomUUID();
     return new ClientId(uuid);
   }
 
-  private final UUID uuid;
-
-  private ClientId(UUID id) {
-    this.uuid = Objects.requireNonNull(id, "id == null");
-  }
-
   public ClientId(byte[] data) {
-    Objects.requireNonNull(data, "data == null");
-    Preconditions.assertTrue(data.length == BYTE_LENGTH,
-        "data.length = %s != BYTE_LENGTH = %s", data.length, BYTE_LENGTH);
-    ByteBuffer buffer = ByteBuffer.wrap(data);
-    this.uuid = new UUID(buffer.getLong(), buffer.getLong());
+    super(data);
   }
 
-  public byte[] toBytes() {
-    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
-    buf.putLong(uuid.getMostSignificantBits());
-    buf.putLong(uuid.getLeastSignificantBits());
-    return buf.array();
+  private ClientId(UUID uuid) {
+    super(uuid);
   }
 
   @Override
   public String toString() {
-    return uuid.toString();
-  }
-
-
-  @Override
-  public boolean equals(Object other) {
-    return other == this ||
-        (other instanceof ClientId &&
-            uuid.equals(((ClientId) other).uuid));
-  }
-
-  @Override
-  public int hashCode() {
-    return uuid.hashCode();
+    return "client-" + super.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index f205f34..49f2b6f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -20,10 +20,13 @@ package org.apache.ratis.protocol;
 public abstract class RaftClientMessage implements RaftRpcMessage {
   private final ClientId clientId;
   private final RaftPeerId serverId;
+  private final RaftGroupId groupId;
 
-  public RaftClientMessage(ClientId clientId, RaftPeerId serverId) {
+  public RaftClientMessage(ClientId clientId, RaftPeerId serverId,
+      RaftGroupId groupId) {
     this.clientId = clientId;
     this.serverId = serverId;
+    this.groupId = groupId;
   }
 
   @Override
@@ -45,7 +48,13 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
   }
 
   @Override
+  public RaftGroupId getRaftGroupId() {
+    return groupId;
+  }
+
+  @Override
   public String toString() {
-    return getClass().getSimpleName() + "(" + clientId + "->" + serverId + ")";
+    return getClass().getSimpleName() + "(" + clientId + "->" + serverId
+        + ") in raft group " + groupId;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 7179505..60dc6c1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -32,9 +32,10 @@ public class RaftClientReply extends RaftClientMessage {
   private final RaftException exception;
   private final Message message;
 
-  public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId,
-      boolean success, Message message, RaftException exception) {
-    super(clientId, serverId);
+  public RaftClientReply(ClientId clientId, RaftPeerId serverId,
+      RaftGroupId groupId, long callId, boolean success, Message message,
+      RaftException exception) {
+    super(clientId, serverId, groupId);
     this.success = success;
     this.callId = callId;
     this.message = message;
@@ -43,13 +44,13 @@ public class RaftClientReply extends RaftClientMessage {
 
   public RaftClientReply(RaftClientRequest request,
       RaftException exception) {
-    this(request.getClientId(), request.getServerId(), request.getCallId(),
-        false, null, exception);
+    this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
+        request.getCallId(), false, null, exception);
   }
 
   public RaftClientReply(RaftClientRequest request, Message message) {
-    this(request.getClientId(), request.getServerId(), request.getCallId(),
-        true, message, null);
+    this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
+        request.getCallId(), true, message, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 898c166..41bdb2e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -26,13 +26,13 @@ public class RaftClientRequest extends RaftClientMessage {
   private final boolean readOnly;
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      long callId, Message message) {
-    this(clientId, serverId, callId, message, false);
+      RaftGroupId groupId, long callId, Message message) {
+    this(clientId, serverId, groupId, callId, message, false);
   }
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      long callId, Message message, boolean readOnly) {
-    super(clientId, serverId);
+      RaftGroupId groupId, long callId, Message message, boolean readOnly) {
+    super(clientId, serverId, groupId);
     this.callId = callId;
     this.message = message;
     this.readOnly = readOnly;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
new file mode 100644
index 0000000..3ec0fdc
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Description of a raft group. It has a globally unique ID and a group of raft
+ * peers.
+ */
+public class RaftGroup {
+  /** Globally unique id of this raft group */
+  private final RaftGroupId groupId;
+  /** The group of raft peers */
+  private final List<RaftPeer> peers;
+
+  public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) {
+    Preconditions.assertTrue(peers != null);
+    this.groupId = groupId;
+    this.peers = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(peers)));
+  }
+
+  public RaftGroupId getGroupId() {
+    return groupId;
+  }
+
+  public List<RaftPeer> getPeers() {
+    return peers;
+  }
+
+  @Override
+  public String toString() {
+    return groupId + ":" + Arrays.asList(peers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
new file mode 100644
index 0000000..0cb3a07
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.util.UUID;
+
+public class RaftGroupId extends RaftId {
+
+  public static RaftGroupId createId() {
+    UUID uuid = UUID.randomUUID();
+    return new RaftGroupId(uuid);
+  }
+
+  protected RaftGroupId(UUID id) {
+    super(id);
+  }
+
+  public RaftGroupId(byte[] data) {
+    super(data);
+  }
+
+  @Override
+  public String toString() {
+    return "group-" + super.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
new file mode 100644
index 0000000..86d7ccc
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.util.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.UUID;
+
+public abstract class RaftId {
+  public static final int BYTE_LENGTH = 16;
+
+  private final UUID uuid;
+
+  protected RaftId(UUID id) {
+    this.uuid = Objects.requireNonNull(id, "id == null");
+  }
+
+  public RaftId(byte[] data) {
+    Objects.requireNonNull(data, "data == null");
+    Preconditions.assertTrue(data.length == BYTE_LENGTH,
+        "data.length = %s != BYTE_LENGTH = %s", data.length, BYTE_LENGTH);
+    ByteBuffer buffer = ByteBuffer.wrap(data);
+    this.uuid = new UUID(buffer.getLong(), buffer.getLong());
+  }
+
+  public byte[] toBytes() {
+    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
+    buf.putLong(uuid.getMostSignificantBits());
+    buf.putLong(uuid.getLeastSignificantBits());
+    return buf.array();
+  }
+
+  @Override
+  public String toString() {
+    return uuid.toString();
+  }
+
+
+  @Override
+  public boolean equals(Object other) {
+    return other == this ||
+        (other instanceof RaftId &&
+            uuid.equals(((RaftId) other).uuid));
+  }
+
+  @Override
+  public int hashCode() {
+    return uuid.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index 3fe3d0e..4e1a5d8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -39,7 +39,7 @@ public class RaftPeerId {
   }
 
   public static RaftPeerId valueOf(String id) {
-    return stringMap.computeIfAbsent(id, key -> new RaftPeerId(key));
+    return stringMap.computeIfAbsent(id, RaftPeerId::new);
   }
 
   public static RaftPeerId getRaftPeerId(String id) {
@@ -56,6 +56,7 @@ public class RaftPeerId {
     Preconditions.assertTrue(!id.isEmpty(), "id is an empty string.");
     this.id = id.getBytes(StandardCharsets.UTF_8);
   }
+
   private RaftPeerId(byte[] id) {
     this.id = Objects.requireNonNull(id, "id == null");
     Preconditions.assertTrue(id.length > 0, "id is an empty array.");

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
index 58aa9ee..70727af 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
@@ -24,4 +24,6 @@ public interface RaftRpcMessage {
   String getRequestorId();
 
   String getReplierId();
+
+  RaftGroupId getRaftGroupId();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
index 0a89340..b69c845 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -20,20 +20,20 @@ package org.apache.ratis.protocol;
 import java.util.Arrays;
 
 public class ReinitializeRequest extends RaftClientRequest {
-  private final RaftPeer[] peers;
+  private final RaftGroup group;
 
   public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
-                             long callId, RaftPeer[] peers) {
-    super(clientId, serverId, callId, null);
-    this.peers = peers;
+      RaftGroupId groupId, long callId, RaftPeer[] peers) {
+    super(clientId, serverId, groupId, callId, null);
+    this.group = new RaftGroup(groupId, peers);
   }
 
-  public RaftPeer[] getPeersInNewConf() {
-    return peers;
+  public RaftGroup getPeersInGroup() {
+    return group;
   }
 
   @Override
   public String toString() {
-    return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf());
+    return super.toString() + ", peers:" + getPeersInGroup().getPeers(); // the peers, not [group]
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 77d545c..83be197 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -23,8 +23,8 @@ public class SetConfigurationRequest extends 
RaftClientRequest {
   private final RaftPeer[] peers;
 
   public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
-      long callId, RaftPeer[] peers) {
-    super(clientId, serverId, callId, null);
+      RaftGroupId groupId, long callId, RaftPeer[] peers) {
+    super(clientId, serverId, groupId, callId, null);
     this.peers = peers;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 527c4e8..7d73251 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
 
b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index b16bf67..5845ca9 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -116,7 +116,7 @@ public class TestRaftStateMachineException {
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        callId, new RaftTestUtil.SimpleMessage("message"));
+        cluster.getGroupId(), callId, new 
RaftTestUtil.SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertFalse(reply.isSuccess());
     Assert.assertNotNull(reply.getStateMachineException());
@@ -163,7 +163,7 @@ public class TestRaftStateMachineException {
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        callId, new RaftTestUtil.SimpleMessage("message"));
+        cluster.getGroupId(), callId, new 
RaftTestUtil.SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertTrue(reply.hasStateMachineException());
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index f9ff00a..75d1dd7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -86,8 +86,9 @@ public class AppendStreamer implements Closeable {
   private volatile RunningState running = RunningState.RUNNING;
   private final ExceptionAndRetry exceptionAndRetry;
   private final Sender senderThread;
+  private final RaftGroupId groupId;
 
-  AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers,
+  AppendStreamer(RaftProperties prop, RaftGroup group,
       RaftPeerId leaderId, ClientId clientId) {
     this.clientId = clientId;
     maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
@@ -95,11 +96,12 @@ public class AppendStreamer implements Closeable {
     ackQueue = new ConcurrentLinkedDeque<>();
     exceptionAndRetry = new ExceptionAndRetry(prop);
 
-    this.peers = peers.stream().collect(
+    this.groupId = group.getGroupId();
+    this.peers = group.getPeers().stream().collect(
         Collectors.toMap(RaftPeer::getId, Function.identity()));
     proxyMap = new PeerProxyMap<>(
         raftPeer -> new RaftClientProtocolProxy(raftPeer, 
ResponseHandler::new));
-    proxyMap.addPeers(peers);
+    proxyMap.addPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 
     senderThread = new Sender();
@@ -155,7 +157,7 @@ public class AppendStreamer implements Closeable {
     if (isRunning()) {
       // wrap the current buffer into a RaftClientRequestProto
       final RaftClientRequestProto request = genRaftClientRequestProto(
-          clientId, leaderId, seqNum, content, false);
+          clientId, leaderId, groupId, seqNum, content, false);
       dataQueue.offer(request);
       this.notifyAll();
     } else {
@@ -361,8 +363,8 @@ public class AppendStreamer implements Closeable {
         RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder()
             .setMessage(oldRequest.getMessage())
             .setReadOnly(oldRequest.getReadOnly())
-            .setRpcRequest(toRaftRpcRequestProtoBuilder(
-                clientId.toBytes(), newLeader.toBytes(), r.getCallId()))
+            .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(),
+                newLeader.toBytes(), groupId.toBytes(), r.getCallId()))
             .build();
         dataQueue.offerFirst(newRequest);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
index 33d3d22..09d57a0 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.client;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.ProtoUtils;
@@ -40,12 +41,12 @@ public class RaftOutputStream extends OutputStream {
   private boolean closed = false;
 
   public RaftOutputStream(RaftProperties prop, ClientId clientId,
-      Collection<RaftPeer> peers, RaftPeerId leaderId) {
+      RaftGroup group, RaftPeerId leaderId) {
     final int bufferSize = 
GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
     buf = new byte[bufferSize];
     count = 0;
     this.clientId = clientId;
-    streamer = new AppendStreamer(prop, peers, leaderId, clientId);
+    streamer = new AppendStreamer(prop, group, leaderId, clientId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 1cf55ad..a5992b1 100644
--- 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -21,6 +21,7 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
@@ -49,10 +50,10 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
 
   @Override
   protected RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftPeerId id, StateMachine stateMachine, RaftGroup group,
       RaftProperties properties) throws IOException {
-    GrpcConfigKeys.Server.setPort(properties, getPort(id, conf));
-    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, 
null);
+    GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
+    return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, 
null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 1efe4d3..c67572b 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -57,7 +57,6 @@ public class TestRaftStream {
 
   private MiniRaftClusterWithGRpc cluster;
 
-
   @After
   public void tearDown() {
     if (cluster != null) {
@@ -91,7 +90,7 @@ public class TestRaftStream {
     final long seed = r.nextLong();
     r.setSeed(seed);
     try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(),
-        cluster.getPeers(), leader.getId())) {
+        cluster.getGroup(), leader.getId())) {
       for (int i = 0; i < 500; i++) { // generate 500 requests
         out.write(toBytes(r.nextInt()));
       }
@@ -128,7 +127,7 @@ public class TestRaftStream {
 
     RaftServerImpl leader = waitForLeader(cluster);
     RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(),
-        cluster.getPeers(), leader.getId());
+        cluster.getGroup(), leader.getId());
 
     int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
     ByteValue[] values = new ByteValue[lengths.length];
@@ -207,7 +206,7 @@ public class TestRaftStream {
     RaftServerImpl leader = waitForLeader(cluster);
 
     RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(),
-        cluster.getPeers(), leader.getId());
+        cluster.getGroup(), leader.getId());
 
     byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
     Arrays.fill(b1, (byte) 1);
@@ -273,7 +272,7 @@ public class TestRaftStream {
       LOG.info("Writer thread starts");
       int count = 0;
       try (RaftOutputStream out = new RaftOutputStream(prop, 
ClientId.createId(),
-          cluster.getPeers(), leader.getId())) {
+          cluster.getGroup(), leader.getId())) {
         while (running.get()) {
           out.write(toBytes(count++));
           Thread.sleep(10);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index c52c81d..bc420f4 100644
--- 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -23,10 +23,10 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.statemachine.StateMachine;
@@ -73,13 +73,13 @@ public class MiniRaftClusterWithHadoopRpc extends 
MiniRaftCluster.RpcBase {
 
   @Override
   protected RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftPeerId id, StateMachine stateMachine, RaftGroup group,
       RaftProperties properties) throws IOException {
     final Configuration hconf = new Configuration(hadoopConf);
-    final String address = "0.0.0.0:" + getPort(id, conf);
+    final String address = "0.0.0.0:" + getPort(id, group);
     HadoopConfigKeys.Ipc.setAddress(hconf, address);
 
-    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties,
+    return ServerImplUtils.newRaftServer(id, group, stateMachine, properties,
         HadoopFactory.newRaftParameters(hconf));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 02a7493..e6b19f8 100644
--- 
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ 
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.netty.server.NettyRpcService;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.*;
@@ -48,10 +49,10 @@ public class MiniRaftClusterWithNetty extends 
MiniRaftCluster.RpcBase {
 
   @Override
   protected RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftPeerId id, StateMachine stateMachine, RaftGroup group,
       RaftProperties properties) throws IOException {
-    NettyConfigKeys.Server.setPort(properties, getPort(id, conf));
-    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, 
null);
+    NettyConfigKeys.Server.setPort(properties, getPort(id, group));
+    return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, 
null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto 
b/ratis-proto-shaded/src/main/proto/Raft.proto
index b53181f..ed145df 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -26,6 +26,15 @@ message RaftPeerProto {
   string address = 2; // e.g. IP address, hostname etc.
 }
 
+message RaftGroupIdProto {
+  bytes id = 1; // the group ID as raw bytes
+}
+
+message RaftGroupProto {
+  RaftGroupIdProto groupId = 1; // the ID of the group
+  repeated RaftPeerProto peers = 2; // the peers in the group
+}
+
 message RaftConfigurationProto {
   repeated RaftPeerProto peers = 1; // the peers in the current or new conf
   repeated RaftPeerProto oldPeers = 2; // the peers in the old conf
@@ -66,6 +75,7 @@ message RaftRpcRequestProto {
   bytes requestorId = 1;
   bytes replyId = 2;
   uint64 callId = 3;
+  RaftGroupIdProto raftGroupId = 4;
 }
 
 message RaftRpcReplyProto {
@@ -73,6 +83,7 @@ message RaftRpcReplyProto {
   bytes replyId = 2;
   uint64 callId = 3;
   bool success = 4;
+  RaftGroupIdProto raftGroupId = 5;
 }
 
 message FileChunkProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 0899dd1..d5a46f9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -61,7 +61,7 @@ public interface RaftServer extends Closeable, RpcType.Get, 
RaftServerProtocol,
   class Builder {
     private RaftPeerId serverId;
     private StateMachine stateMachine;
-    private Iterable<RaftPeer> peers;
+    private RaftGroup group;
     private RaftProperties properties;
     private Parameters parameters;
 
@@ -69,8 +69,8 @@ public interface RaftServer extends Closeable, RpcType.Get, 
RaftServerProtocol,
     public RaftServer build() throws IOException {
       return ServerImplUtils.newRaftServer(
           Objects.requireNonNull(serverId, "The 'serverId' field is not 
initialized."),
+          Objects.requireNonNull(group, "The 'group' field is not 
initialized."),
           Objects.requireNonNull(stateMachine, "The 'stateMachine' is not 
initialized."),
-          Objects.requireNonNull(peers, "The 'peers' field is not 
initialized."),
           Objects.requireNonNull(properties, "The 'properties' field is not 
initialized."),
           parameters);
     }
@@ -88,8 +88,8 @@ public interface RaftServer extends Closeable, RpcType.Get, 
RaftServerProtocol,
     }
 
     /** Set all the peers (including the server being built) in the Raft 
cluster. */
-    public Builder setPeers(Iterable<RaftPeer> peers) {
-      this.peers = peers;
+    public Builder setPeers(RaftGroup group) {
+      this.group = group;
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 9685fbc..88cf240 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -82,16 +82,18 @@ public class RaftServerImpl implements RaftServerProtocol,
 
   private final RetryCache retryCache;
 
-  RaftServerImpl(RaftPeerId id, RaftServerProxy proxy,
-      RaftConfiguration raftConf, RaftProperties properties)
-      throws IOException {
+  private final RaftGroupId groupId;
+
+  RaftServerImpl(RaftPeerId id, RaftGroup group, RaftServerProxy proxy,
+      RaftProperties properties) throws IOException {
+    this.groupId = group.getGroupId();
     this.lifeCycle = new LifeCycle(id);
     minTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS);
     maxTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
         "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
     this.proxy = proxy;
-    this.state = new ServerState(id, raftConf, properties, this, 
proxy.getStateMachine());
+    this.state = new ServerState(id, group, properties, this, 
proxy.getStateMachine());
     this.retryCache = initRetryCache(properties);
   }
 
@@ -122,6 +124,10 @@ public class RaftServerImpl implements RaftServerProtocol,
         maxTimeoutMs - minTimeoutMs + 1);
   }
 
+  RaftGroupId getGroupId() {
+    return groupId;
+  }
+
   StateMachine getStateMachine() {
     return proxy.getStateMachine();
   }
@@ -582,7 +588,7 @@ public class RaftServerImpl implements RaftServerProtocol,
         shouldShutdown = true;
       }
       reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
-          voteGranted, state.getCurrentTerm(), shouldShutdown);
+          groupId, voteGranted, state.getCurrentTerm(), shouldShutdown);
       if (LOG.isDebugEnabled()) {
         LOG.debug("{} replies to vote request: {}. Peer's state: {}",
             getId(), ProtoUtils.toString(reply), state);
@@ -671,7 +677,7 @@ public class RaftServerImpl implements RaftServerProtocol,
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
+            leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: do not recognize leader. Reply: {}",
               getId(), ProtoUtils.toString(reply));
@@ -697,7 +703,7 @@ public class RaftServerImpl implements RaftServerProtocol,
       // last index should have been committed.
       if (previous != null && !containPrevious(previous)) {
         final AppendEntriesReplyProto reply =
-            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
+            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), 
groupId,
                 currentTerm, Math.min(nextIndex, previous.getIndex()), 
INCONSISTENCY);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
@@ -710,7 +716,7 @@ public class RaftServerImpl implements RaftServerProtocol,
       state.updateConfiguration(entries);
       state.updateStatemachine(leaderCommit, currentTerm);
     }
-    if (entries != null && entries.length > 0) {
+    if (entries.length > 0) {
       try {
         state.getLog().logSync();
       } catch (InterruptedException e) {
@@ -727,7 +733,7 @@ public class RaftServerImpl implements RaftServerProtocol,
       }
     }
     final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), currentTerm, nextIndex, SUCCESS);
+        leaderId, getId(), groupId, currentTerm, nextIndex, SUCCESS);
     logAppendEntries(isHeartbeat,
         () -> getId() + ": succeeded to handle AppendEntries. Reply: "
             + ServerProtoUtils.toString(reply));
@@ -767,7 +773,7 @@ public class RaftServerImpl implements RaftServerProtocol,
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final InstallSnapshotReplyProto reply = ServerProtoUtils
-            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
+            .toInstallSnapshotReplyProto(leaderId, getId(), groupId, 
currentTerm,
                 request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
         LOG.debug("{}: do not recognize leader for installing snapshot." +
             " Reply: {}", getId(), reply);
@@ -804,14 +810,14 @@ public class RaftServerImpl implements RaftServerProtocol,
       LOG.info("{}: successfully install the whole snapshot-{}", getId(),
           lastIncludedIndex);
     }
-    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
+    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), 
groupId,
         currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
   }
 
   AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
       RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries,
       boolean initializing) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
+    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, 
groupId,
         leaderTerm, entries, state.getLog().getLastCommittedIndex(),
         initializing, previous);
   }
@@ -822,15 +828,15 @@ public class RaftServerImpl implements RaftServerProtocol,
     OptionalLong totalSize = snapshot.getFiles().stream()
         .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
     assert totalSize.isPresent();
-    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
+    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, 
groupId,
         requestId, requestIndex, state.getCurrentTerm(), 
snapshot.getTermIndex(),
         chunks, totalSize.getAsLong(), done);
   }
 
   synchronized RequestVoteRequestProto createRequestVoteRequest(
       RaftPeerId targetId, long term, TermIndex lastEntry) {
-    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
-        lastEntry);
+    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId,
+        groupId, term, lastEntry);
   }
 
   public synchronized void submitLocalSyncEvent() {
@@ -857,13 +863,13 @@ public class RaftServerImpl implements RaftServerProtocol,
     stateMachineFuture.whenComplete((reply, exception) -> {
       final RaftClientReply r;
       if (exception == null) {
-        r = new RaftClientReply(clientId, serverId, callId, true, reply, null);
+        r = new RaftClientReply(clientId, serverId, groupId, callId, true, 
reply, null);
       } else {
         // the exception is coming from the state machine. wrap it into the
         // reply as a StateMachineException
         final StateMachineException e = new StateMachineException(
             getId().toString(), exception);
-        r = new RaftClientReply(clientId, serverId, callId, false, null, e);
+        r = new RaftClientReply(clientId, serverId, groupId, callId, false, 
null, e);
       }
       // update retry cache
       cacheEntry.updateResult(r);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 5afb737..366a74c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -50,7 +50,7 @@ public class RaftServerProxy implements RaftServer {
   private final AtomicReference<ReinitializeRequest> reinitializeRequest = new 
AtomicReference<>();
 
   RaftServerProxy(RaftPeerId id, StateMachine stateMachine,
-                  RaftConfiguration raftConf, RaftProperties properties, 
Parameters parameters)
+      RaftGroup group, RaftProperties properties, Parameters parameters)
       throws IOException {
     this.id = id;
     this.properties = properties;
@@ -59,20 +59,20 @@ public class RaftServerProxy implements RaftServer {
     final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
     this.factory = ServerFactory.cast(rpcType.newFactory(properties, 
parameters));
 
-    this.impl = CompletableFuture.completedFuture(initImpl(raftConf));
-    this.serverRpc = initRaftServerRpc(factory, this, raftConf);
+    this.impl = CompletableFuture.completedFuture(initImpl(group));
+    this.serverRpc = initRaftServerRpc(factory, this, group);
   }
 
-  private RaftServerImpl initImpl(RaftConfiguration raftConf) throws 
IOException {
-    return new RaftServerImpl(id, this, raftConf, properties);
+  private RaftServerImpl initImpl(RaftGroup group) throws IOException {
+    return new RaftServerImpl(id, group, this, properties);
   }
 
   private static RaftServerRpc initRaftServerRpc(
-      ServerFactory factory, RaftServer server, RaftConfiguration raftConf) {
+      ServerFactory factory, RaftServer server, RaftGroup group) {
     final RaftServerRpc rpc = factory.newRaftServerRpc(server);
     // add peers into rpc service
-    if (raftConf != null) {
-      rpc.addPeers(raftConf.getPeers());
+    if (group != null) {
+      rpc.addPeers(group.getPeers());
     }
     return rpc;
   }
@@ -167,11 +167,9 @@ public class RaftServerProxy implements RaftServer {
         impl = new CompletableFuture<>();
         JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown);
 
-        final RaftConfiguration newConf = RaftConfiguration.newBuilder()
-            .setConf(request.getPeersInNewConf()).build();
         final RaftServerImpl newImpl;
         try {
-          newImpl = initImpl(newConf);
+          newImpl = initImpl(request.getPeersInGroup());
         } catch (IOException ioe) {
           final RaftException re = new RaftException(
               "Failed to reinitialize, request=" + request, ioe);
@@ -180,7 +178,7 @@ public class RaftServerProxy implements RaftServer {
           return new RaftClientReply(request, re);
         }
 
-        getServerRpc().addPeers(newConf.getPeers());
+        getServerRpc().addPeers(request.getPeersInGroup().getPeers());
         newImpl.start();
         impl.complete(newImpl);
         return new RaftClientReply(request, (Message) null);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 7934bbb..3c617f1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,9 +19,8 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
 
@@ -29,18 +28,10 @@ import java.io.IOException;
 
 /** Server utilities for internal use. */
 public class ServerImplUtils {
-  public static RaftServer newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, Iterable<RaftPeer> peers,
-      RaftProperties properties, Parameters parameters) throws IOException {
-    return newRaftServer(id, stateMachine,
-        RaftConfiguration.newBuilder().setConf(peers).build(),
-        properties, parameters);
-  }
-
   public static RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftPeerId id, RaftGroup group, StateMachine stateMachine,
       RaftProperties properties, Parameters parameters) throws IOException {
-    return new RaftServerProxy(id, stateMachine, conf, properties, parameters);
+    return new RaftServerProxy(id, stateMachine, group, properties, parameters);
   }
 
   public static TermIndex newTermIndex(long term, long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index ffd4378..5b11599 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.proto.RaftProtos;
@@ -108,20 +109,20 @@ public class ServerProtoUtils {
   }
 
   public static RequestVoteReplyProto toRequestVoteReplyProto(
-      RaftPeerId requestorId, RaftPeerId replyId, boolean success, long term,
-      boolean shouldShutdown) {
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      boolean success, long term, boolean shouldShutdown) {
     final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
     b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(
-        requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID, success))
+        requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, success))
         .setTerm(term)
         .setShouldShutdown(shouldShutdown);
     return b.build();
   }
 
   public static RequestVoteRequestProto toRequestVoteRequestProto(
-      RaftPeerId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) {
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, TermIndex lastEntry) {
     RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils
-        .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID);
+        .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID);
     final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
         .setServerRequest(rpb)
         .setCandidateTerm(term);
@@ -132,10 +133,10 @@ public class ServerProtoUtils {
   }
 
   public static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
-      RaftPeerId requestorId, RaftPeerId replyId, long term, int requestIndex,
-      InstallSnapshotResult result) {
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      long term, int requestIndex, InstallSnapshotResult result) {
     final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
-        replyId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS);
+        replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS);
     final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
         .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
         .setRequestIndex(requestIndex);
@@ -143,13 +144,13 @@ public class ServerProtoUtils {
   }
 
   public static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
-      RaftPeerId requestorId, RaftPeerId replyId, String requestId, int requestIndex,
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, String requestId, int requestIndex,
       long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
       long totalSize, boolean done) {
     return InstallSnapshotRequestProto.newBuilder()
         .setServerRequest(
             ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
-                replyId.toBytes(), DEFAULT_CALLID))
+                replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID))
         .setRequestId(requestId)
         .setRequestIndex(requestIndex)
         // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
@@ -161,10 +162,10 @@ public class ServerProtoUtils {
   }
 
   public static AppendEntriesReplyProto toAppendEntriesReplyProto(
-      RaftPeerId requestorId, RaftPeerId replyId, long term,
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term,
       long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
     RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
-        replyId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS);
+        replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS);
     final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder();
     b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
         .setResult(appendResult);
@@ -172,14 +173,14 @@ public class ServerProtoUtils {
   }
 
   public static AppendEntriesRequestProto toAppendEntriesRequestProto(
-      RaftPeerId requestorId, RaftPeerId replyId, long leaderTerm,
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long leaderTerm,
       List<LogEntryProto> entries, long leaderCommit, boolean initializing,
       TermIndex previous) {
     final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
         .newBuilder()
         .setServerRequest(
             ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
-                replyId.toBytes(), DEFAULT_CALLID))
+                replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID))
         .setLeaderTerm(leaderTerm)
         .setLeaderCommit(leaderCommit)
         .setInitializing(initializing);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index d595691..53363cd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -76,12 +77,14 @@ public class ServerState implements Closeable {
    */
   private TermIndex latestInstalledSnapshot;
 
-  ServerState(RaftPeerId id, RaftConfiguration conf, RaftProperties prop,
+  ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
               RaftServerImpl server, StateMachine stateMachine)
       throws IOException {
     this.selfId = id;
     this.server = server;
-    configurationManager = new ConfigurationManager(conf);
+    RaftConfiguration initialConf = RaftConfiguration.newBuilder()
+        .setConf(group.getPeers()).build();
+    configurationManager = new ConfigurationManager(initialConf);
     storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
     snapshotManager = new SnapshotManager(storage, id);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 83fcf54..0ef7439 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -21,6 +21,8 @@ import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
@@ -75,9 +77,10 @@ public abstract class MiniRaftCluster {
       RaftTestUtil.setBlockRequestsFrom(src, block);
     }
 
-    public static int getPort(RaftPeerId id, RaftConfiguration conf) {
-      final RaftPeer peer = conf.getPeer(id);
-      final String address = peer != null? peer.getAddress(): null;
+    public static int getPort(RaftPeerId id, RaftGroup group) {
+      final List<RaftPeer> peers = group.getPeers().stream()
+          .filter(raftPeer -> raftPeer.getId().equals(id)).collect(Collectors.toList());
+      final String address = peers.isEmpty() ? null : peers.get(0).getAddress();
       final InetSocketAddress inetAddress = address != null?
           NetUtils.createSocketAddr(address): NetUtils.createLocalServerAddress();
       return inetAddress.getPort();
@@ -96,13 +99,12 @@ public abstract class MiniRaftCluster {
     }
   }
 
-  public static RaftConfiguration initConfiguration(Collection<String> ids) {
-    return RaftConfiguration.newBuilder()
-        .setConf(ids.stream()
-            .map(id -> RaftPeerId.valueOf(id))
-            .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-            .collect(Collectors.toList()))
-        .build();
+  public static RaftGroup initRaftGroup(Collection<String> ids) {
+    List<RaftPeer> peers = ids.stream()
+        .map(RaftPeerId::valueOf)
+        .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
+        .collect(Collectors.toList());
+    return new RaftGroup(RaftGroupId.createId(), peers.toArray(new RaftPeer[peers.size()]));
   }
 
   private static String getBaseDirectory() {
@@ -125,14 +127,14 @@ public abstract class MiniRaftCluster {
   }
 
   protected final ClientFactory clientFactory;
-  protected RaftConfiguration conf;
+  protected RaftGroup group;
   protected final RaftProperties properties;
   protected final Parameters parameters;
   private final String testBaseDir;
   protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>();
 
   protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
-    this.conf = initConfiguration(Arrays.asList(ids));
+    this.group = initRaftGroup(Arrays.asList(ids));
     this.properties = new RaftProperties(properties);
     this.parameters = parameters;
 
@@ -146,17 +148,17 @@ public abstract class MiniRaftCluster {
 
   public MiniRaftCluster initServers() {
     if (servers.isEmpty()) {
-      putNewServers(CollectionUtils.as(conf.getPeers(), RaftPeer::getId), true);
+      putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
     }
     return this;
   }
 
   private RaftServerProxy putNewServer(RaftPeerId id, boolean format) {
-    return putNewServer(id, conf, format);
+    return putNewServer(id, group, format);
   }
 
-  public RaftServerProxy putNewServer(RaftPeerId id, RaftConfiguration raftConf, boolean format) {
-    final RaftServerProxy s = newRaftServer(id, raftConf, format);
+  public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
+    final RaftServerProxy s = newRaftServer(id, group, format);
     Preconditions.assertTrue(servers.put(id, s) == null);
     return s;
   }
@@ -197,11 +199,8 @@ public abstract class MiniRaftCluster {
     return RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
   }
 
-  public RaftConfiguration getConf() {
-    return conf;
-  }
-
-  private RaftServerProxy newRaftServer(RaftPeerId id, RaftConfiguration raftConf, boolean format) {
+  private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group,
+      boolean format) {
     try {
       final String dirStr = testBaseDir + id;
       if (format) {
@@ -210,14 +209,14 @@ public abstract class MiniRaftCluster {
       final RaftProperties prop = new RaftProperties(properties);
       RaftServerConfigKeys.setStorageDir(prop, dirStr);
       final StateMachine stateMachine = getStateMachine4Test(properties);
-      return newRaftServer(id, stateMachine, raftConf, prop);
+      return newRaftServer(id, stateMachine, group, prop);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
   protected abstract RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftPeerId id, StateMachine stateMachine, RaftGroup group,
       RaftProperties properties) throws IOException;
 
   static StateMachine getStateMachine4Test(RaftProperties properties) {
@@ -259,9 +258,9 @@ public abstract class MiniRaftCluster {
 
     final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
     final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
-    newPeers.addAll(conf.getPeers());
-    conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
+    newPeers.addAll(group.getPeers());
     RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    group = new RaftGroup(group.getGroupId(), p);
     return new PeerChanges(p, np, new RaftPeer[0]);
   }
 
@@ -280,7 +279,7 @@ public abstract class MiniRaftCluster {
    */
   public PeerChanges removePeers(int number, boolean removeLeader,
       Collection<RaftPeer> excluded) {
-    Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+    Collection<RaftPeer> peers = new ArrayList<>(group.getPeers());
     List<RaftPeer> removedPeers = new ArrayList<>(number);
     if (removeLeader) {
       final RaftPeer leader = toRaftPeer(getLeader());
@@ -298,8 +297,8 @@ public abstract class MiniRaftCluster {
         removed++;
       }
     }
-    conf = RaftConfiguration.newBuilder().setConf(peers).setLogEntryIndex(0).build();
     RaftPeer[] p = peers.toArray(new RaftPeer[peers.size()]);
+    group = new RaftGroup(group.getGroupId(), p);
     return new PeerChanges(p, new RaftPeer[0],
         removedPeers.toArray(new RaftPeer[removedPeers.size()]));
   }
@@ -393,13 +392,17 @@ public abstract class MiniRaftCluster {
     return toRaftPeers(getServers());
   }
 
+  public RaftGroup getGroup() {
+    return group;
+  }
+
   public RaftClient createClient(RaftPeerId leaderId) {
-    return createClient(leaderId, conf.getPeers());
+    return createClient(leaderId, group);
   }
 
-  public RaftClient createClient(RaftPeerId leaderId, Collection<RaftPeer> servers) {
+  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
     return RaftClient.newBuilder()
-        .setServers(servers)
+        .setRaftGroup(group)
         .setLeaderId(leaderId)
         .setClientRpc(clientFactory.newRaftClientRpc())
         .setProperties(properties)
@@ -451,4 +454,8 @@ public abstract class MiniRaftCluster {
 
   /** Block/unblock the requests sent from the given source. */
   public abstract void setBlockRequestsFrom(String src, boolean block);
+
+  public RaftGroupId getGroupId() {
+    return group.getGroupId();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
index 83c88f5..43c9fff 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -104,7 +104,8 @@ public abstract class RaftNotLeaderExceptionBaseTest {
     for (int i = 0; reply == null && i < 10; i++) {
       try {
         reply = rpc.sendRequest(
-            new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID,
+            new RaftClientRequest(ClientId.createId(), leaderId,
+                cluster.getGroupId(), DEFAULT_CALLID,
                 new SimpleMessage("m2")));
       } catch (IOException ignored) {
         Thread.sleep(1000);
@@ -151,7 +152,8 @@ public abstract class RaftNotLeaderExceptionBaseTest {
     for (int i = 0; reply == null && i < 10; i++) {
       try {
         reply = rpc.sendRequest(
-            new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID,
+            new RaftClientRequest(ClientId.createId(), leaderId,
+                cluster.getGroupId(), DEFAULT_CALLID,
                 new SimpleMessage("m1")));
       } catch (IOException ignored) {
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
index a42d1bd..4317be1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
@@ -88,7 +88,7 @@ public abstract class RaftRetryCacheTests {
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        callId, new RaftTestUtil.SimpleMessage("message"));
+        cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertEquals(callId, reply.getCallId());
     Assert.assertTrue(reply.isSuccess());
@@ -132,7 +132,7 @@ public abstract class RaftRetryCacheTests {
     RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        callId, new RaftTestUtil.SimpleMessage("message"));
+        cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertEquals(callId, reply.getCallId());
     Assert.assertTrue(reply.isSuccess());
@@ -144,7 +144,8 @@ public abstract class RaftRetryCacheTests {
         asList(change.newPeers)).allPeersInNewConf;
     // trigger setConfiguration
     SetConfigurationRequest request = new SetConfigurationRequest(
-        client.getId(), cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
+        client.getId(), cluster.getLeader().getId(), cluster.getGroupId(),
+        DEFAULT_CALLID, allPeers);
     LOG.info("Start changing the configuration: {}", request);
     cluster.getLeader().setConfiguration(request);
 
@@ -152,7 +153,7 @@ public abstract class RaftRetryCacheTests {
     final RaftPeerId newLeaderId = cluster.getLeader().getId();
     Assert.assertNotEquals(leaderId, newLeaderId);
     // same clientId and callId in the request
-    r = new RaftClientRequest(client.getId(), newLeaderId,
+    r = new RaftClientRequest(client.getId(), newLeaderId, cluster.getGroupId(),
         callId, new RaftTestUtil.SimpleMessage("message"));
     for (int i = 0; i < 10; i++) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/466fc2c3/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 20e66e6..2a80b37 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -92,7 +92,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(clientId,
-          cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
+          cluster.getLeader().getId(), cluster.getGroupId(), DEFAULT_CALLID, allPeers);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);
 
@@ -120,7 +120,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(clientId,
-          cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
+          cluster.getLeader().getId(), cluster.getGroupId(), DEFAULT_CALLID, allPeers);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);
 
@@ -158,7 +158,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(clientId,
-          cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
+          cluster.getLeader().getId(), cluster.getGroupId(), DEFAULT_CALLID, allPeers);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);
 
@@ -254,7 +254,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       final RaftClientRpc sender = client.getClientRpc();
       final SetConfigurationRequest request = new SetConfigurationRequest(
-          client.getId(), leaderId, DEFAULT_CALLID, c1.allPeersInNewConf);
+          client.getId(), leaderId, cluster.getGroupId(), DEFAULT_CALLID, c1.allPeersInNewConf);
       try {
         sender.sendRequest(request);
         Assert.fail("did not get expected exception");
@@ -472,7 +472,7 @@ public abstract class RaftReconfigurationBaseTest {
           LOG.info("client2 starts to change conf");
           final RaftClientRpc sender2 = client2.getClientRpc();
           sender2.sendRequest(new SetConfigurationRequest(
-              client2.getId(), leaderId, DEFAULT_CALLID, peersInRequest2));
+              client2.getId(), leaderId, cluster.getGroupId(), DEFAULT_CALLID, peersInRequest2));
         } catch (ReconfigurationInProgressException e) {
           caughtException.set(true);
         } catch (Exception e) {
@@ -537,7 +537,7 @@ public abstract class RaftReconfigurationBaseTest {
           LOG.info("client starts to change conf");
           final RaftClientRpc sender = client.getClientRpc();
           RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest(
-              client.getId(), leaderId, DEFAULT_CALLID, change.allPeersInNewConf));
+              client.getId(), leaderId, cluster.getGroupId(), DEFAULT_CALLID, change.allPeersInNewConf));
           if (reply.isNotLeader()) {
             gotNotLeader.set(true);
           }
@@ -601,7 +601,7 @@ public abstract class RaftReconfigurationBaseTest {
         final RaftClientRpc sender = client.getClientRpc();
 
         final RaftClientRequest request = new RaftClientRequest(client.getId(),
-            leaderId, 0, new SimpleMessage("test"));
+            leaderId, cluster.getGroupId(), 0, new SimpleMessage("test"));
         while (!success.get()) {
           try {
             RaftClientReply reply = sender.sendRequest(request);

Reply via email to