Repository: zeppelin Updated Branches: refs/heads/master 0c374c021 -> f28e96317
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java new file mode 100644 index 0000000..c28ae87 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java @@ -0,0 +1,527 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.protocol; + +import com.google.common.collect.Maps; +import io.atomix.cluster.MemberId; +import io.atomix.primitive.session.SessionId; +import io.atomix.protocols.raft.protocol.RaftServerProtocol; +import io.atomix.protocols.raft.protocol.OpenSessionRequest; +import io.atomix.protocols.raft.protocol.OpenSessionResponse; +import io.atomix.protocols.raft.protocol.CloseSessionRequest; +import io.atomix.protocols.raft.protocol.CloseSessionResponse; +import io.atomix.protocols.raft.protocol.KeepAliveRequest; +import io.atomix.protocols.raft.protocol.KeepAliveResponse; +import io.atomix.protocols.raft.protocol.QueryRequest; +import io.atomix.protocols.raft.protocol.QueryResponse; +import io.atomix.protocols.raft.protocol.CommandRequest; +import io.atomix.protocols.raft.protocol.CommandResponse; +import io.atomix.protocols.raft.protocol.MetadataRequest; +import io.atomix.protocols.raft.protocol.MetadataResponse; +import io.atomix.protocols.raft.protocol.JoinRequest; +import io.atomix.protocols.raft.protocol.JoinResponse; +import io.atomix.protocols.raft.protocol.LeaveRequest; +import io.atomix.protocols.raft.protocol.LeaveResponse; +import io.atomix.protocols.raft.protocol.ConfigureRequest; +import io.atomix.protocols.raft.protocol.ConfigureResponse; +import io.atomix.protocols.raft.protocol.ReconfigureRequest; +import io.atomix.protocols.raft.protocol.ReconfigureResponse; +import io.atomix.protocols.raft.protocol.InstallRequest; +import io.atomix.protocols.raft.protocol.InstallResponse; +import io.atomix.protocols.raft.protocol.PollRequest; +import io.atomix.protocols.raft.protocol.PollResponse; +import io.atomix.protocols.raft.protocol.VoteRequest; +import io.atomix.protocols.raft.protocol.VoteResponse; +import io.atomix.protocols.raft.protocol.TransferRequest; +import io.atomix.protocols.raft.protocol.TransferResponse; +import io.atomix.protocols.raft.protocol.AppendRequest; +import io.atomix.protocols.raft.protocol.AppendResponse; +import io.atomix.protocols.raft.protocol.ResetRequest; +import io.atomix.protocols.raft.protocol.PublishRequest; +import io.atomix.protocols.raft.protocol.HeartbeatResponse; +import io.atomix.protocols.raft.protocol.HeartbeatRequest; + +import io.atomix.utils.concurrent.Futures; +import io.atomix.utils.serializer.Serializer; + +import java.net.ConnectException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Cluster server protocol. + */ +public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol { + private Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> openSessionHandler; + private Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> + closeSessionHandler; + private Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> keepAliveHandler; + private Function<QueryRequest, CompletableFuture<QueryResponse>> queryHandler; + private Function<CommandRequest, CompletableFuture<CommandResponse>> commandHandler; + private Function<MetadataRequest, CompletableFuture<MetadataResponse>> metadataHandler; + private Function<JoinRequest, CompletableFuture<JoinResponse>> joinHandler; + private Function<LeaveRequest, CompletableFuture<LeaveResponse>> leaveHandler; + private Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> configureHandler; + private Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> reconfigureHandler; + private Function<InstallRequest, CompletableFuture<InstallResponse>> installHandler; + private Function<PollRequest, CompletableFuture<PollResponse>> pollHandler; + private Function<VoteRequest, CompletableFuture<VoteResponse>> voteHandler; + private Function<TransferRequest, CompletableFuture<TransferResponse>> transferHandler; + private Function<AppendRequest, CompletableFuture<AppendResponse>> appendHandler; + private final Map<Long, Consumer<ResetRequest>> resetListeners = Maps.newConcurrentMap(); + + public LocalRaftServerProtocol(MemberId memberId, Serializer serializer, + Map<MemberId, LocalRaftServerProtocol> servers, + Map<MemberId, LocalRaftClientProtocol> clients) { + super(serializer, servers, clients); + servers.put(memberId, this); + } + + private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) { + LocalRaftServerProtocol server = server(memberId); + if (server != null) { + return Futures.completedFuture(server); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + private CompletableFuture<LocalRaftClientProtocol> getClient(MemberId memberId) { + LocalRaftClientProtocol client = client(memberId); + if (client != null) { + return Futures.completedFuture(client); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, + OpenSessionRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.openSession(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, + CloseSessionRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.closeSession(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, + KeepAliveRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.keepAlive(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.query(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<CommandResponse> command(MemberId memberId, + CommandRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.command(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<MetadataResponse> metadata(MemberId memberId, + MetadataRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.metadata(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.join(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.leave(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<ConfigureResponse> configure(MemberId memberId, + ConfigureRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.configure(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, + ReconfigureRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.reconfigure(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.install(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.install(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.poll(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.vote(encode(request))).thenApply(this::decode); + } + + @Override + public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) { + return getServer(memberId).thenCompose(listener -> + listener.append(encode(request))).thenApply(this::decode); + } + + @Override + public void publish(MemberId memberId, PublishRequest request) { + getClient(memberId).thenAccept(protocol -> + protocol.publish(request.session(), encode(request))); + } + + @Override + public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, + HeartbeatRequest request) { + return getClient(memberId).thenCompose(protocol -> + protocol.heartbeat(encode(request))).thenApply(this::decode); + } + + CompletableFuture<byte[]> openSession(byte[] request) { + if (openSessionHandler != null) { + return openSessionHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerOpenSessionHandler(Function<OpenSessionRequest, + CompletableFuture<OpenSessionResponse>> handler) { + this.openSessionHandler = handler; + } + + @Override + public void unregisterOpenSessionHandler() { + this.openSessionHandler = null; + } + + CompletableFuture<byte[]> closeSession(byte[] request) { + if (closeSessionHandler != null) { + return closeSessionHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerCloseSessionHandler(Function<CloseSessionRequest, + CompletableFuture<CloseSessionResponse>> handler) { + this.closeSessionHandler = handler; + } + + @Override + public void unregisterCloseSessionHandler() { + this.closeSessionHandler = null; + } + + CompletableFuture<byte[]> keepAlive(byte[] request) { + if (keepAliveHandler != null) { + return keepAliveHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerKeepAliveHandler(Function<KeepAliveRequest, + CompletableFuture<KeepAliveResponse>> handler) { + this.keepAliveHandler = handler; + } + + @Override + public void unregisterKeepAliveHandler() { + this.keepAliveHandler = null; + } + + CompletableFuture<byte[]> query(byte[] request) { + if (queryHandler != null) { + return queryHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerQueryHandler(Function<QueryRequest, + CompletableFuture<QueryResponse>> handler) { + this.queryHandler = handler; + } + + @Override + public void unregisterQueryHandler() { + this.queryHandler = null; + } + + CompletableFuture<byte[]> command(byte[] request) { + if (commandHandler != null) { + return commandHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerCommandHandler(Function<CommandRequest, + CompletableFuture<CommandResponse>> handler) { + this.commandHandler = handler; + } + + @Override + public void unregisterCommandHandler() { + this.commandHandler = null; + } + + CompletableFuture<byte[]> metadata(byte[] request) { + if (metadataHandler != null) { + return metadataHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerMetadataHandler(Function<MetadataRequest, + CompletableFuture<MetadataResponse>> handler) { + this.metadataHandler = handler; + } + + @Override + public void unregisterMetadataHandler() { + this.metadataHandler = null; + } + + CompletableFuture<byte[]> join(byte[] request) { + if (joinHandler != null) { + return joinHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerJoinHandler(Function<JoinRequest, + CompletableFuture<JoinResponse>> handler) { + this.joinHandler = handler; + } + + @Override + public void unregisterJoinHandler() { + this.joinHandler = null; + } + + CompletableFuture<byte[]> leave(byte[] request) { + if (leaveHandler != null) { + return leaveHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerLeaveHandler(Function<LeaveRequest, + CompletableFuture<LeaveResponse>> handler) { + this.leaveHandler = handler; + } + + @Override + public void unregisterLeaveHandler() { + this.leaveHandler = null; + } + + CompletableFuture<byte[]> configure(byte[] request) { + if (configureHandler != null) { + return configureHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerConfigureHandler(Function<ConfigureRequest, + CompletableFuture<ConfigureResponse>> handler) { + this.configureHandler = handler; + } + + @Override + public void unregisterConfigureHandler() { + this.configureHandler = null; + } + + CompletableFuture<byte[]> reconfigure(byte[] request) { + if (reconfigureHandler != null) { + return reconfigureHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerReconfigureHandler(Function<ReconfigureRequest, + CompletableFuture<ReconfigureResponse>> handler) { + this.reconfigureHandler = handler; + } + + @Override + public void unregisterReconfigureHandler() { + this.reconfigureHandler = null; + } + + CompletableFuture<byte[]> install(byte[] request) { + if (installHandler != null) { + return installHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerInstallHandler(Function<InstallRequest, + CompletableFuture<InstallResponse>> handler) { + this.installHandler = handler; + } + + @Override + public void unregisterInstallHandler() { + this.installHandler = null; + } + + CompletableFuture<byte[]> poll(byte[] request) { + if (pollHandler != null) { + return pollHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerPollHandler(Function<PollRequest, + CompletableFuture<PollResponse>> handler) { + this.pollHandler = handler; + } + + @Override + public void unregisterPollHandler() { + this.pollHandler = null; + } + + CompletableFuture<byte[]> vote(byte[] request) { + if (voteHandler != null) { + return voteHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerVoteHandler(Function<VoteRequest, + CompletableFuture<VoteResponse>> handler) { + this.voteHandler = handler; + } + + @Override + public void unregisterVoteHandler() { + this.voteHandler = null; + } + + @Override + public void registerTransferHandler(Function<TransferRequest, + CompletableFuture<TransferResponse>> handler) { + this.transferHandler = handler; + } + + @Override + public void unregisterTransferHandler() { + this.transferHandler = null; + } + + CompletableFuture<byte[]> transfer(byte[] request) { + if (transferHandler != null) { + return transferHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + CompletableFuture<byte[]> append(byte[] request) { + if (appendHandler != null) { + return appendHandler.apply(decode(request)).thenApply(this::encode); + } else { + return Futures.exceptionalFuture(new ConnectException()); + } + } + + @Override + public void registerAppendHandler(Function<AppendRequest, + CompletableFuture<AppendResponse>> handler) { + this.appendHandler = handler; + } + + @Override + public void unregisterAppendHandler() { + this.appendHandler = null; + } + + void reset(long sessionId, byte[] request) { + Consumer<ResetRequest> listener = resetListeners.get(sessionId); + if (listener != null) { + listener.accept(decode(request)); + } + } + + @Override + public void registerResetListener(SessionId sessionId, + Consumer<ResetRequest> listener, Executor executor) { + resetListeners.put(sessionId.id(), request -> executor.execute(() + -> listener.accept(request))); + } + + @Override + public void unregisterResetListener(SessionId sessionId) { + resetListeners.remove(sessionId.id()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java new file mode 100644 index 0000000..8d2b425 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java @@ -0,0 +1,123 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.protocol; + +import io.atomix.cluster.MemberId; +import io.atomix.cluster.messaging.MessagingService; +import io.atomix.primitive.session.SessionId; +import io.atomix.protocols.raft.protocol.OpenSessionRequest; +import io.atomix.protocols.raft.protocol.OpenSessionResponse; +import io.atomix.protocols.raft.protocol.RaftClientProtocol; +import io.atomix.protocols.raft.protocol.HeartbeatRequest; +import io.atomix.protocols.raft.protocol.PublishRequest; +import io.atomix.protocols.raft.protocol.HeartbeatResponse; +import io.atomix.protocols.raft.protocol.CloseSessionResponse; +import io.atomix.protocols.raft.protocol.CloseSessionRequest; +import io.atomix.protocols.raft.protocol.KeepAliveResponse; +import io.atomix.protocols.raft.protocol.KeepAliveRequest; +import io.atomix.protocols.raft.protocol.QueryResponse; +import io.atomix.protocols.raft.protocol.QueryRequest; +import io.atomix.protocols.raft.protocol.CommandResponse; +import io.atomix.protocols.raft.protocol.CommandRequest; +import io.atomix.protocols.raft.protocol.MetadataResponse; +import io.atomix.protocols.raft.protocol.MetadataRequest; +import io.atomix.protocols.raft.protocol.ResetRequest; +import io.atomix.utils.net.Address; +import io.atomix.utils.serializer.Serializer; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Raft client messaging service protocol. + */ +public class RaftClientMessagingProtocol extends RaftMessagingProtocol + implements RaftClientProtocol { + public RaftClientMessagingProtocol(MessagingService messagingService, + Serializer serializer, + Function<MemberId, Address> addressProvider) { + super(messagingService, serializer, addressProvider); + } + + @Override + public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, + OpenSessionRequest request) { + return sendAndReceive(memberId, "open-session", request); + } + + @Override + public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, + CloseSessionRequest request) { + return sendAndReceive(memberId, "close-session", request); + } + + @Override + public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, + KeepAliveRequest request) { + return sendAndReceive(memberId, "keep-alive", request); + } + + @Override + public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { + return sendAndReceive(memberId, "query", request); + } + + @Override + public CompletableFuture<CommandResponse> command(MemberId memberId, + CommandRequest request) { + return sendAndReceive(memberId, "command", request); + } + + @Override + public CompletableFuture<MetadataResponse> metadata(MemberId memberId, + MetadataRequest request) { + return sendAndReceive(memberId, "metadata", request); + } + + @Override + public void registerHeartbeatHandler(Function<HeartbeatRequest, + CompletableFuture<HeartbeatResponse>> handler) { + registerHandler("heartbeat", handler); + } + + @Override + public void unregisterHeartbeatHandler() { + unregisterHandler("heartbeat"); + } + + @Override + public void reset(Set<MemberId> members, ResetRequest request) { + for (MemberId memberId : members) { + sendAsync(memberId, String.format("reset-%d", request.session()), request); + } + } + + @Override + public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, + Executor executor) { + messagingService.registerHandler(String.format("publish-%d", sessionId.id()), (e, p) -> { + listener.accept(serializer.decode(p)); + }, executor); + } + + @Override + public void unregisterPublishListener(SessionId sessionId) { + messagingService.unregisterHandler(String.format("publish-%d", sessionId.id())); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java new file mode 100644 index 0000000..d67dbb8 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java @@ -0,0 +1,83 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.protocol; + +import io.atomix.cluster.MemberId; +import io.atomix.cluster.messaging.MessagingService; +import io.atomix.utils.concurrent.Futures; +import io.atomix.utils.net.Address; +import io.atomix.utils.serializer.Serializer; + +import java.net.ConnectException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * Messaging service based Raft protocol. + */ +public abstract class RaftMessagingProtocol { + protected final MessagingService messagingService; + protected final Serializer serializer; + private final Function<MemberId, Address> addressProvider; + + public RaftMessagingProtocol(MessagingService messagingService, + Serializer serializer, + Function<MemberId, Address> addressProvider) { + this.messagingService = messagingService; + this.serializer = serializer; + this.addressProvider = addressProvider; + } + + protected Address address(MemberId memberId) { + return addressProvider.apply(memberId); + } + + protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId, + String type, T request) { + Address address = address(memberId); + if (address == null) { + return Futures.exceptionalFuture(new ConnectException()); + } + return messagingService.sendAndReceive(address, type, serializer.encode(request)) + .thenApply(serializer::decode); + } + + protected CompletableFuture<Void> sendAsync(MemberId memberId, String type, Object request) { + Address address = address(memberId); + if (address != null) { + return messagingService.sendAsync(address(memberId), type, serializer.encode(request)); + } + return CompletableFuture.completedFuture(null); + } + + protected <T, U> void registerHandler(String type, Function<T, CompletableFuture<U>> handler) { + messagingService.registerHandler(type, (e, p) -> { + CompletableFuture<byte[]> future = new CompletableFuture<>(); + handler.apply(serializer.decode(p)).whenComplete((result, error) -> { + if (error == null) { + future.complete(serializer.encode(result)); + } else { + future.completeExceptionally(error); + } + }); + return future; + }); + } + + protected void unregisterHandler(String type) { + messagingService.unregisterHandler(type); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java new file mode 100644 index 0000000..bae52bf --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java @@ -0,0 +1,346 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.protocol; + +import io.atomix.cluster.MemberId; +import io.atomix.cluster.messaging.MessagingService; +import io.atomix.primitive.session.SessionId; +import io.atomix.protocols.raft.protocol.RaftServerProtocol; +import io.atomix.protocols.raft.protocol.OpenSessionRequest; +import io.atomix.protocols.raft.protocol.OpenSessionResponse; +import io.atomix.protocols.raft.protocol.CloseSessionRequest; +import io.atomix.protocols.raft.protocol.CloseSessionResponse; +import io.atomix.protocols.raft.protocol.KeepAliveRequest; +import io.atomix.protocols.raft.protocol.KeepAliveResponse; +import io.atomix.protocols.raft.protocol.QueryRequest; +import io.atomix.protocols.raft.protocol.QueryResponse; +import io.atomix.protocols.raft.protocol.CommandRequest; +import io.atomix.protocols.raft.protocol.CommandResponse; +import io.atomix.protocols.raft.protocol.MetadataRequest; +import io.atomix.protocols.raft.protocol.MetadataResponse; +import io.atomix.protocols.raft.protocol.JoinRequest; +import io.atomix.protocols.raft.protocol.JoinResponse; +import io.atomix.protocols.raft.protocol.LeaveRequest; +import io.atomix.protocols.raft.protocol.LeaveResponse; +import io.atomix.protocols.raft.protocol.ConfigureRequest; +import io.atomix.protocols.raft.protocol.ConfigureResponse; +import io.atomix.protocols.raft.protocol.ReconfigureRequest; +import io.atomix.protocols.raft.protocol.ReconfigureResponse; +import io.atomix.protocols.raft.protocol.InstallRequest; +import io.atomix.protocols.raft.protocol.InstallResponse; +import io.atomix.protocols.raft.protocol.PollRequest; +import io.atomix.protocols.raft.protocol.PollResponse; +import io.atomix.protocols.raft.protocol.VoteRequest; +import io.atomix.protocols.raft.protocol.VoteResponse; +import io.atomix.protocols.raft.protocol.TransferRequest; +import io.atomix.protocols.raft.protocol.TransferResponse; +import io.atomix.protocols.raft.protocol.AppendRequest; +import io.atomix.protocols.raft.protocol.AppendResponse; +import io.atomix.protocols.raft.protocol.ResetRequest; +import io.atomix.protocols.raft.protocol.PublishRequest; +import io.atomix.protocols.raft.protocol.HeartbeatResponse; +import io.atomix.protocols.raft.protocol.HeartbeatRequest; +import io.atomix.utils.net.Address; +import io.atomix.utils.serializer.Serializer; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Raft server messaging protocol between Raft Servers for each server in the cluster. + */ +public class RaftServerMessagingProtocol extends RaftMessagingProtocol + implements RaftServerProtocol { + public RaftServerMessagingProtocol(MessagingService messagingService, + Serializer serializer, + Function<MemberId, Address> addressProvider) { + super(messagingService, serializer, addressProvider); + } + + @Override + public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, + OpenSessionRequest request) { + return sendAndReceive(memberId, "open-session", request); + } + + @Override + public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, + CloseSessionRequest request) { + return sendAndReceive(memberId, "close-session", request); + } + + @Override + public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, + KeepAliveRequest request) { + return sendAndReceive(memberId, "keep-alive", request); + } + + @Override + public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { + return sendAndReceive(memberId, "query", request); + } + + @Override + public CompletableFuture<CommandResponse> command(MemberId memberId, + CommandRequest request) { + return sendAndReceive(memberId, "command", request); + } + + @Override + public CompletableFuture<MetadataResponse> metadata(MemberId memberId, + MetadataRequest request) { + return sendAndReceive(memberId, "metadata", request); + } + + @Override + public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) { + return sendAndReceive(memberId, "join", request); + } + + @Override + public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) { + return sendAndReceive(memberId, "leave", request); + } + + @Override + public CompletableFuture<ConfigureResponse> configure(MemberId memberId, + ConfigureRequest request) { + return sendAndReceive(memberId, "configure", request); + } + + @Override + public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, + ReconfigureRequest request) { + return sendAndReceive(memberId, "reconfigure", request); + } + + @Override + public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) { + return sendAndReceive(memberId, "install", request); + } + + @Override + public CompletableFuture<TransferResponse> transfer(MemberId memberId, + TransferRequest request) { + return sendAndReceive(memberId, "transfer", request); + } + + @Override + public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) { + return sendAndReceive(memberId, "poll", request); + } + + @Override + public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) { + return sendAndReceive(memberId, "vote", request); + } + + @Override + public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) { + return sendAndReceive(memberId, "append", request); + } + + @Override + public void publish(MemberId memberId, PublishRequest request) { + sendAsync(memberId, String.format("publish-%d", request.session()), request); + } + + @Override + public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, + HeartbeatRequest request) { + return sendAndReceive(memberId, "heartbeat", request); + } + + @Override + public void registerOpenSessionHandler(Function<OpenSessionRequest, + CompletableFuture<OpenSessionResponse>> handler) { + registerHandler("open-session", handler); + } + + @Override + public void unregisterOpenSessionHandler() { + unregisterHandler("open-session"); + } + + @Override + public void registerCloseSessionHandler(Function<CloseSessionRequest, + CompletableFuture<CloseSessionResponse>> handler) { + registerHandler("close-session", handler); + } + + @Override + public void unregisterCloseSessionHandler() { + unregisterHandler("close-session"); + } + + @Override + public void registerKeepAliveHandler(Function<KeepAliveRequest, + CompletableFuture<KeepAliveResponse>> handler) { + registerHandler("keep-alive", handler); + } + + @Override + public void unregisterKeepAliveHandler() { + unregisterHandler("keep-alive"); + } + + @Override + public void registerQueryHandler(Function<QueryRequest, + CompletableFuture<QueryResponse>> handler) { + registerHandler("query", handler); + } + + @Override + public void unregisterQueryHandler() { + unregisterHandler("query"); + } + + @Override + public void registerCommandHandler(Function<CommandRequest, + CompletableFuture<CommandResponse>> handler) { + registerHandler("command", handler); + } + + @Override + public void unregisterCommandHandler() { + unregisterHandler("command"); + } + + @Override + public void registerMetadataHandler(Function<MetadataRequest, + CompletableFuture<MetadataResponse>> handler) { + registerHandler("metadata", handler); + } + + @Override + public void unregisterMetadataHandler() { + unregisterHandler("metadata"); + } + + @Override + public void registerJoinHandler(Function<JoinRequest, + CompletableFuture<JoinResponse>> handler) { + registerHandler("join", handler); + } + + @Override + public void unregisterJoinHandler() { + unregisterHandler("join"); + } + + @Override + public void registerLeaveHandler(Function<LeaveRequest, + CompletableFuture<LeaveResponse>> handler) { + registerHandler("leave", handler); + } + + @Override + public void unregisterLeaveHandler() { + unregisterHandler("leave"); + } + + @Override + public void registerConfigureHandler(Function<ConfigureRequest, + CompletableFuture<ConfigureResponse>> handler) { + registerHandler("configure", handler); + } + + @Override + public void unregisterConfigureHandler() { + unregisterHandler("configure"); + } + + @Override + public void registerReconfigureHandler(Function<ReconfigureRequest, + CompletableFuture<ReconfigureResponse>> handler) { + registerHandler("reconfigure", handler); + } + + @Override + public void unregisterReconfigureHandler() { + unregisterHandler("reconfigure"); + } + + @Override + public void registerInstallHandler(Function<InstallRequest, + CompletableFuture<InstallResponse>> handler) { + registerHandler("install", handler); + } + + @Override + public void unregisterInstallHandler() { + unregisterHandler("install"); + } + + @Override + public void registerTransferHandler(Function<TransferRequest, + CompletableFuture<TransferResponse>> handler) { + registerHandler("transfer", handler); + } + + @Override + public void unregisterTransferHandler() { + unregisterHandler("transfer"); + } + + @Override + public void registerPollHandler(Function<PollRequest, + CompletableFuture<PollResponse>> handler) { + registerHandler("poll", handler); + } + + @Override + public void unregisterPollHandler() { + unregisterHandler("poll"); + } + + @Override + public void registerVoteHandler(Function<VoteRequest, + CompletableFuture<VoteResponse>> handler) { + registerHandler("vote", handler); + } + + @Override + public void unregisterVoteHandler() { + unregisterHandler("vote"); + } + + @Override + public void registerAppendHandler(Function<AppendRequest, + CompletableFuture<AppendResponse>> handler) { + registerHandler("append", handler); + } + + @Override + public void unregisterAppendHandler() { + unregisterHandler("append"); + } + + @Override + public void registerResetListener(SessionId sessionId, + Consumer<ResetRequest> listener, Executor executor) { + messagingService.registerHandler(String.format("reset-%d", sessionId.id()), (e, p) -> { + listener.accept(serializer.decode(p)); + }, executor); + } + + @Override + public void unregisterResetListener(SessionId sessionId) { + messagingService.unregisterHandler(String.format("reset-%d", sessionId.id())); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 2b2f3b6..9881003 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -640,6 +640,31 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH); } + public String getClusterAddress() { + return getString(ConfVars.ZEPPELIN_CLUSTER_ADDR); + } + + public void setClusterAddress(String clusterAddr) { + properties.put(ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(), clusterAddr); + } + + public boolean isClusterMode() { + String clusterAddr = getString(ConfVars.ZEPPELIN_CLUSTER_ADDR); + if (StringUtils.isEmpty(clusterAddr)) { + return false; + } + + return true; + } + + public int getClusterHeartbeatInterval() { + return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL); + } + + public int getClusterHeartbeatTimeout() { + return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT); + } + public Map<String, String> dumpConfigurations(Predicate<String> predicate) { Map<String, String> properties = new HashMap<>(); @@ -782,6 +807,10 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""), + ZEPPELIN_CLUSTER_ADDR("zeppelin.cluster.addr", ""), + ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000), + ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000), + ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""), ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"), ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""), http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 9f0e13b..8e67dc1 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -37,7 +37,7 @@ <properties> <!--library versions--> <hadoop.version>2.7.3</hadoop.version> - <commons.lang3.version>3.4</commons.lang3.version> + <commons.lang3.version>3.7</commons.lang3.version> <jackrabbit.webdav.version>1.5.2</jackrabbit.webdav.version> <quartz.scheduler.version>2.2.1</quartz.scheduler.version> <lucene.version>5.3.1</lucene.version> @@ -128,12 +128,6 @@ </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>20.0</version> - </dependency> - - <dependency> <groupId>org.apache.lucene</groupId> <artifactId>lucene-core</artifactId> <version>${lucene.version}</version> @@ -258,6 +252,9 @@ <systemProperties> <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir> </systemProperties> + <environmentVariables> + <ZEPPELIN_ZENGINE_TEST>1</ZEPPELIN_ZENGINE_TEST> + </environmentVariables> </configuration> </plugin> @@ -357,87 +354,103 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.jackrabbit</groupId> - <artifactId>jackrabbit-webdav</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> <exclusion> - <groupId>commons-httpclient</groupId> - <artifactId>commons-httpclient</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jgit</groupId> - <artifactId>org.eclipse.jgit</artifactId> - </exclusion> - <exclusion> - <groupId>com.jcraft</groupId> - <artifactId>jsch</artifactId> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> </exclusion> <exclusion> <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </exclusion> - <exclusion> - <groupId>xml-apis</groupId> - <artifactId>xml-apis</artifactId> - </exclusion> - <exclusion> - <groupId>xerces</groupId> - <artifactId>xercesImpl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> + <artifactId>commons-math3</artifactId> </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>jackrabbit-webdav</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jgit</groupId> + <artifactId>org.eclipse.jgit</artifactId> + </exclusion> + <exclusion> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>xml-apis</groupId> + <artifactId>xml-apis</artifactId> + </exclusion> + <exclusion> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> @@ -588,29 +601,10 @@ <version>${project.version}</version> <scope>test</scope> <exclusions> + <!-- It is fine to exclude zeppelin-python, because it is only used at runtime when launching interpreter --> <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java-util</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.errorprone</groupId> - <artifactId>error_prone_annotations</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-context</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.api.grpc</groupId> - <artifactId>proto-google-common-protos</artifactId> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-python</artifactId> </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index ee9f15c..d900031 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -25,6 +25,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Properties; @@ -32,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger; public class MockInterpreterResourcePool extends Interpreter { + private static final Logger LOGGER = LoggerFactory.getLogger(MockInterpreterResourcePool.class); + AtomicInteger numWatch = new AtomicInteger(0); public MockInterpreterResourcePool(Properties property) { @@ -86,11 +90,14 @@ public class MockInterpreterResourcePool extends Interpreter { ret = resourcePool.getAll(); } else if (cmd.equals("invoke")) { Resource resource = resourcePool.get(noteId, paragraphId, name); + LOGGER.info("Resource: " + resource); if (stmt.length >=4) { Resource res = resource.invokeMethod(value, null, null, stmt[3]); + LOGGER.info("After invokeMethod: " + resource); ret = res.get(); } else { ret = resource.invokeMethod(value, null, null); + LOGGER.info("After invokeMethod: " + ret); } } @@ -119,7 +126,7 @@ public class MockInterpreterResourcePool extends Interpreter { @Override public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) { return null; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar b/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar new file mode 100644 index 0000000..1deef14 Binary files /dev/null and b/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar differ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/gson-2.2.jar ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/gson-2.2.jar b/zeppelin-zengine/src/test/resources/gson-2.2.jar new file mode 100644 index 0000000..e0576b4 Binary files /dev/null and b/zeppelin-zengine/src/test/resources/gson-2.2.jar differ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar b/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar new file mode 100644 index 0000000..1d425cf Binary files /dev/null and b/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar differ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties index 843d415..fd9771c 100644 --- a/zeppelin-zengine/src/test/resources/log4j.properties +++ b/zeppelin-zengine/src/test/resources/log4j.properties @@ -47,4 +47,4 @@ log4j.logger.org.apache.zeppelin.plugin=DEBUG log4j.logger.org.apache.zeppelin.spark=DEBUG log4j.logger.org.apache.zeppelin.python=DEBUG -log4j.logger.org.quartz.core=DEBUG +log4j.logger.org.apache.zeppelin.interpreter=DEBUG http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar b/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar new file mode 100644 index 0000000..744e9ec Binary files /dev/null and b/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar differ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar b/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar new file mode 100644 index 0000000..957b2b1 Binary files /dev/null and b/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar differ