Repository: zeppelin
Updated Branches:
  refs/heads/master 0c374c021 -> f28e96317


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java
new file mode 100644
index 0000000..c28ae87
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java
@@ -0,0 +1,527 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Raft server protocol that hands requests directly to other protocol instances.
+ *
+ * <p>Outbound calls resolve the target through the inherited {@code server(...)} or
+ * {@code client(...)} lookup, encode the request, invoke the matching byte-array entry point
+ * on the target and decode the bytes it returns. The byte-array entry points decode the
+ * incoming request, pass it to the currently registered handler and encode the handler's
+ * response. When the target or the handler is missing, the returned future is failed with a
+ * {@link ConnectException}.
+ */
+public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol {
+  // One handler per request type; null while no handler is registered.
+  private Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> openSessionHandler;
+  private Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>>
+      closeSessionHandler;
+  private Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> keepAliveHandler;
+  private Function<QueryRequest, CompletableFuture<QueryResponse>> queryHandler;
+  private Function<CommandRequest, CompletableFuture<CommandResponse>> commandHandler;
+  private Function<MetadataRequest, CompletableFuture<MetadataResponse>> metadataHandler;
+  private Function<JoinRequest, CompletableFuture<JoinResponse>> joinHandler;
+  private Function<LeaveRequest, CompletableFuture<LeaveResponse>> leaveHandler;
+  private Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> configureHandler;
+  private Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> reconfigureHandler;
+  private Function<InstallRequest, CompletableFuture<InstallResponse>> installHandler;
+  private Function<PollRequest, CompletableFuture<PollResponse>> pollHandler;
+  private Function<VoteRequest, CompletableFuture<VoteResponse>> voteHandler;
+  private Function<TransferRequest, CompletableFuture<TransferResponse>> transferHandler;
+  private Function<AppendRequest, CompletableFuture<AppendResponse>> appendHandler;
+  // Reset listeners keyed by the numeric session id.
+  private final Map<Long, Consumer<ResetRequest>> resetListeners = Maps.newConcurrentMap();
+
+  /**
+   * Creates the protocol and registers it in {@code servers} under {@code memberId}.
+   *
+   * @param memberId   identifier this instance is registered under
+   * @param serializer serializer handed to the superclass
+   * @param servers    server registry handed to the superclass; this instance is added to it
+   * @param clients    client registry handed to the superclass
+   */
+  public LocalRaftServerProtocol(MemberId memberId, Serializer serializer,
+                                 Map<MemberId, LocalRaftServerProtocol> servers,
+                                 Map<MemberId, LocalRaftClientProtocol> clients) {
+    super(serializer, servers, clients);
+    servers.put(memberId, this);
+  }
+
+  /**
+   * Looks up the server protocol of a member.
+   *
+   * @param memberId member to look up
+   * @return a completed future holding the server, or a future failed with
+   *     {@link ConnectException} when the lookup returns {@code null}
+   */
+  private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
+    LocalRaftServerProtocol server = server(memberId);
+    if (server != null) {
+      return Futures.completedFuture(server);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /**
+   * Looks up the client protocol of a member.
+   *
+   * @param memberId member to look up
+   * @return a completed future holding the client, or a future failed with
+   *     {@link ConnectException} when the lookup returns {@code null}
+   */
+  private CompletableFuture<LocalRaftClientProtocol> getClient(MemberId memberId) {
+    LocalRaftClientProtocol client = client(memberId);
+    if (client != null) {
+      return Futures.completedFuture(client);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Sends an open-session request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.openSession(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a close-session request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
+                                                              CloseSessionRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.closeSession(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a keep-alive request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.keepAlive(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a query request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.query(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a command request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.command(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a metadata request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.metadata(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a join request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.join(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a leave request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.leave(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a configure request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
+                                                        ConfigureRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.configure(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a reconfigure request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
+                                                            ReconfigureRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.reconfigure(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends an install request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.install(encode(request))).thenApply(this::decode);
+  }
+
+  /**
+   * Sends a transfer request to the given server and decodes its response.
+   *
+   * <p>The request is dispatched to the target's {@code transfer} entry point so that it
+   * reaches the registered transfer handler (not the install handler).
+   */
+  @Override
+  public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.transfer(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a poll request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.poll(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends a vote request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.vote(encode(request))).thenApply(this::decode);
+  }
+
+  /** Sends an append request to the given server and decodes its response. */
+  @Override
+  public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
+    return getServer(memberId).thenCompose(listener ->
+        listener.append(encode(request))).thenApply(this::decode);
+  }
+
+  /**
+   * Publishes a request to the given client under the request's session.
+   *
+   * <p>The lookup future is not returned, so nothing is delivered and no error is reported
+   * when the client cannot be found.
+   */
+  @Override
+  public void publish(MemberId memberId, PublishRequest request) {
+    getClient(memberId).thenAccept(protocol ->
+        protocol.publish(request.session(), encode(request)));
+  }
+
+  /** Sends a heartbeat request to the given client and decodes its response. */
+  @Override
+  public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
+                                                        HeartbeatRequest request) {
+    return getClient(memberId).thenCompose(protocol ->
+        protocol.heartbeat(encode(request))).thenApply(this::decode);
+  }
+
+  /** Passes an encoded open-session request to the registered handler, if any. */
+  CompletableFuture<byte[]> openSession(byte[] request) {
+    if (openSessionHandler != null) {
+      return openSessionHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for open-session requests. */
+  @Override
+  public void registerOpenSessionHandler(Function<OpenSessionRequest,
+      CompletableFuture<OpenSessionResponse>> handler) {
+    this.openSessionHandler = handler;
+  }
+
+  /** Removes the handler for open-session requests. */
+  @Override
+  public void unregisterOpenSessionHandler() {
+    this.openSessionHandler = null;
+  }
+
+  /** Passes an encoded close-session request to the registered handler, if any. */
+  CompletableFuture<byte[]> closeSession(byte[] request) {
+    if (closeSessionHandler != null) {
+      return closeSessionHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for close-session requests. */
+  @Override
+  public void registerCloseSessionHandler(Function<CloseSessionRequest,
+      CompletableFuture<CloseSessionResponse>> handler) {
+    this.closeSessionHandler = handler;
+  }
+
+  /** Removes the handler for close-session requests. */
+  @Override
+  public void unregisterCloseSessionHandler() {
+    this.closeSessionHandler = null;
+  }
+
+  /** Passes an encoded keep-alive request to the registered handler, if any. */
+  CompletableFuture<byte[]> keepAlive(byte[] request) {
+    if (keepAliveHandler != null) {
+      return keepAliveHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for keep-alive requests. */
+  @Override
+  public void registerKeepAliveHandler(Function<KeepAliveRequest,
+      CompletableFuture<KeepAliveResponse>> handler) {
+    this.keepAliveHandler = handler;
+  }
+
+  /** Removes the handler for keep-alive requests. */
+  @Override
+  public void unregisterKeepAliveHandler() {
+    this.keepAliveHandler = null;
+  }
+
+  /** Passes an encoded query request to the registered handler, if any. */
+  CompletableFuture<byte[]> query(byte[] request) {
+    if (queryHandler != null) {
+      return queryHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for query requests. */
+  @Override
+  public void registerQueryHandler(Function<QueryRequest,
+      CompletableFuture<QueryResponse>> handler) {
+    this.queryHandler = handler;
+  }
+
+  /** Removes the handler for query requests. */
+  @Override
+  public void unregisterQueryHandler() {
+    this.queryHandler = null;
+  }
+
+  /** Passes an encoded command request to the registered handler, if any. */
+  CompletableFuture<byte[]> command(byte[] request) {
+    if (commandHandler != null) {
+      return commandHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for command requests. */
+  @Override
+  public void registerCommandHandler(Function<CommandRequest,
+      CompletableFuture<CommandResponse>> handler) {
+    this.commandHandler = handler;
+  }
+
+  /** Removes the handler for command requests. */
+  @Override
+  public void unregisterCommandHandler() {
+    this.commandHandler = null;
+  }
+
+  /** Passes an encoded metadata request to the registered handler, if any. */
+  CompletableFuture<byte[]> metadata(byte[] request) {
+    if (metadataHandler != null) {
+      return metadataHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for metadata requests. */
+  @Override
+  public void registerMetadataHandler(Function<MetadataRequest,
+      CompletableFuture<MetadataResponse>> handler) {
+    this.metadataHandler = handler;
+  }
+
+  /** Removes the handler for metadata requests. */
+  @Override
+  public void unregisterMetadataHandler() {
+    this.metadataHandler = null;
+  }
+
+  /** Passes an encoded join request to the registered handler, if any. */
+  CompletableFuture<byte[]> join(byte[] request) {
+    if (joinHandler != null) {
+      return joinHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for join requests. */
+  @Override
+  public void registerJoinHandler(Function<JoinRequest,
+      CompletableFuture<JoinResponse>> handler) {
+    this.joinHandler = handler;
+  }
+
+  /** Removes the handler for join requests. */
+  @Override
+  public void unregisterJoinHandler() {
+    this.joinHandler = null;
+  }
+
+  /** Passes an encoded leave request to the registered handler, if any. */
+  CompletableFuture<byte[]> leave(byte[] request) {
+    if (leaveHandler != null) {
+      return leaveHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for leave requests. */
+  @Override
+  public void registerLeaveHandler(Function<LeaveRequest,
+      CompletableFuture<LeaveResponse>> handler) {
+    this.leaveHandler = handler;
+  }
+
+  /** Removes the handler for leave requests. */
+  @Override
+  public void unregisterLeaveHandler() {
+    this.leaveHandler = null;
+  }
+
+  /** Passes an encoded configure request to the registered handler, if any. */
+  CompletableFuture<byte[]> configure(byte[] request) {
+    if (configureHandler != null) {
+      return configureHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for configure requests. */
+  @Override
+  public void registerConfigureHandler(Function<ConfigureRequest,
+      CompletableFuture<ConfigureResponse>> handler) {
+    this.configureHandler = handler;
+  }
+
+  /** Removes the handler for configure requests. */
+  @Override
+  public void unregisterConfigureHandler() {
+    this.configureHandler = null;
+  }
+
+  /** Passes an encoded reconfigure request to the registered handler, if any. */
+  CompletableFuture<byte[]> reconfigure(byte[] request) {
+    if (reconfigureHandler != null) {
+      return reconfigureHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for reconfigure requests. */
+  @Override
+  public void registerReconfigureHandler(Function<ReconfigureRequest,
+      CompletableFuture<ReconfigureResponse>> handler) {
+    this.reconfigureHandler = handler;
+  }
+
+  /** Removes the handler for reconfigure requests. */
+  @Override
+  public void unregisterReconfigureHandler() {
+    this.reconfigureHandler = null;
+  }
+
+  /** Passes an encoded install request to the registered handler, if any. */
+  CompletableFuture<byte[]> install(byte[] request) {
+    if (installHandler != null) {
+      return installHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for install requests. */
+  @Override
+  public void registerInstallHandler(Function<InstallRequest,
+      CompletableFuture<InstallResponse>> handler) {
+    this.installHandler = handler;
+  }
+
+  /** Removes the handler for install requests. */
+  @Override
+  public void unregisterInstallHandler() {
+    this.installHandler = null;
+  }
+
+  /** Passes an encoded poll request to the registered handler, if any. */
+  CompletableFuture<byte[]> poll(byte[] request) {
+    if (pollHandler != null) {
+      return pollHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for poll requests. */
+  @Override
+  public void registerPollHandler(Function<PollRequest,
+      CompletableFuture<PollResponse>> handler) {
+    this.pollHandler = handler;
+  }
+
+  /** Removes the handler for poll requests. */
+  @Override
+  public void unregisterPollHandler() {
+    this.pollHandler = null;
+  }
+
+  /** Passes an encoded vote request to the registered handler, if any. */
+  CompletableFuture<byte[]> vote(byte[] request) {
+    if (voteHandler != null) {
+      return voteHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for vote requests. */
+  @Override
+  public void registerVoteHandler(Function<VoteRequest,
+      CompletableFuture<VoteResponse>> handler) {
+    this.voteHandler = handler;
+  }
+
+  /** Removes the handler for vote requests. */
+  @Override
+  public void unregisterVoteHandler() {
+    this.voteHandler = null;
+  }
+
+  /** Registers the handler for transfer requests. */
+  @Override
+  public void registerTransferHandler(Function<TransferRequest,
+      CompletableFuture<TransferResponse>> handler) {
+    this.transferHandler = handler;
+  }
+
+  /** Removes the handler for transfer requests. */
+  @Override
+  public void unregisterTransferHandler() {
+    this.transferHandler = null;
+  }
+
+  /** Passes an encoded transfer request to the registered handler, if any. */
+  CompletableFuture<byte[]> transfer(byte[] request) {
+    if (transferHandler != null) {
+      return transferHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Passes an encoded append request to the registered handler, if any. */
+  CompletableFuture<byte[]> append(byte[] request) {
+    if (appendHandler != null) {
+      return appendHandler.apply(decode(request)).thenApply(this::encode);
+    } else {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+  }
+
+  /** Registers the handler for append requests. */
+  @Override
+  public void registerAppendHandler(Function<AppendRequest,
+      CompletableFuture<AppendResponse>> handler) {
+    this.appendHandler = handler;
+  }
+
+  /** Removes the handler for append requests. */
+  @Override
+  public void unregisterAppendHandler() {
+    this.appendHandler = null;
+  }
+
+  /**
+   * Delivers an encoded reset request to the listener registered for a session.
+   *
+   * <p>The request is dropped when no listener is registered for {@code sessionId}.
+   *
+   * @param sessionId numeric id of the session the request belongs to
+   * @param request   encoded {@link ResetRequest}
+   */
+  void reset(long sessionId, byte[] request) {
+    Consumer<ResetRequest> listener = resetListeners.get(sessionId);
+    if (listener != null) {
+      listener.accept(decode(request));
+    }
+  }
+
+  /**
+   * Registers a reset listener for a session.
+   *
+   * @param sessionId session whose reset requests the listener receives
+   * @param listener  callback to invoke with each reset request
+   * @param executor  executor on which {@code listener} is run
+   */
+  @Override
+  public void registerResetListener(SessionId sessionId,
+                                    Consumer<ResetRequest> listener, Executor executor) {
+    // Store a wrapper so the callback always runs on the supplied executor.
+    resetListeners.put(sessionId.id(),
+        request -> executor.execute(() -> listener.accept(request)));
+  }
+
+  /** Removes the reset listener registered for the given session, if any. */
+  @Override
+  public void unregisterResetListener(SessionId sessionId) {
+    resetListeners.remove(sessionId.id());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java
new file mode 100644
index 0000000..8d2b425
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Raft client messaging service protocol.
+ */
+public class RaftClientMessagingProtocol extends RaftMessagingProtocol
+    implements RaftClientProtocol {
+  public RaftClientMessagingProtocol(MessagingService messagingService,
+                                     Serializer serializer,
+                                     Function<MemberId, Address> 
addressProvider) {
+    super(messagingService, serializer, addressProvider);
+  }
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest 
request) {
+    return sendAndReceive(memberId, "open-session", request);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId 
memberId,
+                                                              
CloseSessionRequest request) {
+    return sendAndReceive(memberId, "close-session", request);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest 
request) {
+    return sendAndReceive(memberId, "keep-alive", request);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, 
QueryRequest request) {
+    return sendAndReceive(memberId, "query", request);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return sendAndReceive(memberId, "command", request);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) 
{
+    return sendAndReceive(memberId, "metadata", request);
+  }
+
+  @Override
+  public void registerHeartbeatHandler(Function<HeartbeatRequest,
+      CompletableFuture<HeartbeatResponse>> handler) {
+    registerHandler("heartbeat", handler);
+  }
+
+  @Override
+  public void unregisterHeartbeatHandler() {
+    unregisterHandler("heartbeat");
+  }
+
+  @Override
+  public void reset(Set<MemberId> members, ResetRequest request) {
+    for (MemberId memberId : members) {
+      sendAsync(memberId, String.format("reset-%d", request.session()), 
request);
+    }
+  }
+
+  @Override
+  public void registerPublishListener(SessionId sessionId, 
Consumer<PublishRequest> listener,
+                                      Executor executor) {
+    messagingService.registerHandler(String.format("publish-%d", 
sessionId.id()), (e, p) -> {
+      listener.accept(serializer.decode(p));
+    }, executor);
+  }
+
+  @Override
+  public void unregisterPublishListener(SessionId sessionId) {
+    messagingService.unregisterHandler(String.format("publish-%d", 
sessionId.id()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java
new file mode 100644
index 0000000..d67dbb8
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Messaging service based Raft protocol.
+ */
+public abstract class RaftMessagingProtocol {
+  protected final MessagingService messagingService;
+  protected final Serializer serializer;
+  private final Function<MemberId, Address> addressProvider;
+
+  public RaftMessagingProtocol(MessagingService messagingService,
+                               Serializer serializer,
+                               Function<MemberId, Address> addressProvider) {
+    this.messagingService = messagingService;
+    this.serializer = serializer;
+    this.addressProvider = addressProvider;
+  }
+
+  protected Address address(MemberId memberId) {
+    return addressProvider.apply(memberId);
+  }
+
+  protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId,
+                                                       String type, T request) 
{
+    Address address = address(memberId);
+    if (address == null) {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+    return messagingService.sendAndReceive(address, type, 
serializer.encode(request))
+        .thenApply(serializer::decode);
+  }
+
+  protected CompletableFuture<Void> sendAsync(MemberId memberId, String type, 
Object request) {
+    Address address = address(memberId);
+    if (address != null) {
+      return messagingService.sendAsync(address(memberId), type, 
serializer.encode(request));
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  protected <T, U> void registerHandler(String type, Function<T, 
CompletableFuture<U>> handler) {
+    messagingService.registerHandler(type, (e, p) -> {
+      CompletableFuture<byte[]> future = new CompletableFuture<>();
+      handler.apply(serializer.decode(p)).whenComplete((result, error) -> {
+        if (error == null) {
+          future.complete(serializer.encode(result));
+        } else {
+          future.completeExceptionally(error);
+        }
+      });
+      return future;
+    });
+  }
+
+  protected void unregisterHandler(String type) {
+    messagingService.unregisterHandler(type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java
new file mode 100644
index 0000000..bae52bf
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * {@link RaftServerProtocol} implementation that exchanges Raft server
+ * messages through a {@link MessagingService}.
+ *
+ * <p>Each request method forwards to {@code sendAndReceive} (or
+ * {@code sendAsync} for {@code publish}) using a fixed message-type string,
+ * and each register/unregister pair binds or removes a handler under the same
+ * string. The sender and handler strings for an operation must stay
+ * identical.
+ */
+public class RaftServerMessagingProtocol extends RaftMessagingProtocol
+    implements RaftServerProtocol {
+  /**
+   * Creates the protocol; all three arguments are passed unchanged to the
+   * superclass constructor.
+   *
+   * @param messagingService the messaging service used to send and receive
+   * @param serializer the serializer used to encode and decode messages
+   * @param addressProvider function mapping a member id to an address
+   */
+  public RaftServerMessagingProtocol(MessagingService messagingService,
+                                     Serializer serializer,
+                                     Function<MemberId, Address> 
addressProvider) {
+    super(messagingService, serializer, addressProvider);
+  }
+
+  // ---- Request senders: one fixed message type per operation. ----
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest 
request) {
+    return sendAndReceive(memberId, "open-session", request);
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId 
memberId,
+                                                              
CloseSessionRequest request) {
+    return sendAndReceive(memberId, "close-session", request);
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest 
request) {
+    return sendAndReceive(memberId, "keep-alive", request);
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, 
QueryRequest request) {
+    return sendAndReceive(memberId, "query", request);
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return sendAndReceive(memberId, "command", request);
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) 
{
+    return sendAndReceive(memberId, "metadata", request);
+  }
+
+  @Override
+  public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest 
request) {
+    return sendAndReceive(memberId, "join", request);
+  }
+
+  @Override
+  public CompletableFuture<LeaveResponse> leave(MemberId memberId, 
LeaveRequest request) {
+    return sendAndReceive(memberId, "leave", request);
+  }
+
+  @Override
+  public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
+                                                        ConfigureRequest 
request) {
+    return sendAndReceive(memberId, "configure", request);
+  }
+
+  @Override
+  public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
+                                                            ReconfigureRequest 
request) {
+    return sendAndReceive(memberId, "reconfigure", request);
+  }
+
+  @Override
+  public CompletableFuture<InstallResponse> install(MemberId memberId, 
InstallRequest request) {
+    return sendAndReceive(memberId, "install", request);
+  }
+
+  @Override
+  public CompletableFuture<TransferResponse> transfer(MemberId memberId,
+                                                      TransferRequest request) 
{
+    return sendAndReceive(memberId, "transfer", request);
+  }
+
+  @Override
+  public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest 
request) {
+    return sendAndReceive(memberId, "poll", request);
+  }
+
+  @Override
+  public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest 
request) {
+    return sendAndReceive(memberId, "vote", request);
+  }
+
+  @Override
+  public CompletableFuture<AppendResponse> append(MemberId memberId, 
AppendRequest request) {
+    return sendAndReceive(memberId, "append", request);
+  }
+
+  /**
+   * Sends a publish request one-way. The message type is
+   * {@code publish-<session>} built from {@code request.session()}, and the
+   * future returned by {@code sendAsync} is discarded, so send failures are
+   * not reported to the caller.
+   */
+  @Override
+  public void publish(MemberId memberId, PublishRequest request) {
+    sendAsync(memberId, String.format("publish-%d", request.session()), 
request);
+  }
+
+  @Override
+  public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
+                                                        HeartbeatRequest 
request) {
+    return sendAndReceive(memberId, "heartbeat", request);
+  }
+
+  // ---- Handler registration: type strings mirror the senders above. ----
+
+  @Override
+  public void registerOpenSessionHandler(Function<OpenSessionRequest,
+      CompletableFuture<OpenSessionResponse>> handler) {
+    registerHandler("open-session", handler);
+  }
+
+  @Override
+  public void unregisterOpenSessionHandler() {
+    unregisterHandler("open-session");
+  }
+
+  @Override
+  public void registerCloseSessionHandler(Function<CloseSessionRequest,
+      CompletableFuture<CloseSessionResponse>> handler) {
+    registerHandler("close-session", handler);
+  }
+
+  @Override
+  public void unregisterCloseSessionHandler() {
+    unregisterHandler("close-session");
+  }
+
+  @Override
+  public void registerKeepAliveHandler(Function<KeepAliveRequest,
+      CompletableFuture<KeepAliveResponse>> handler) {
+    registerHandler("keep-alive", handler);
+  }
+
+  @Override
+  public void unregisterKeepAliveHandler() {
+    unregisterHandler("keep-alive");
+  }
+
+  @Override
+  public void registerQueryHandler(Function<QueryRequest,
+      CompletableFuture<QueryResponse>> handler) {
+    registerHandler("query", handler);
+  }
+
+  @Override
+  public void unregisterQueryHandler() {
+    unregisterHandler("query");
+  }
+
+  @Override
+  public void registerCommandHandler(Function<CommandRequest,
+      CompletableFuture<CommandResponse>> handler) {
+    registerHandler("command", handler);
+  }
+
+  @Override
+  public void unregisterCommandHandler() {
+    unregisterHandler("command");
+  }
+
+  @Override
+  public void registerMetadataHandler(Function<MetadataRequest,
+      CompletableFuture<MetadataResponse>> handler) {
+    registerHandler("metadata", handler);
+  }
+
+  @Override
+  public void unregisterMetadataHandler() {
+    unregisterHandler("metadata");
+  }
+
+  @Override
+  public void registerJoinHandler(Function<JoinRequest,
+      CompletableFuture<JoinResponse>> handler) {
+    registerHandler("join", handler);
+  }
+
+  @Override
+  public void unregisterJoinHandler() {
+    unregisterHandler("join");
+  }
+
+  @Override
+  public void registerLeaveHandler(Function<LeaveRequest,
+      CompletableFuture<LeaveResponse>> handler) {
+    registerHandler("leave", handler);
+  }
+
+  @Override
+  public void unregisterLeaveHandler() {
+    unregisterHandler("leave");
+  }
+
+  @Override
+  public void registerConfigureHandler(Function<ConfigureRequest,
+      CompletableFuture<ConfigureResponse>> handler) {
+    registerHandler("configure", handler);
+  }
+
+  @Override
+  public void unregisterConfigureHandler() {
+    unregisterHandler("configure");
+  }
+
+  @Override
+  public void registerReconfigureHandler(Function<ReconfigureRequest,
+      CompletableFuture<ReconfigureResponse>> handler) {
+    registerHandler("reconfigure", handler);
+  }
+
+  @Override
+  public void unregisterReconfigureHandler() {
+    unregisterHandler("reconfigure");
+  }
+
+  @Override
+  public void registerInstallHandler(Function<InstallRequest,
+      CompletableFuture<InstallResponse>> handler) {
+    registerHandler("install", handler);
+  }
+
+  @Override
+  public void unregisterInstallHandler() {
+    unregisterHandler("install");
+  }
+
+  @Override
+  public void registerTransferHandler(Function<TransferRequest,
+      CompletableFuture<TransferResponse>> handler) {
+    registerHandler("transfer", handler);
+  }
+
+  @Override
+  public void unregisterTransferHandler() {
+    unregisterHandler("transfer");
+  }
+
+  @Override
+  public void registerPollHandler(Function<PollRequest,
+      CompletableFuture<PollResponse>> handler) {
+    registerHandler("poll", handler);
+  }
+
+  @Override
+  public void unregisterPollHandler() {
+    unregisterHandler("poll");
+  }
+
+  @Override
+  public void registerVoteHandler(Function<VoteRequest,
+      CompletableFuture<VoteResponse>> handler) {
+    registerHandler("vote", handler);
+  }
+
+  @Override
+  public void unregisterVoteHandler() {
+    unregisterHandler("vote");
+  }
+
+  @Override
+  public void registerAppendHandler(Function<AppendRequest,
+      CompletableFuture<AppendResponse>> handler) {
+    registerHandler("append", handler);
+  }
+
+  @Override
+  public void unregisterAppendHandler() {
+    unregisterHandler("append");
+  }
+
+  /**
+   * Registers a one-way listener for reset requests of a single session.
+   * The handler is bound directly on {@code messagingService} under
+   * {@code reset-<sessionId.id()>}; each payload is decoded and handed to
+   * {@code listener}. The {@code executor} is passed through to the messaging
+   * service (presumably the callback runs on it; confirm against the
+   * MessagingService contract).
+   */
+  @Override
+  public void registerResetListener(SessionId sessionId,
+                                    Consumer<ResetRequest> listener, Executor 
executor) {
+    messagingService.registerHandler(String.format("reset-%d", 
sessionId.id()), (e, p) -> {
+      listener.accept(serializer.decode(p));
+    }, executor);
+  }
+
+  /** Removes the reset listener bound under {@code reset-<sessionId.id()>}. */
+  @Override
+  public void unregisterResetListener(SessionId sessionId) {
+    messagingService.unregisterHandler(String.format("reset-%d", 
sessionId.id()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 2b2f3b6..9881003 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -640,6 +640,31 @@ public class ZeppelinConfiguration extends 
XMLConfiguration {
     return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH);
   }
 
+  /**
+   * Returns the value configured for {@code ConfVars.ZEPPELIN_CLUSTER_ADDR}.
+   * The expected address format is not visible here.
+   */
+  public String getClusterAddress() {
+    return getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
+  }
+
+  /**
+   * Stores {@code clusterAddr} in {@code properties} under the variable name
+   * of {@code ConfVars.ZEPPELIN_CLUSTER_ADDR}.
+   *
+   * @param clusterAddr the cluster address value to store
+   */
+  public void setClusterAddress(String clusterAddr) {
+    properties.put(ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(), clusterAddr);
+  }
+
+  /**
+   * Tells whether cluster mode is enabled, i.e. whether a non-empty value is
+   * configured for {@code ConfVars.ZEPPELIN_CLUSTER_ADDR}.
+   *
+   * @return {@code true} when the configured cluster address is not empty
+   */
+  public boolean isClusterMode() {
+    final String configuredAddr = getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
+    return !StringUtils.isEmpty(configuredAddr);
+  }
+
+  /**
+   * Returns the integer configured for
+   * {@code ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL}.
+   * NOTE(review): presumably milliseconds; confirm against the consumer.
+   */
+  public int getClusterHeartbeatInterval() {
+    return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL);
+  }
+
+  /**
+   * Returns the integer configured for
+   * {@code ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT}.
+   * NOTE(review): presumably milliseconds; confirm against the consumer.
+   */
+  public int getClusterHeartbeatTimeout() {
+    return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
+  }
+
   public Map<String, String> dumpConfigurations(Predicate<String> predicate) {
     Map<String, String> properties = new HashMap<>();
 
@@ -782,6 +807,10 @@ public class ZeppelinConfiguration extends 
XMLConfiguration {
 
     ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""),
 
+    ZEPPELIN_CLUSTER_ADDR("zeppelin.cluster.addr", ""),
+    ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 
3000),
+    ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 
9000),
+
     ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
     
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", 
"token"),
     
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token",
 ""),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 9f0e13b..8e67dc1 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -37,7 +37,7 @@
   <properties>
     <!--library versions-->
     <hadoop.version>2.7.3</hadoop.version>
-    <commons.lang3.version>3.4</commons.lang3.version>
+    <commons.lang3.version>3.7</commons.lang3.version>
     <jackrabbit.webdav.version>1.5.2</jackrabbit.webdav.version>
     <quartz.scheduler.version>2.2.1</quartz.scheduler.version>
     <lucene.version>5.3.1</lucene.version>
@@ -128,12 +128,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>20.0</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.lucene</groupId>
       <artifactId>lucene-core</artifactId>
       <version>${lucene.version}</version>
@@ -258,6 +252,9 @@
           <systemProperties>
             <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
           </systemProperties>
+          <environmentVariables>
+            <ZEPPELIN_ZENGINE_TEST>1</ZEPPELIN_ZENGINE_TEST>
+          </environmentVariables>
         </configuration>
       </plugin>
 
@@ -357,87 +354,103 @@
               <groupId>com.google.guava</groupId>
               <artifactId>guava</artifactId>
             </exclusion>
-          </exclusions>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-          <version>${hadoop.version}</version>
-          <classifier>tests</classifier>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-core</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-json</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-server</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
             <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
+              <groupId>com.google.code.findbugs</groupId>
+              <artifactId>jsr305</artifactId>
             </exclusion>
             <exclusion>
               <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-mapper-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-core-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
+              <artifactId>commons-math3</artifactId>
             </exclusion>
           </exclusions>
         </dependency>
 
+          <dependency>
+              <groupId>org.apache.hadoop</groupId>
+              <artifactId>hadoop-common</artifactId>
+              <version>${hadoop.version}</version>
+              <classifier>tests</classifier>
+              <scope>test</scope>
+              <exclusions>
+                  <exclusion>
+                      <groupId>com.sun.jersey</groupId>
+                      <artifactId>jersey-core</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>com.sun.jersey</groupId>
+                      <artifactId>jersey-json</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>com.sun.jersey</groupId>
+                      <artifactId>jersey-client</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>com.sun.jersey</groupId>
+                      <artifactId>jersey-server</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>javax.servlet</groupId>
+                      <artifactId>servlet-api</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>org.apache.avro</groupId>
+                      <artifactId>avro</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>org.apache.jackrabbit</groupId>
+                      <artifactId>jackrabbit-webdav</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>io.netty</groupId>
+                      <artifactId>netty</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>commons-httpclient</groupId>
+                      <artifactId>commons-httpclient</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>org.eclipse.jgit</groupId>
+                      <artifactId>org.eclipse.jgit</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>com.jcraft</groupId>
+                      <artifactId>jsch</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>org.apache.commons</groupId>
+                      <artifactId>commons-compress</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>xml-apis</groupId>
+                      <artifactId>xml-apis</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>xerces</groupId>
+                      <artifactId>xercesImpl</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>org.codehaus.jackson</groupId>
+                      <artifactId>jackson-mapper-asl</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>org.codehaus.jackson</groupId>
+                      <artifactId>jackson-core-asl</artifactId>
+                  </exclusion>
+                  <exclusion>
+                      <groupId>com.google.guava</groupId>
+                      <artifactId>guava</artifactId>
+                  </exclusion>
+                <exclusion>
+                  <groupId>com.google.code.findbugs</groupId>
+                  <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                  <groupId>org.apache.commons</groupId>
+                  <artifactId>commons-math3</artifactId>
+                </exclusion>
+              </exclusions>
+          </dependency>
+
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-hdfs</artifactId>
@@ -588,29 +601,10 @@
           <version>${project.version}</version>
           <scope>test</scope>
           <exclusions>
+            <!-- It is fine to exclude zeppelin-python, because it is only 
used at runtime when launching interpreter -->
             <exclusion>
-              <groupId>com.google.protobuf</groupId>
-              <artifactId>protobuf-java</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.protobuf</groupId>
-              <artifactId>protobuf-java-util</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.errorprone</groupId>
-              <artifactId>error_prone_annotations</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.grpc</groupId>
-              <artifactId>grpc-context</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.api.grpc</groupId>
-              <artifactId>proto-google-common-protos</artifactId>
+              <groupId>org.apache.zeppelin</groupId>
+              <artifactId>zeppelin-python</artifactId>
             </exclusion>
           </exclusions>
         </dependency>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
index ee9f15c..d900031 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -25,6 +25,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Properties;
@@ -32,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockInterpreterResourcePool extends Interpreter {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MockInterpreterResourcePool.class);
+
   AtomicInteger numWatch = new AtomicInteger(0);
 
   public MockInterpreterResourcePool(Properties property) {
@@ -86,11 +90,14 @@ public class MockInterpreterResourcePool extends 
Interpreter {
       ret = resourcePool.getAll();
     } else if (cmd.equals("invoke")) {
       Resource resource = resourcePool.get(noteId, paragraphId, name);
+      LOGGER.info("Resource: " + resource);
       if (stmt.length >=4) {
         Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        LOGGER.info("After invokeMethod: " + resource);
         ret = res.get();
       } else {
         ret = resource.invokeMethod(value, null, null);
+        LOGGER.info("After invokeMethod: " + ret);
       }
     }
 
@@ -119,7 +126,7 @@ public class MockInterpreterResourcePool extends 
Interpreter {
 
   @Override
   public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
+                                                InterpreterContext 
interpreterContext) {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar 
b/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar
new file mode 100644
index 0000000..1deef14
Binary files /dev/null and 
b/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/gson-2.2.jar
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/gson-2.2.jar 
b/zeppelin-zengine/src/test/resources/gson-2.2.jar
new file mode 100644
index 0000000..e0576b4
Binary files /dev/null and b/zeppelin-zengine/src/test/resources/gson-2.2.jar 
differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar 
b/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar
new file mode 100644
index 0000000..1d425cf
Binary files /dev/null and 
b/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties 
b/zeppelin-zengine/src/test/resources/log4j.properties
index 843d415..fd9771c 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -47,4 +47,4 @@ log4j.logger.org.apache.zeppelin.plugin=DEBUG
 log4j.logger.org.apache.zeppelin.spark=DEBUG
 log4j.logger.org.apache.zeppelin.python=DEBUG
 
-log4j.logger.org.quartz.core=DEBUG
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar 
b/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar
new file mode 100644
index 0000000..744e9ec
Binary files /dev/null and 
b/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar 
b/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar
new file mode 100644
index 0000000..957b2b1
Binary files /dev/null and 
b/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar differ

Reply via email to