[ZEPPELIN-3610] Cluster Raft module design

### What is this PR for?

By using the Raft protocol, multiple Zeppelin-Server groups are built into a 
Zeppelin cluster, the cluster State Machine is maintained through the Raft 
protocol, and the services in the cluster are agreed upon. The Zeppelin-Server 
and Zeppelin-Interperter services and processes are stored in the Cluster 
MetaData. Metadata information;

### What type of PR is it?
[Feature]

### Todos
* [x] add raft algorithm atomix jar
* [x] add cluster state machine
* [x] add state machine query command
* [x] add state machine delete command
* [x] add state machine put command
* [x] Isolate the netty JAR package introduced by atomix

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3610

### How should this be tested?
[CI pass](https://travis-ci.org/liuxunorg/zeppelin/builds/418742522)

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes

Author: liuxunorg <33611...@qq.com>
Author: Jeff Zhang <zjf...@gmail.com>

Closes #3183 from liuxunorg/ZEPPELIN-3610 and squashes the following commits:

c899b151f [liuxunorg] uncomment maven-dependency-plugin.
af728bd2c [liuxunorg] add more doc to explain this class's responsibility.
d268ee72e [Jeff Zhang] fix test (#2)
97f17acac [liuxunorg] Cluster Raft module design


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/f28e9631
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/f28e9631
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/f28e9631

Branch: refs/heads/master
Commit: f28e9631737da554a0a12323f6c00ccae3ec6282
Parents: 0c374c0
Author: liuxunorg <33611...@qq.com>
Authored: Thu Sep 27 11:46:50 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Fri Sep 28 08:50:35 2018 +0800

----------------------------------------------------------------------
 angular/pom.xml                                 |  17 +
 bin/interpreter.sh                              |   5 +-
 livy/pom.xml                                    |   7 -
 markdown/pom.xml                                |   5 +
 python/pom.xml                                  |  28 +-
 .../zeppelin/python/IPythonInterpreterTest.java |   2 +-
 python/src/test/resources/log4j.properties      |   2 +-
 shell/pom.xml                                   |   5 +
 spark/interpreter/pom.xml                       |   5 +
 zeppelin-integration/pom.xml                    |   2 +-
 zeppelin-interpreter/pom.xml                    |  53 ++
 .../cluster/BroadcastServiceAdapter.java        |  44 ++
 .../apache/zeppelin/cluster/ClusterManager.java | 520 ++++++++++++++++++
 .../zeppelin/cluster/ClusterPrimitiveType.java  |  57 ++
 .../zeppelin/cluster/ClusterStateMachine.java   | 179 +++++++
 .../ZeppelinClusterMembershipEventListener.java |  50 ++
 .../zeppelin/cluster/meta/ClusterMeta.java      | 144 +++++
 .../cluster/meta/ClusterMetaEntity.java         |  57 ++
 .../cluster/meta/ClusterMetaOperation.java      |  26 +
 .../zeppelin/cluster/meta/ClusterMetaType.java  |  25 +
 .../protocol/LocalRaftClientProtocol.java       | 163 ++++++
 .../cluster/protocol/LocalRaftProtocol.java     |  58 ++
 .../protocol/LocalRaftProtocolFactory.java      |  57 ++
 .../protocol/LocalRaftServerProtocol.java       | 527 +++++++++++++++++++
 .../protocol/RaftClientMessagingProtocol.java   | 123 +++++
 .../cluster/protocol/RaftMessagingProtocol.java |  83 +++
 .../protocol/RaftServerMessagingProtocol.java   | 346 ++++++++++++
 .../zeppelin/conf/ZeppelinConfiguration.java    |  29 +
 zeppelin-zengine/pom.xml                        | 198 ++++---
 .../mock/MockInterpreterResourcePool.java       |   9 +-
 .../test/resources/commons-logging-1.1.1.jar    | Bin 0 -> 60686 bytes
 .../src/test/resources/gson-2.2.jar             | Bin 0 -> 189612 bytes
 .../src/test/resources/log4j-1.2.17.jar         | Bin 0 -> 489884 bytes
 .../src/test/resources/log4j.properties         |   2 +-
 .../src/test/resources/slf4j-api-1.7.10.jar     | Bin 0 -> 32119 bytes
 .../src/test/resources/slf4j-log4j12-1.7.10.jar | Bin 0 -> 8866 bytes
 36 files changed, 2706 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/angular/pom.xml
----------------------------------------------------------------------
diff --git a/angular/pom.xml b/angular/pom.xml
index df4e83c..021139d 100644
--- a/angular/pom.xml
+++ b/angular/pom.xml
@@ -54,6 +54,23 @@
     </dependency>
     
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index c895018..4606ed3 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -79,9 +79,12 @@ fi
 # add test classes for unittest
 if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then
   
ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
+  if [[ -n "${ZEPPELIN_ZENGINE_TEST}" ]]; then
+    addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
+  fi
 fi
 
-addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
+addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-api/target"
 addJarInDirForIntp "${ZEPPELIN_HOME}/lib/interpreter"
 addJarInDirForIntp "${INTERPRETER_DIR}"
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/livy/pom.xml
----------------------------------------------------------------------
diff --git a/livy/pom.xml b/livy/pom.xml
index 3fc58c4..5e648d9 100644
--- a/livy/pom.xml
+++ b/livy/pom.xml
@@ -58,13 +58,6 @@
         </dependency>
 
         <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>zeppelin-interpreter</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.livy</groupId>
             <artifactId>livy-integration-test</artifactId>
             <version>${livy.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/markdown/pom.xml
----------------------------------------------------------------------
diff --git a/markdown/pom.xml b/markdown/pom.xml
index d5fe9a3..55535a6 100644
--- a/markdown/pom.xml
+++ b/markdown/pom.xml
@@ -81,6 +81,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/python/pom.xml
----------------------------------------------------------------------
diff --git a/python/pom.xml b/python/pom.xml
index 5c7059d..8fed0c9 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -35,18 +35,12 @@
   <properties>
     <interpreter.name>python</interpreter.name>
     <python.py4j.version>0.10.7</python.py4j.version>
-    <grpc.version>1.4.0</grpc.version>
+    <grpc.version>1.15.0</grpc.version>
     <interpreter.jar.name>python-interpreter-with-py4j</interpreter.jar.name>
   </properties>
 
   <dependencies>
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>zeppelin-interpreter</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-exec</artifactId>
       <version>1.3</version>
@@ -69,6 +63,26 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
       <version>${grpc.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 4b6bfdb..28e6270 100644
--- 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -214,7 +214,7 @@ public class IPythonInterpreterTest extends 
BasePythonInterpreterTest {
     List<InterpreterResultMessage> interpreterResultMessages =
         context.out.toInterpreterResultMessage();
     assertEquals(1, interpreterResultMessages.size());
-    assertTrue(interpreterResultMessages.get(0).getData().contains("exceeds 
maximum: 3000"));
+    assertTrue(interpreterResultMessages.get(0).getData().contains("exceeds 
maximum size 3000"));
 
     // next call continue work
     result = interpreter.interpret("print(1)", context);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/python/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/python/src/test/resources/log4j.properties 
b/python/src/test/resources/log4j.properties
index 8993ff2..2933a64 100644
--- a/python/src/test/resources/log4j.properties
+++ b/python/src/test/resources/log4j.properties
@@ -24,4 +24,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
 
-log4j.logger.org.apache.zeppelin.python=DEBUG
+log4j.logger.org.apache.zeppelin.python=INFO

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/shell/pom.xml
----------------------------------------------------------------------
diff --git a/shell/pom.xml b/shell/pom.xml
index c702b9f..98b8a83 100644
--- a/shell/pom.xml
+++ b/shell/pom.xml
@@ -59,6 +59,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-exec</artifactId>
       <version>${commons.exec.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/spark/interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index a7dbce6..daf801f 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -293,6 +293,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-integration/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-integration/pom.xml b/zeppelin-integration/pom.xml
index 94ed3a2..25463b3 100644
--- a/zeppelin-integration/pom.xml
+++ b/zeppelin-integration/pom.xml
@@ -44,7 +44,7 @@
 
     <!--test library versions-->
     <selenium.java.version>3.8.1</selenium.java.version>
-    <commons.lang3.version>3.4</commons.lang3.version>
+    <commons.lang3.version>3.7</commons.lang3.version>
 
     <!--plugin library versions-->
     <plugin.failsafe.version>2.16</plugin.failsafe.version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 38259c7..22a029e 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -44,12 +44,65 @@
     <maven.aeither.provider.version>3.0.3</maven.aeither.provider.version>
     <wagon.version>1.0</wagon.version>
     <jline.version>2.12.1</jline.version>
+    <atomix.version>3.0.0-rc4</atomix.version>
+    <commons-math3.version>3.1.1</commons-math3.version>
+    <guava.version>20.0</guava.version>
+    <commons-lang3.version>3.7</commons-lang3.version>
 
     <!--plugin versions-->
     <plugin.shade.version>2.3</plugin.shade.version>
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix</artifactId>
+      <version>${atomix.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix-raft</artifactId>
+      <version>${atomix.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix-primary-backup</artifactId>
+      <version>${atomix.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+      <version>${commons-math3.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.thrift</groupId>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java
new file mode 100644
index 0000000..34e3b6f
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.cluster.messaging.BroadcastService;
+
+import java.util.function.Consumer;
+
+/**
+ * Broadcast Service Adapter
+ * Service for broadcast messaging between nodes.
+ * The broadcast service is an unreliable broadcast messaging service backed 
by multicast.
+ * This service provides no guaranteed regarding reliability or order of 
messages.
+ */
+public class BroadcastServiceAdapter implements BroadcastService {
+  @Override
+  public void broadcast(String subject, byte[] message) {
+
+  }
+
+  @Override
+  public void addListener(String subject, Consumer<byte[]> listener) {
+
+  }
+
+  @Override
+  public void removeListener(String subject, Consumer<byte[]> listener) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
new file mode 100644
index 0000000..683f068
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.Node;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.primitive.operation.OperationType;
+import io.atomix.primitive.operation.PrimitiveOperation;
+import io.atomix.primitive.operation.impl.DefaultOperationId;
+import io.atomix.primitive.partition.PartitionId;
+import io.atomix.primitive.service.ServiceConfig;
+import io.atomix.primitive.session.SessionClient;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.RaftClient;
+import io.atomix.protocols.raft.RaftError;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.RaftMember;
+import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.RaftResponse;
+import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
+import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
+import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
+import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
+import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
+import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.session.CommunicationStrategy;
+import io.atomix.protocols.raft.storage.system.Configuration;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Namespace;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
+import org.apache.zeppelin.cluster.meta.ClusterMetaOperation;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory;
+import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.time.Instant;
+
+import java.util.Date;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static io.atomix.primitive.operation.PrimitiveOperation.operation;
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION;
+
+/**
+ * The base class for cluster management, including the following 
implementations
+ * 1. RaftClient as the raft client
+ * 2. Threading to provide retry after cluster metadata submission failure
+ * 3. Cluster monitoring
+ */
+public abstract class ClusterManager {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
+
+  public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+
+  protected Collection<Node> clusterNodes = new ArrayList<>();
+
+  // raft
+  protected static String ZEPL_CLUSTER_ID = "ZEPL-CLUSTER";
+  protected static String ZEPL_CLIENT_ID = "ZEPL-CLIENT";
+
+  protected int raftServerPort = 0;
+
+  protected RaftClient raftClient = null;
+  protected SessionClient raftSessionClient = null;
+  protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>();
+  protected LocalRaftProtocolFactory protocolFactory
+      = new LocalRaftProtocolFactory(protocolSerializer);
+  protected List<MessagingService> messagingServices = new ArrayList<>();
+  protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>();
+
+  protected AtomicBoolean running = new AtomicBoolean(true);
+
+  // Write data through the queue to prevent failure due to network exceptions
+  private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue
+      = new ConcurrentLinkedQueue<>();
+
+  // zeppelin server host & port
+  protected String zeplServerHost = "";
+
+  public ClusterManager() {
+    try {
+      zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+      String clusterAddr = zconf.getClusterAddress();
+      if (!StringUtils.isEmpty(clusterAddr)) {
+        String cluster[] = clusterAddr.split(",");
+
+        for (int i = 0; i < cluster.length; i++) {
+          String[] parts = cluster[i].split(":");
+          String clusterHost = parts[0];
+          int clusterPort = Integer.valueOf(parts[1]);
+          if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
+            raftServerPort = clusterPort;
+          }
+
+          Node node = Node.builder().withId(cluster[i])
+              .withAddress(Address.from(clusterHost, clusterPort)).build();
+          clusterNodes.add(node);
+          raftAddressMap.put(MemberId.from(cluster[i]), 
Address.from(clusterHost, clusterPort));
+          clusterMemberIds.add(MemberId.from(cluster[i]));
+        }
+      }
+    } catch (UnknownHostException e) {
+      LOGGER.error(e.getMessage());
+    } catch (SocketException e) {
+      LOGGER.error(e.getMessage());
+    }
+
+  }
+
+  // Check if the raft environment is initialized
+  public abstract boolean raftInitialized();
+  // Is it a cluster leader
+  public abstract boolean isClusterLeader();
+
+  public AtomicBoolean getRunning() {
+    return running;
+  }
+
+  private SessionClient createProxy(RaftClient client) {
+    return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME,
+        ClusterPrimitiveType.INSTANCE, new ServiceConfig())
+        .withReadConsistency(ReadConsistency.SEQUENTIAL)
+        .withCommunicationStrategy(CommunicationStrategy.LEADER)
+        .build()
+        .connect()
+        .join();
+  }
+
+  public void start() {
+    if (!zconf.isClusterMode()) {
+      return;
+    }
+
+    // RaftClient Thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOGGER.info("RaftClientThread run() >>>");
+
+        int raftClientPort = 0;
+        try {
+          raftClientPort = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+        } catch (IOException e) {
+          LOGGER.error(e.getMessage());
+        }
+
+        MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + 
":" + raftClientPort);
+        Address address = Address.from(zeplServerHost, raftClientPort);
+        raftAddressMap.put(memberId, address);
+
+        MessagingService messagingManager
+            = 
NettyMessagingService.builder().withAddress(address).build().start().join();
+        RaftClientProtocol protocol = new RaftClientMessagingProtocol(
+            messagingManager, protocolSerializer, raftAddressMap::get);
+
+        raftClient = RaftClient.builder()
+            .withMemberId(memberId)
+            .withPartitionId(PartitionId.from("partition", 1))
+            .withProtocol(protocol)
+            .build();
+
+        raftClient.connect(clusterMemberIds).join();
+
+        raftSessionClient = createProxy(raftClient);
+
+        LOGGER.info("RaftClientThread run() <<<");
+      }
+    }).start();
+
+    // Cluster Meta Consume Thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (getRunning().get()) {
+            ClusterMetaEntity metaEntity = clusterMetaQueue.peek();
+            if (null != metaEntity) {
+              // Determine whether the client is connected
+              int retry = 0;
+              while (!raftInitialized()) {
+                retry++;
+                if (0 == retry % 30) {
+                  LOGGER.error("Raft incomplete initialization! retry[{}]", 
retry);
+                }
+                Thread.sleep(100);
+              }
+              boolean success = false;
+              switch (metaEntity.getOperation()) {
+                case DELETE_OPERATION:
+                  success = deleteClusterMeta(metaEntity);
+                  break;
+                case PUT_OPERATION:
+                  success = putClusterMeta(metaEntity);
+                  break;
+              }
+              if (true == success) {
+                // The operation was successfully deleted
+                clusterMetaQueue.remove(metaEntity);
+              } else {
+                LOGGER.error("Cluster Meta Consume faild!");
+              }
+            } else {
+              Thread.sleep(100);
+            }
+          }
+        } catch (InterruptedException e) {
+          LOGGER.error(e.getMessage());
+        }
+      }
+    }).start();
+  }
+
+  // cluster shutdown
+  public void shutdown() {
+    if (!zconf.isClusterMode()) {
+      return;
+    }
+
+    running.set(false);
+
+    try {
+      if (null != raftSessionClient) {
+        raftSessionClient.close().get(3, TimeUnit.SECONDS);
+      }
+      if (null != raftClient) {
+        raftClient.close().get(3, TimeUnit.SECONDS);
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage());
+    } catch (ExecutionException e) {
+      LOGGER.error(e.getMessage());
+    } catch (TimeoutException e) {
+      LOGGER.error(e.getMessage());
+    }
+  }
+
+  public String getClusterName() {
+    return zeplServerHost + ":" + raftServerPort;
+  }
+
+  // put metadata into cluster metadata
+  private boolean putClusterMeta(ClusterMetaEntity entity) {
+    if (!raftInitialized()) {
+      LOGGER.error("Raft incomplete initialization!");
+      return false;
+    }
+
+    ClusterMetaType metaType = entity.getMetaType();
+    String metaKey = entity.getKey();
+    HashMap<String, Object> newMetaValue = entity.getValues();
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("putClusterMeta {} {}", metaType, metaKey);
+    }
+
+    // add cluster name
+    newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+    newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
+
+    raftSessionClient.execute(operation(ClusterStateMachine.PUT,
+        clientSerializer.encode(entity)))
+        .<Long>thenApply(clientSerializer::decode);
+    return true;
+  }
+
+  // put metadata into cluster metadata
+  public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, 
Object> values) {
+    ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, 
key, values);
+
+    boolean result = putClusterMeta(metaEntity);
+    if (false == result) {
+      LOGGER.warn("putClusterMeta failure, Cache metadata to queue.");
+      clusterMetaQueue.add(metaEntity);
+    }
+  }
+
+  // delete metadata by cluster metadata
+  private boolean deleteClusterMeta(ClusterMetaEntity entity) {
+    ClusterMetaType metaType = entity.getMetaType();
+    String metaKey = entity.getKey();
+
+    // Need to pay attention to delete metadata operations
+    LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey);
+
+    if (!raftInitialized()) {
+      LOGGER.error("Raft incomplete initialization!");
+      return false;
+    }
+
+    raftSessionClient.execute(operation(
+        ClusterStateMachine.REMOVE,
+        clientSerializer.encode(entity)))
+        .<Long>thenApply(clientSerializer::decode)
+        .thenAccept(result -> {
+          LOGGER.info("deleteClusterMeta {}", result);
+        });
+
+    return true;
+  }
+
+  // delete metadata from cluster metadata
+  public void deleteClusterMeta(ClusterMetaType type, String key) {
+    ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, 
type, key, null);
+
+    boolean result = deleteClusterMeta(metaEntity);
+    if (false == result) {
+      LOGGER.warn("deleteClusterMeta faild, Cache data to queue.");
+      clusterMetaQueue.add(metaEntity);
+    }
+  }
+
+  // get metadata by cluster metadata
+  public HashMap<String, HashMap<String, Object>> getClusterMeta(
+      ClusterMetaType metaType, String metaKey) {
+    HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
+    if (!raftInitialized()) {
+      LOGGER.error("Raft incomplete initialization!");
+      return clusterMeta;
+    }
+
+    ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, 
metaKey, null);
+
+    byte[] mateData = null;
+    try {
+      mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
+          clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage());
+    } catch (ExecutionException e) {
+      LOGGER.error(e.getMessage());
+    } catch (TimeoutException e) {
+      LOGGER.error(e.getMessage());
+    }
+
+    if (null != mateData) {
+      clusterMeta = clientSerializer.decode(mateData);
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
+    }
+
+    return clusterMeta;
+  }
+
+  protected static final Serializer protocolSerializer = 
Serializer.using(Namespace.builder()
+      .register(OpenSessionRequest.class)
+      .register(OpenSessionResponse.class)
+      .register(CloseSessionRequest.class)
+      .register(CloseSessionResponse.class)
+      .register(KeepAliveRequest.class)
+      .register(KeepAliveResponse.class)
+      .register(QueryRequest.class)
+      .register(QueryResponse.class)
+      .register(CommandRequest.class)
+      .register(CommandResponse.class)
+      .register(MetadataRequest.class)
+      .register(MetadataResponse.class)
+      .register(JoinRequest.class)
+      .register(JoinResponse.class)
+      .register(LeaveRequest.class)
+      .register(LeaveResponse.class)
+      .register(ConfigureRequest.class)
+      .register(ConfigureResponse.class)
+      .register(ReconfigureRequest.class)
+      .register(ReconfigureResponse.class)
+      .register(InstallRequest.class)
+      .register(InstallResponse.class)
+      .register(PollRequest.class)
+      .register(PollResponse.class)
+      .register(VoteRequest.class)
+      .register(VoteResponse.class)
+      .register(AppendRequest.class)
+      .register(AppendResponse.class)
+      .register(PublishRequest.class)
+      .register(ResetRequest.class)
+      .register(RaftResponse.Status.class)
+      .register(RaftError.class)
+      .register(RaftError.Type.class)
+      .register(PrimitiveOperation.class)
+      .register(ReadConsistency.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(HashMap.class)
+      .register(ClusterMetaEntity.class)
+      .register(Date.class)
+      .register(Collections.emptyList().getClass())
+      .register(HashSet.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(SessionId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .build());
+
+  protected static final Serializer storageSerializer = 
Serializer.using(Namespace.builder()
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(ClusterMetaEntity.class)
+      .register(HashMap.class)
+      .register(HashSet.class)
+      .register(Date.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .build());
+
+  protected static final Serializer clientSerializer = 
Serializer.using(Namespace.builder()
+      .register(ReadConsistency.class)
+      .register(ClusterMetaEntity.class)
+      .register(ClusterMetaOperation.class)
+      .register(ClusterMetaType.class)
+      .register(HashMap.class)
+      .register(Date.class)
+      .register(Maps.immutableEntry(new String(), new Object()).getClass())
+      .build());
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
new file mode 100644
index 0000000..b4802a0
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.primitive.PrimitiveBuilder;
+import io.atomix.primitive.PrimitiveManagementService;
+import io.atomix.primitive.PrimitiveType;
+import io.atomix.primitive.config.PrimitiveConfig;
+import io.atomix.primitive.service.PrimitiveService;
+import io.atomix.primitive.service.ServiceConfig;
+
+/**
+ * Cluster primitive type
+ * Creating a custom distributed primitive is defining the primitive type.
+ * To create a new type, implement the PrimitiveType interface
+ */
+public class ClusterPrimitiveType implements PrimitiveType {
+  public static final ClusterPrimitiveType INSTANCE = new 
ClusterPrimitiveType();
+
+  public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
+
+  @Override
+  public String name() {
+    return PRIMITIVE_NAME;
+  }
+
+  @Override
+  public PrimitiveConfig newConfig() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PrimitiveBuilder newBuilder(String primitiveName,
+                                     PrimitiveConfig config,
+                                     PrimitiveManagementService 
managementService) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PrimitiveService newService(ServiceConfig config) {
+    return new ClusterStateMachine();
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
new file mode 100644
index 0000000..460f6ac
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.primitive.operation.OperationId;
+import io.atomix.primitive.service.AbstractPrimitiveService;
+import io.atomix.primitive.service.BackupOutput;
+import io.atomix.primitive.service.BackupInput;
+import io.atomix.primitive.service.Commit;
+import io.atomix.primitive.service.ServiceExecutor;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Cluster State Machine for Zeppelin
+ * The cluster state is implemented as a snapshot state machine.
+ * The state machine stores the service and process metadata information of 
the cluster.
+ * Metadata information can be manipulated by put, get, remove, index, and 
snapshot.
+ */
+public class ClusterStateMachine extends AbstractPrimitiveService {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterStateMachine.class);
+  private ClusterMeta clusterMeta = new ClusterMeta();
+
+  // Command to operation a variable in cluster state machine
+  public static final OperationId PUT = OperationId.command("put");
+  public static final OperationId GET = OperationId.query("get");
+  public static final OperationId REMOVE = OperationId.command("remove");
+  public static final OperationId INDEX = OperationId.command("index");
+
+  public ClusterStateMachine() {
+    super(ClusterPrimitiveType.INSTANCE);
+  }
+
+  @Override
+  public Serializer serializer() {
+    return ClusterManager.clientSerializer;
+  }
+
+  @Override
+  protected void configure(ServiceExecutor executor) {
+    executor.register(PUT, this::put);
+    executor.register(GET, this::get);
+    executor.register(REMOVE, this::remove);
+    executor.register(INDEX, this::index);
+  }
+
+  protected long put(Commit<ClusterMetaEntity> commit) {
+    clusterMeta.put(commit.value().getMetaType(),
+        commit.value().getKey(), commit.value().getValues());
+    return commit.index();
+  }
+
+  protected Map<String, Map<String, Object>> get(Commit<ClusterMetaEntity> 
commit) {
+    return clusterMeta.get(commit.value().getMetaType(), 
commit.value().getKey());
+  }
+
+  protected long remove(Commit<ClusterMetaEntity> commit) {
+    clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey());
+    return commit.index();
+  }
+
+  protected long index(Commit<Void> commit) {
+    return commit.index();
+  }
+
+  @Override
+  public void backup(BackupOutput writer) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("ClusterStateMachine.backup()");
+    }
+
+    // backup ServerMeta
+    // cluster meta map struct
+    // cluster_name -> 
{server_tserver_host,server_tserver_port,cpu_capacity,...}
+    Map<String, Map<String, Object>> mapServerMeta
+        = clusterMeta.get(ClusterMetaType.ServerMeta, "");
+    // write all ServerMeta size
+    writer.writeInt(mapServerMeta.size());
+    for (Map.Entry<String, Map<String, Object>> entry : 
mapServerMeta.entrySet()) {
+      // write cluster_name
+      writer.writeString(entry.getKey());
+
+      Map<String, Object> kvPairs = entry.getValue();
+      // write cluster mate kv pairs size
+      writer.writeInt(kvPairs.size());
+      for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
+        // write cluster mate kv pairs
+        writer.writeString(entryValue.getKey());
+        writer.writeObject(entryValue.getValue());
+      }
+    }
+
+    // backup IntpProcessMeta
+    // Interpreter meta map struct
+    // IntpGroupId -> {server_tserver_host,server_tserver_port,...}
+    Map<String, Map<String, Object>> mapIntpProcMeta
+        = clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
+    // write interpreter size
+    writer.writeInt(mapIntpProcMeta.size());
+    for (Map.Entry<String, Map<String, Object>> entry : 
mapIntpProcMeta.entrySet()) {
+      // write IntpGroupId
+      writer.writeString(entry.getKey());
+
+      Map<String, Object> kvPairs = entry.getValue();
+      // write interpreter mate kv pairs size
+      writer.writeInt(kvPairs.size());
+      for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
+        // write interpreter mate kv pairs
+        writer.writeString(entryValue.getKey());
+        writer.writeObject(entryValue.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void restore(BackupInput reader) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("ClusterStateMachine.restore()");
+    }
+
+    clusterMeta = new ClusterMeta();
+    // read all ServerMeta size
+    int nServerMeta = reader.readInt();
+    for (int i = 0; i < nServerMeta; i++) {
+      // read cluster_name
+      String clusterName = reader.readString();
+
+      // read cluster mate kv pairs size
+      int nKVpairs = reader.readInt();
+      for (int j = 0; j < nKVpairs; i++) {
+        // read cluster mate kv pairs
+        String key = reader.readString();
+        Object value = reader.readObject();
+
+        clusterMeta.put(ClusterMetaType.ServerMeta,
+            clusterName, Maps.immutableEntry(key, value));
+      }
+    }
+
+    // read all IntpProcessMeta size
+    int nIntpMeta = reader.readInt();
+    for (int i = 0; i < nIntpMeta; i++) {
+      // read interpreter name
+      String intpName = reader.readString();
+
+      // read interpreter mate kv pairs size
+      int nKVpairs = reader.readInt();
+      for (int j = 0; j < nKVpairs; i++) {
+        // read interpreter mate kv pairs
+        String key = reader.readString();
+        Object value = reader.readObject();
+
+        clusterMeta.put(ClusterMetaType.IntpProcessMeta,
+            intpName, Maps.immutableEntry(key, value));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
new file mode 100644
index 0000000..6283813
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.listener;
+
+import io.atomix.cluster.ClusterMembershipEvent;
+import io.atomix.cluster.ClusterMembershipEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entity capable of receiving device cluster-related events.
+ * Listen for new zeppelin servers to join or leave the cluster,
+ * Monitor whether the metadata in the cluster server changes
+ */
+public class ZeppelinClusterMembershipEventListener implements 
ClusterMembershipEventListener {
+  private static Logger logger
+      = LoggerFactory.getLogger(ZeppelinClusterMembershipEventListener.class);
+
+  @Override
+  public void event(ClusterMembershipEvent event) {
+    switch (event.type()) {
+      case MEMBER_ADDED:
+        logger.info(event.subject().id() + " joined the cluster.");
+        break;
+      case MEMBER_REMOVED:
+        logger.info(event.subject().id() + " left the cluster.");
+        break;
+      case METADATA_CHANGED:
+        logger.info(event.subject().id() + " meta data changed.");
+        break;
+      case REACHABILITY_CHANGED:
+        logger.info(event.subject().id() + " reachability changed.");
+        break;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
new file mode 100644
index 0000000..b96e32b
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metadata stores metadata information in a KV key-value pair
+ */
+public class ClusterMeta implements Serializable {
+  private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
+
+  // zeppelin-server meta
+  public static String SERVER_HOST          = "SERVER_HOST";
+  public static String SERVER_PORT          = "SERVER_PORT";
+  public static String SERVER_TSERVER_HOST  = "SERVER_TSERVER_HOST";
+  public static String SERVER_TSERVER_PORT  = "SERVER_TSERVER_PORT";
+  public static String SERVER_START_TIME    = "SERVER_START_TIME";
+
+  // interperter-process meta
+  public static String INTP_TSERVER_HOST    = "INTP_TSERVER_HOST";
+  public static String INTP_TSERVER_PORT    = "INTP_TSERVER_PORT";
+  public static String INTP_START_TIME      = "INTP_START_TIME";
+
+  // zeppelin-server resource usage
+  public static String CPU_CAPACITY         = "CPU_CAPACITY";
+  public static String CPU_USED             = "CPU_USED";
+  public static String MEMORY_CAPACITY      = "MEMORY_CAPACITY";
+  public static String MEMORY_USED          = "MEMORY_USED";
+
+  public static String HEARTBEAT            = "HEARTBEAT";
+
+  // zeppelin-server or interperter-process status
+  public static String STATUS               = "STATUS";
+  public static String ONLINE_STATUS        = "ONLINE";
+  public static String OFFLINE_STATUS       = "OFFLINE";
+
+  // cluster_name = host:port
+  // Map:cluster_name -> 
{server_tserver_host,server_tserver_port,cpu_capacity,...}
+  private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>();
+
+  // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
+  private Map<String, Map<String, Object>> mapInterpreterMeta = new 
HashMap<>();
+
+  public static Gson gson = new Gson();
+
+  public void put(ClusterMetaType type, String key, Object value) {
+    Map<String, Object> mapValue = (Map<String, Object>) value;
+
+    switch (type) {
+      case ServerMeta:
+        // Because it may be partially updated metadata information
+        if (mapServerMeta.containsKey(key)) {
+          Map<String, Object> values = mapServerMeta.get(key);
+          values.putAll(mapValue);
+        } else {
+          mapServerMeta.put(key, mapValue);
+        }
+        break;
+      case IntpProcessMeta:
+        if (mapInterpreterMeta.containsKey(key)) {
+          Map<String, Object> values = mapInterpreterMeta.get(key);
+          values.putAll(mapValue);
+        } else {
+          mapInterpreterMeta.put(key, mapValue);
+        }
+        break;
+    }
+  }
+
+  public Map<String, Map<String, Object>> get(ClusterMetaType type, String 
key) {
+    Map<String, Object> values = null;
+
+    switch (type) {
+      case ServerMeta:
+        if (null == key || StringUtils.isEmpty(key)) {
+          return mapServerMeta;
+        }
+        if (mapServerMeta.containsKey(key)) {
+          values = mapServerMeta.get(key);
+        } else {
+          logger.warn("can not find key : {}", key);
+        }
+        break;
+      case IntpProcessMeta:
+        if (null == key || StringUtils.isEmpty(key)) {
+          return mapInterpreterMeta;
+        }
+        if (mapInterpreterMeta.containsKey(key)) {
+          values = mapInterpreterMeta.get(key);
+        } else {
+          logger.warn("can not find key : {}", key);
+        }
+        break;
+    }
+
+    Map<String, Map<String, Object>> result = new HashMap<>();
+    result.put(key, values);
+
+    return result;
+  }
+
+  public Map<String, Object> remove(ClusterMetaType type, String key) {
+    switch (type) {
+      case ServerMeta:
+        if (mapServerMeta.containsKey(key)) {
+          return mapServerMeta.remove(key);
+        } else {
+          logger.warn("can not find key : {}", key);
+        }
+        break;
+      case IntpProcessMeta:
+        if (mapInterpreterMeta.containsKey(key)) {
+          return mapInterpreterMeta.remove(key);
+        } else {
+          logger.warn("can not find key : {}", key);
+        }
+        break;
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
new file mode 100644
index 0000000..7a5afb0
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * Cluster operations, cluster types, encapsulation objects for keys and values
+ */
+public class ClusterMetaEntity implements Serializable {
+  private ClusterMetaOperation operation;
+  private ClusterMetaType type;
+  private String key;
+  private HashMap<String, Object> values = new HashMap<>();
+
+  public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType 
type,
+                           String key, HashMap<String, Object> values) {
+    this.operation = operation;
+    this.type = type;
+    this.key = key;
+
+    if (null != values) {
+      this.values.putAll(values);
+    }
+  }
+
+  public ClusterMetaOperation getOperation() {
+    return operation;
+  }
+
+  public ClusterMetaType getMetaType() {
+    return type;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public HashMap<String, Object> getValues() {
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
new file mode 100644
index 0000000..33c99c8
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Type of cluster metadata operation
+ */
+public enum ClusterMetaOperation {
+  GET_OPERATION,
+  PUT_OPERATION,
+  DELETE_OPERATION
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
new file mode 100644
index 0000000..c6229bd
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Type of cluster metadata
+ */
+public enum ClusterMetaType {
+  ServerMeta,
+  IntpProcessMeta
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
new file mode 100644
index 0000000..eb7a762
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Protocol for intercommunication between Raft clients for each server in the 
cluster.
+ * Communication protocol for handling sessions, queries, commands, and 
services within the cluster.
+ */
+public class LocalRaftClientProtocol extends LocalRaftProtocol implements 
RaftClientProtocol {
+  private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> 
heartbeatHandler;
+  private final Map<Long, Consumer<PublishRequest>> publishListeners = 
Maps.newConcurrentMap();
+
+  public LocalRaftClientProtocol(MemberId memberId,
+                                 Serializer serializer,
+                                 Map<MemberId, LocalRaftServerProtocol> 
servers,
+                                 Map<MemberId, LocalRaftClientProtocol> 
clients) {
+    super(serializer, servers, clients);
+    clients.put(memberId, this);
+  }
+
+  private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId 
memberId) {
+    LocalRaftServerProtocol server = server(memberId);
+    if (server != null) {
+      return Futures.completedFuture(server);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest 
request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.openSession(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId 
memberId,
+                                                              
CloseSessionRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.closeSession(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest 
request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.keepAlive(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, 
QueryRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.query(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.command(encode(request))).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) 
{
+    return getServer(memberId).thenCompose(protocol ->
+        protocol.metadata(encode(request))).thenApply(this::decode);
+  }
+
+  CompletableFuture<byte[]> heartbeat(byte[] request) {
+    if (heartbeatHandler != null) {
+      return heartbeatHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  @Override
+  public void registerHeartbeatHandler(Function<HeartbeatRequest,
+      CompletableFuture<HeartbeatResponse>> handler) {
+    this.heartbeatHandler = handler;
+  }
+
+  @Override
+  public void unregisterHeartbeatHandler() {
+    this.heartbeatHandler = null;
+  }
+
+  @Override
+  public void reset(Set<MemberId> members, ResetRequest request) {
+    members.forEach(nodeId -> {
+      LocalRaftServerProtocol server = server(nodeId);
+      if (server != null) {
+        server.reset(request.session(), encode(request));
+      }
+    });
+  }
+
+  void publish(long sessionId, byte[] request) {
+    Consumer<PublishRequest> listener = publishListeners.get(sessionId);
+    if (listener != null) {
+      listener.accept(decode(request));
+    }
+  }
+
+  @Override
+  public void registerPublishListener(SessionId sessionId,
+                                      Consumer<PublishRequest> listener, 
Executor executor) {
+    publishListeners.put(sessionId.id(), request ->
+        executor.execute(() -> listener.accept(request)));
+  }
+
+  @Override
+  public void unregisterPublishListener(SessionId sessionId) {
+    publishListeners.remove(sessionId.id());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
new file mode 100644
index 0000000..c28047a
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Base class for Raft protocol.
+ */
+public abstract class LocalRaftProtocol {
+  private final Serializer serializer;
+  private final Map<MemberId, LocalRaftServerProtocol> servers;
+  private final Map<MemberId, LocalRaftClientProtocol> clients;
+
+  public LocalRaftProtocol(Serializer serializer,
+                           Map<MemberId, LocalRaftServerProtocol> servers,
+                           Map<MemberId, LocalRaftClientProtocol> clients) {
+    this.serializer = serializer;
+    this.servers = servers;
+    this.clients = clients;
+  }
+
+  <T> T copy(T value) {
+    return serializer.decode(serializer.encode(value));
+  }
+
+  byte[] encode(Object value) {
+    return serializer.encode(value);
+  }
+
+  <T> T decode(byte[] bytes) {
+    return serializer.decode(bytes);
+  }
+
+  LocalRaftServerProtocol server(MemberId memberId) {
+    return servers.get(memberId);
+  }
+
+  LocalRaftClientProtocol client(MemberId memberId) {
+    return clients.get(memberId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
new file mode 100644
index 0000000..83d2502
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Cluster Raft protocol factory.
+ */
+public class LocalRaftProtocolFactory {
+  private final Serializer serializer;
+  private final Map<MemberId, LocalRaftServerProtocol> servers = 
Maps.newConcurrentMap();
+  private final Map<MemberId, LocalRaftClientProtocol> clients = 
Maps.newConcurrentMap();
+
+  public LocalRaftProtocolFactory(Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  /**
+   * Returns a new test client protocol.
+   *
+   * @param memberId the client member identifier
+   * @return a new test client protocol
+   */
+  public RaftClientProtocol newClientProtocol(MemberId memberId) {
+    return new LocalRaftClientProtocol(memberId, serializer, servers, clients);
+  }
+
+  /**
+   * Returns a new test server protocol.
+   *
+   * @param memberId the server member identifier
+   * @return a new test server protocol
+   */
+  public RaftServerProtocol newServerProtocol(MemberId memberId) {
+    return new LocalRaftServerProtocol(memberId, serializer, servers, clients);
+  }
+}

Reply via email to