[ZEPPELIN-3610] Cluster Raft module design ### What is this PR for?
By using the Raft protocol, multiple Zeppelin-Server instances are combined into a Zeppelin cluster: the cluster state machine is maintained through the Raft protocol, so the servers in the cluster reach consensus on the services it contains. Metadata about the Zeppelin-Server and Zeppelin-Interpreter services and processes is stored in the cluster metadata. ### What type of PR is it? [Feature] ### Todos * [x] add raft algorithm atomix jar * [x] add cluster state machine * [x] add state machine query command * [x] add state machine delete command * [x] add state machine put command * [x] Isolate the netty JAR package introduced by atomix ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3610 ### How should this be tested? [CI pass](https://travis-ci.org/liuxunorg/zeppelin/builds/418742522) ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? Yes Author: liuxunorg <33611...@qq.com> Author: Jeff Zhang <zjf...@gmail.com> Closes #3183 from liuxunorg/ZEPPELIN-3610 and squashes the following commits: c899b151f [liuxunorg] uncomment maven-dependency-plugin. af728bd2c [liuxunorg] add more doc to explain this class's responsibility. 
d268ee72e [Jeff Zhang] fix test (#2) 97f17acac [liuxunorg] Cluster Raft module design Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/f28e9631 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/f28e9631 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/f28e9631 Branch: refs/heads/master Commit: f28e9631737da554a0a12323f6c00ccae3ec6282 Parents: 0c374c0 Author: liuxunorg <33611...@qq.com> Authored: Thu Sep 27 11:46:50 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Sep 28 08:50:35 2018 +0800 ---------------------------------------------------------------------- angular/pom.xml | 17 + bin/interpreter.sh | 5 +- livy/pom.xml | 7 - markdown/pom.xml | 5 + python/pom.xml | 28 +- .../zeppelin/python/IPythonInterpreterTest.java | 2 +- python/src/test/resources/log4j.properties | 2 +- shell/pom.xml | 5 + spark/interpreter/pom.xml | 5 + zeppelin-integration/pom.xml | 2 +- zeppelin-interpreter/pom.xml | 53 ++ .../cluster/BroadcastServiceAdapter.java | 44 ++ .../apache/zeppelin/cluster/ClusterManager.java | 520 ++++++++++++++++++ .../zeppelin/cluster/ClusterPrimitiveType.java | 57 ++ .../zeppelin/cluster/ClusterStateMachine.java | 179 +++++++ .../ZeppelinClusterMembershipEventListener.java | 50 ++ .../zeppelin/cluster/meta/ClusterMeta.java | 144 +++++ .../cluster/meta/ClusterMetaEntity.java | 57 ++ .../cluster/meta/ClusterMetaOperation.java | 26 + .../zeppelin/cluster/meta/ClusterMetaType.java | 25 + .../protocol/LocalRaftClientProtocol.java | 163 ++++++ .../cluster/protocol/LocalRaftProtocol.java | 58 ++ .../protocol/LocalRaftProtocolFactory.java | 57 ++ .../protocol/LocalRaftServerProtocol.java | 527 +++++++++++++++++++ .../protocol/RaftClientMessagingProtocol.java | 123 +++++ .../cluster/protocol/RaftMessagingProtocol.java | 83 +++ .../protocol/RaftServerMessagingProtocol.java | 346 ++++++++++++ .../zeppelin/conf/ZeppelinConfiguration.java | 29 + 
zeppelin-zengine/pom.xml | 198 ++++--- .../mock/MockInterpreterResourcePool.java | 9 +- .../test/resources/commons-logging-1.1.1.jar | Bin 0 -> 60686 bytes .../src/test/resources/gson-2.2.jar | Bin 0 -> 189612 bytes .../src/test/resources/log4j-1.2.17.jar | Bin 0 -> 489884 bytes .../src/test/resources/log4j.properties | 2 +- .../src/test/resources/slf4j-api-1.7.10.jar | Bin 0 -> 32119 bytes .../src/test/resources/slf4j-log4j12-1.7.10.jar | Bin 0 -> 8866 bytes 36 files changed, 2706 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/angular/pom.xml ---------------------------------------------------------------------- diff --git a/angular/pom.xml b/angular/pom.xml index df4e83c..021139d 100644 --- a/angular/pom.xml +++ b/angular/pom.xml @@ -54,6 +54,23 @@ </dependency> <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.4</version> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index c895018..4606ed3 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -79,9 +79,12 @@ fi # add test classes for unittest if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" + if [[ -n "${ZEPPELIN_ZENGINE_TEST}" ]]; then + addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" + fi fi 
-addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib" +addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-api/target" addJarInDirForIntp "${ZEPPELIN_HOME}/lib/interpreter" addJarInDirForIntp "${INTERPRETER_DIR}" http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/livy/pom.xml ---------------------------------------------------------------------- diff --git a/livy/pom.xml b/livy/pom.xml index 3fc58c4..5e648d9 100644 --- a/livy/pom.xml +++ b/livy/pom.xml @@ -58,13 +58,6 @@ </dependency> <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>zeppelin-interpreter</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.apache.livy</groupId> <artifactId>livy-integration-test</artifactId> <version>${livy.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/markdown/pom.xml ---------------------------------------------------------------------- diff --git a/markdown/pom.xml b/markdown/pom.xml index d5fe9a3..55535a6 100644 --- a/markdown/pom.xml +++ b/markdown/pom.xml @@ -81,6 +81,11 @@ </dependency> <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/python/pom.xml ---------------------------------------------------------------------- diff --git a/python/pom.xml b/python/pom.xml index 5c7059d..8fed0c9 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -35,18 +35,12 @@ <properties> <interpreter.name>python</interpreter.name> <python.py4j.version>0.10.7</python.py4j.version> - <grpc.version>1.4.0</grpc.version> + <grpc.version>1.15.0</grpc.version> <interpreter.jar.name>python-interpreter-with-py4j</interpreter.jar.name> </properties> <dependencies> <dependency> - <groupId>${project.groupId}</groupId> - 
<artifactId>zeppelin-interpreter</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <version>1.3</version> @@ -69,6 +63,26 @@ </dependency> <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>${grpc.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index 4b6bfdb..28e6270 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -214,7 +214,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest { List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage(); assertEquals(1, interpreterResultMessages.size()); - assertTrue(interpreterResultMessages.get(0).getData().contains("exceeds maximum: 3000")); + assertTrue(interpreterResultMessages.get(0).getData().contains("exceeds maximum size 3000")); // next call continue work result = interpreter.interpret("print(1)", context); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/python/src/test/resources/log4j.properties 
---------------------------------------------------------------------- diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties index 8993ff2..2933a64 100644 --- a/python/src/test/resources/log4j.properties +++ b/python/src/test/resources/log4j.properties @@ -24,4 +24,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n -log4j.logger.org.apache.zeppelin.python=DEBUG +log4j.logger.org.apache.zeppelin.python=INFO http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/shell/pom.xml ---------------------------------------------------------------------- diff --git a/shell/pom.xml b/shell/pom.xml index c702b9f..98b8a83 100644 --- a/shell/pom.xml +++ b/shell/pom.xml @@ -59,6 +59,11 @@ </dependency> <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <version>${commons.exec.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/spark/interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index a7dbce6..daf801f 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -293,6 +293,11 @@ </dependency> <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-integration/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-integration/pom.xml b/zeppelin-integration/pom.xml index 94ed3a2..25463b3 100644 --- a/zeppelin-integration/pom.xml +++ 
b/zeppelin-integration/pom.xml @@ -44,7 +44,7 @@ <!--test library versions--> <selenium.java.version>3.8.1</selenium.java.version> - <commons.lang3.version>3.4</commons.lang3.version> + <commons.lang3.version>3.7</commons.lang3.version> <!--plugin library versions--> <plugin.failsafe.version>2.16</plugin.failsafe.version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 38259c7..22a029e 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -44,12 +44,65 @@ <maven.aeither.provider.version>3.0.3</maven.aeither.provider.version> <wagon.version>1.0</wagon.version> <jline.version>2.12.1</jline.version> + <atomix.version>3.0.0-rc4</atomix.version> + <commons-math3.version>3.1.1</commons-math3.version> + <guava.version>20.0</guava.version> + <commons-lang3.version>3.7</commons-lang3.version> <!--plugin versions--> <plugin.shade.version>2.3</plugin.shade.version> </properties> <dependencies> + <dependency> + <groupId>io.atomix</groupId> + <artifactId>atomix</artifactId> + <version>${atomix.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>io.atomix</groupId> + <artifactId>atomix-raft</artifactId> + <version>${atomix.version}</version> + </dependency> + + <dependency> + <groupId>io.atomix</groupId> + <artifactId>atomix-primary-backup</artifactId> + <version>${atomix.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + 
<version>${commons-math3.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> <dependency> <groupId>org.apache.thrift</groupId> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java new file mode 100644 index 0000000..34e3b6f --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.cluster; + +import io.atomix.cluster.messaging.BroadcastService; + +import java.util.function.Consumer; + +/** + * Broadcast Service Adapter + * Service for broadcast messaging between nodes. + * The broadcast service is an unreliable broadcast messaging service backed by multicast. + * This service provides no guaranteed regarding reliability or order of messages. + */ +public class BroadcastServiceAdapter implements BroadcastService { + @Override + public void broadcast(String subject, byte[] message) { + + } + + @Override + public void addListener(String subject, Consumer<byte[]> listener) { + + } + + @Override + public void removeListener(String subject, Consumer<byte[]> listener) { + + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java new file mode 100644 index 0000000..683f068 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java @@ -0,0 +1,520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import com.google.common.collect.Maps; +import io.atomix.cluster.MemberId; +import io.atomix.cluster.Node; +import io.atomix.cluster.messaging.MessagingService; +import io.atomix.cluster.messaging.impl.NettyMessagingService; +import io.atomix.primitive.operation.OperationType; +import io.atomix.primitive.operation.PrimitiveOperation; +import io.atomix.primitive.operation.impl.DefaultOperationId; +import io.atomix.primitive.partition.PartitionId; +import io.atomix.primitive.service.ServiceConfig; +import io.atomix.primitive.session.SessionClient; +import io.atomix.primitive.session.SessionId; +import io.atomix.protocols.raft.RaftClient; +import io.atomix.protocols.raft.RaftError; +import io.atomix.protocols.raft.ReadConsistency; +import io.atomix.protocols.raft.cluster.RaftMember; +import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember; +import io.atomix.protocols.raft.protocol.CloseSessionRequest; +import io.atomix.protocols.raft.protocol.CloseSessionResponse; +import io.atomix.protocols.raft.protocol.KeepAliveRequest; +import io.atomix.protocols.raft.protocol.KeepAliveResponse; +import io.atomix.protocols.raft.protocol.QueryRequest; +import io.atomix.protocols.raft.protocol.QueryResponse; +import io.atomix.protocols.raft.protocol.CommandRequest; +import io.atomix.protocols.raft.protocol.CommandResponse; +import io.atomix.protocols.raft.protocol.MetadataRequest; +import io.atomix.protocols.raft.protocol.MetadataResponse; +import io.atomix.protocols.raft.protocol.JoinRequest; +import 
io.atomix.protocols.raft.protocol.JoinResponse; +import io.atomix.protocols.raft.protocol.LeaveRequest; +import io.atomix.protocols.raft.protocol.LeaveResponse; +import io.atomix.protocols.raft.protocol.ConfigureRequest; +import io.atomix.protocols.raft.protocol.ConfigureResponse; +import io.atomix.protocols.raft.protocol.ReconfigureRequest; +import io.atomix.protocols.raft.protocol.ReconfigureResponse; +import io.atomix.protocols.raft.protocol.InstallRequest; +import io.atomix.protocols.raft.protocol.InstallResponse; +import io.atomix.protocols.raft.protocol.PollRequest; +import io.atomix.protocols.raft.protocol.PollResponse; +import io.atomix.protocols.raft.protocol.VoteRequest; +import io.atomix.protocols.raft.protocol.VoteResponse; +import io.atomix.protocols.raft.protocol.AppendRequest; +import io.atomix.protocols.raft.protocol.AppendResponse; +import io.atomix.protocols.raft.protocol.PublishRequest; +import io.atomix.protocols.raft.protocol.ResetRequest; +import io.atomix.protocols.raft.protocol.RaftResponse; +import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry; +import io.atomix.protocols.raft.storage.log.entry.CommandEntry; +import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry; +import io.atomix.protocols.raft.storage.log.entry.InitializeEntry; +import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry; +import io.atomix.protocols.raft.storage.log.entry.MetadataEntry; +import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry; +import io.atomix.protocols.raft.storage.log.entry.QueryEntry; +import io.atomix.protocols.raft.protocol.OpenSessionRequest; +import io.atomix.protocols.raft.protocol.OpenSessionResponse; +import io.atomix.protocols.raft.protocol.RaftClientProtocol; +import io.atomix.protocols.raft.session.CommunicationStrategy; +import io.atomix.protocols.raft.storage.system.Configuration; +import io.atomix.utils.net.Address; +import io.atomix.utils.serializer.Namespace; +import 
io.atomix.utils.serializer.Serializer; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.cluster.meta.ClusterMeta; +import org.apache.zeppelin.cluster.meta.ClusterMetaEntity; +import org.apache.zeppelin.cluster.meta.ClusterMetaOperation; +import org.apache.zeppelin.cluster.meta.ClusterMetaType; +import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory; +import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.time.Instant; + +import java.util.Date; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static io.atomix.primitive.operation.PrimitiveOperation.operation; +import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION; +import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION; +import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION; + +/** + * The base class for cluster management, including the following implementations + * 1. RaftClient as the raft client + * 2. Threading to provide retry after cluster metadata submission failure + * 3. 
Cluster monitoring + */ +public abstract class ClusterManager { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class); + + public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); + + protected Collection<Node> clusterNodes = new ArrayList<>(); + + // raft + protected static String ZEPL_CLUSTER_ID = "ZEPL-CLUSTER"; + protected static String ZEPL_CLIENT_ID = "ZEPL-CLIENT"; + + protected int raftServerPort = 0; + + protected RaftClient raftClient = null; + protected SessionClient raftSessionClient = null; + protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>(); + protected LocalRaftProtocolFactory protocolFactory + = new LocalRaftProtocolFactory(protocolSerializer); + protected List<MessagingService> messagingServices = new ArrayList<>(); + protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>(); + + protected AtomicBoolean running = new AtomicBoolean(true); + + // Write data through the queue to prevent failure due to network exceptions + private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue + = new ConcurrentLinkedQueue<>(); + + // zeppelin server host & port + protected String zeplServerHost = ""; + + public ClusterManager() { + try { + zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); + String clusterAddr = zconf.getClusterAddress(); + if (!StringUtils.isEmpty(clusterAddr)) { + String cluster[] = clusterAddr.split(","); + + for (int i = 0; i < cluster.length; i++) { + String[] parts = cluster[i].split(":"); + String clusterHost = parts[0]; + int clusterPort = Integer.valueOf(parts[1]); + if (zeplServerHost.equalsIgnoreCase(clusterHost)) { + raftServerPort = clusterPort; + } + + Node node = Node.builder().withId(cluster[i]) + .withAddress(Address.from(clusterHost, clusterPort)).build(); + clusterNodes.add(node); + raftAddressMap.put(MemberId.from(cluster[i]), Address.from(clusterHost, clusterPort)); + clusterMemberIds.add(MemberId.from(cluster[i])); + } + } 
+ } catch (UnknownHostException e) { + LOGGER.error(e.getMessage()); + } catch (SocketException e) { + LOGGER.error(e.getMessage()); + } + + } + + // Check if the raft environment is initialized + public abstract boolean raftInitialized(); + // Is it a cluster leader + public abstract boolean isClusterLeader(); + + public AtomicBoolean getRunning() { + return running; + } + + private SessionClient createProxy(RaftClient client) { + return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME, + ClusterPrimitiveType.INSTANCE, new ServiceConfig()) + .withReadConsistency(ReadConsistency.SEQUENTIAL) + .withCommunicationStrategy(CommunicationStrategy.LEADER) + .build() + .connect() + .join(); + } + + public void start() { + if (!zconf.isClusterMode()) { + return; + } + + // RaftClient Thread + new Thread(new Runnable() { + @Override + public void run() { + LOGGER.info("RaftClientThread run() >>>"); + + int raftClientPort = 0; + try { + raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + + MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort); + Address address = Address.from(zeplServerHost, raftClientPort); + raftAddressMap.put(memberId, address); + + MessagingService messagingManager + = NettyMessagingService.builder().withAddress(address).build().start().join(); + RaftClientProtocol protocol = new RaftClientMessagingProtocol( + messagingManager, protocolSerializer, raftAddressMap::get); + + raftClient = RaftClient.builder() + .withMemberId(memberId) + .withPartitionId(PartitionId.from("partition", 1)) + .withProtocol(protocol) + .build(); + + raftClient.connect(clusterMemberIds).join(); + + raftSessionClient = createProxy(raftClient); + + LOGGER.info("RaftClientThread run() <<<"); + } + }).start(); + + // Cluster Meta Consume Thread + new Thread(new Runnable() { + @Override + public void run() { + try { + while 
(getRunning().get()) { + ClusterMetaEntity metaEntity = clusterMetaQueue.peek(); + if (null != metaEntity) { + // Determine whether the client is connected + int retry = 0; + while (!raftInitialized()) { + retry++; + if (0 == retry % 30) { + LOGGER.error("Raft incomplete initialization! retry[{}]", retry); + } + Thread.sleep(100); + } + boolean success = false; + switch (metaEntity.getOperation()) { + case DELETE_OPERATION: + success = deleteClusterMeta(metaEntity); + break; + case PUT_OPERATION: + success = putClusterMeta(metaEntity); + break; + } + if (true == success) { + // The operation was successfully deleted + clusterMetaQueue.remove(metaEntity); + } else { + LOGGER.error("Cluster Meta Consume faild!"); + } + } else { + Thread.sleep(100); + } + } + } catch (InterruptedException e) { + LOGGER.error(e.getMessage()); + } + } + }).start(); + } + + // cluster shutdown + public void shutdown() { + if (!zconf.isClusterMode()) { + return; + } + + running.set(false); + + try { + if (null != raftSessionClient) { + raftSessionClient.close().get(3, TimeUnit.SECONDS); + } + if (null != raftClient) { + raftClient.close().get(3, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + LOGGER.error(e.getMessage()); + } catch (ExecutionException e) { + LOGGER.error(e.getMessage()); + } catch (TimeoutException e) { + LOGGER.error(e.getMessage()); + } + } + + public String getClusterName() { + return zeplServerHost + ":" + raftServerPort; + } + + // put metadata into cluster metadata + private boolean putClusterMeta(ClusterMetaEntity entity) { + if (!raftInitialized()) { + LOGGER.error("Raft incomplete initialization!"); + return false; + } + + ClusterMetaType metaType = entity.getMetaType(); + String metaKey = entity.getKey(); + HashMap<String, Object> newMetaValue = entity.getValues(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("putClusterMeta {} {}", metaType, metaKey); + } + + // add cluster name + newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost); + 
newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
+
+    raftSessionClient.execute(operation(ClusterStateMachine.PUT,
+        clientSerializer.encode(entity)))
+        .<Long>thenApply(clientSerializer::decode);
+    return true;
+  }
+
+  /**
+   * Puts metadata into the cluster metadata.
+   * When the submission reports failure, the entity is cached in clusterMetaQueue instead.
+   */
+  public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
+    ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
+
+    boolean result = putClusterMeta(metaEntity);
+    if (!result) {
+      LOGGER.warn("putClusterMeta failure, Cache metadata to queue.");
+      clusterMetaQueue.add(metaEntity);
+    }
+  }
+
+  /**
+   * Submits a REMOVE command for the entity to the cluster state machine.
+   *
+   * @return false (nothing sent) when raft is not initialized; true once the command has been
+   *     submitted. The command is applied asynchronously and its result is only logged.
+   */
+  private boolean deleteClusterMeta(ClusterMetaEntity entity) {
+    ClusterMetaType metaType = entity.getMetaType();
+    String metaKey = entity.getKey();
+
+    // Need to pay attention to delete metadata operations
+    LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey);
+
+    if (!raftInitialized()) {
+      LOGGER.error("Raft incomplete initialization!");
+      return false;
+    }
+
+    raftSessionClient.execute(operation(
+        ClusterStateMachine.REMOVE,
+        clientSerializer.encode(entity)))
+        .<Long>thenApply(clientSerializer::decode)
+        .thenAccept(result -> {
+          LOGGER.info("deleteClusterMeta {}", result);
+        });
+
+    return true;
+  }
+
+  /**
+   * Deletes metadata from the cluster metadata.
+   * When the submission fails, the entity is cached in clusterMetaQueue instead.
+   */
+  public void deleteClusterMeta(ClusterMetaType type, String key) {
+    ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
+
+    boolean result = deleteClusterMeta(metaEntity);
+    if (!result) {
+      LOGGER.warn("deleteClusterMeta faild, Cache data to queue.");
+      clusterMetaQueue.add(metaEntity);
+    }
+  }
+
+  /**
+   * Queries the cluster state machine for metadata.
+   *
+   * @param metaType type of metadata to query
+   * @param metaKey  key to look up; the state machine returns every entry of metaType for an
+   *                 empty key
+   * @return the decoded metadata; an empty map when raft is not initialized or when the query
+   *     fails or does not complete within 3 seconds
+   */
+  public HashMap<String, HashMap<String, Object>> getClusterMeta(
+      ClusterMetaType metaType, String metaKey) {
+    HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
+    if (!raftInitialized()) {
+      LOGGER.error("Raft incomplete initialization!");
+      return clusterMeta;
+    }
+
+    ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, metaKey, null);
+
+    byte[] metaData = null;
+    try {
+      metaData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
+          clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      LOGGER.error(e.getMessage(), e);
+    } catch (ExecutionException | TimeoutException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+
+    if (null != metaData) {
+      clusterMeta = clientSerializer.decode(metaData);
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
+    }
+
+    return clusterMeta;
+  }
+
+  // Serializer for Raft protocol requests/responses and log entries.
+  // NOTE(review): PrimitiveOperation.class and ReadConsistency.class are registered twice.
+  // Left as-is because registration order presumably determines the serialized type IDs;
+  // confirm before deduplicating.
+  protected static final Serializer protocolSerializer = Serializer.using(Namespace.builder()
+      .register(OpenSessionRequest.class)
+      .register(OpenSessionResponse.class)
+      .register(CloseSessionRequest.class)
+      .register(CloseSessionResponse.class)
+      .register(KeepAliveRequest.class)
+      .register(KeepAliveResponse.class)
+      .register(QueryRequest.class)
+      .register(QueryResponse.class)
+      .register(CommandRequest.class)
+      .register(CommandResponse.class)
+      .register(MetadataRequest.class)
+      .register(MetadataResponse.class)
+      .register(JoinRequest.class)
+      .register(JoinResponse.class)
+      .register(LeaveRequest.class)
+      .register(LeaveResponse.class)
+      .register(ConfigureRequest.class)
+      .register(ConfigureResponse.class)
+      .register(ReconfigureRequest.class)
+      .register(ReconfigureResponse.class)
+      .register(InstallRequest.class)
+      .register(InstallResponse.class)
+      .register(PollRequest.class)
+      .register(PollResponse.class)
+      .register(VoteRequest.class)
+      .register(VoteResponse.class)
+      .register(AppendRequest.class)
+      .register(AppendResponse.class)
+      .register(PublishRequest.class)
+      .register(ResetRequest.class)
+      .register(RaftResponse.Status.class)
+      .register(RaftError.class)
+      .register(RaftError.Type.class)
+      .register(PrimitiveOperation.class)
+      .register(ReadConsistency.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(HashMap.class)
+      .register(ClusterMetaEntity.class)
+      .register(Date.class)
+      .register(Collections.emptyList().getClass())
+      .register(HashSet.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(SessionId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .build());
+
+  // Serializer for Raft log entries (presumably the storage layer; usage not visible here).
+  protected static final Serializer storageSerializer = Serializer.using(Namespace.builder()
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(ClusterMetaEntity.class)
+      .register(HashMap.class)
+      .register(HashSet.class)
+      .register(Date.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .build());
+
+  // Serializer for ClusterMetaEntity commands and query results; also used by
+  // ClusterStateMachine.serializer().
+  protected static final Serializer clientSerializer = Serializer.using(Namespace.builder()
+      .register(ReadConsistency.class)
+      .register(ClusterMetaEntity.class)
+      .register(ClusterMetaOperation.class)
+      .register(ClusterMetaType.class)
+      .register(HashMap.class)
+      .register(Date.class)
+      .register(Maps.immutableEntry(new String(), new Object()).getClass())
+      .build());
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
new file mode 100644
index 0000000..b4802a0
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.primitive.PrimitiveBuilder;
+import io.atomix.primitive.PrimitiveManagementService;
+import io.atomix.primitive.PrimitiveType;
+import io.atomix.primitive.config.PrimitiveConfig;
+import io.atomix.primitive.service.PrimitiveService;
+import io.atomix.primitive.service.ServiceConfig;
+
+/**
+ * Atomix primitive type for Zeppelin's cluster metadata.
+ * Its only supported operation is creating the {@link ClusterStateMachine} service;
+ * {@link #newConfig()} and {@link #newBuilder} always throw
+ * {@link UnsupportedOperationException}.
+ */
+public class ClusterPrimitiveType implements PrimitiveType {
+  /** Name reported by {@link #name()}. */
+  public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
+
+  /** Shared instance. */
+  public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
+
+  /** Creates a new {@link ClusterStateMachine}; the config argument is not consulted. */
+  @Override
+  public PrimitiveService newService(ServiceConfig config) {
+    return new ClusterStateMachine();
+  }
+
+  @Override
+  public String name() {
+    return PRIMITIVE_NAME;
+  }
+
+  /** Not supported; always throws. */
+  @Override
+  public PrimitiveConfig newConfig() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported; always throws. */
+  @Override
+  public PrimitiveBuilder newBuilder(String primitiveName, PrimitiveConfig config,
+      PrimitiveManagementService managementService) {
+    throw new UnsupportedOperationException();
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
new file mode 100644
index 0000000..460f6ac
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.primitive.operation.OperationId;
+import io.atomix.primitive.service.AbstractPrimitiveService;
+import io.atomix.primitive.service.BackupOutput;
+import io.atomix.primitive.service.BackupInput;
+import io.atomix.primitive.service.Commit;
+import io.atomix.primitive.service.ServiceExecutor;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Cluster State Machine for Zeppelin
+ * The cluster state is implemented as a snapshot state machine.
+ * The state machine stores the service and process metadata information of the cluster.
+ * Metadata information can be manipulated by put, get, remove, index, and snapshot.
+ */
+public class ClusterStateMachine extends AbstractPrimitiveService {
+  private static final Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class);
+  private ClusterMeta clusterMeta = new ClusterMeta();
+
+  // Command to operation a variable in cluster state machine
+  public static final OperationId PUT = OperationId.command("put");
+  public static final OperationId GET = OperationId.query("get");
+  public static final OperationId REMOVE = OperationId.command("remove");
+  public static final OperationId INDEX = OperationId.command("index");
+
+  public ClusterStateMachine() {
+    super(ClusterPrimitiveType.INSTANCE);
+  }
+
+  // Commit values and snapshot objects are (de)serialized with ClusterManager's client
+  // serializer.
+  @Override
+  public Serializer serializer() {
+    return ClusterManager.clientSerializer;
+  }
+
+  @Override
+  protected void configure(ServiceExecutor executor) {
+    executor.register(PUT, this::put);
+    executor.register(GET, this::get);
+    executor.register(REMOVE, this::remove);
+    executor.register(INDEX, this::index);
+  }
+
+  /**
+   * Stores the commit's values under its type/key, merging into an existing entry.
+   *
+   * @return the commit index
+   */
+  protected long put(Commit<ClusterMetaEntity> commit) {
+    clusterMeta.put(commit.value().getMetaType(),
+        commit.value().getKey(), commit.value().getValues());
+    return commit.index();
+  }
+
+  /**
+   * Returns the entry for the commit's type/key; an empty key returns every entry of the type.
+   */
+  protected Map<String, Map<String, Object>> get(Commit<ClusterMetaEntity> commit) {
+    return clusterMeta.get(commit.value().getMetaType(), commit.value().getKey());
+  }
+
+  /**
+   * Removes the entry for the commit's type/key.
+   *
+   * @return the commit index
+   */
+  protected long remove(Commit<ClusterMetaEntity> commit) {
+    clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey());
+    return commit.index();
+  }
+
+  /** Returns the commit index without changing any state. */
+  protected long index(Commit<Void> commit) {
+    return commit.index();
+  }
+
+  /**
+   * Writes a snapshot of all metadata: first the ServerMeta map
+   * (cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}),
+   * then the IntpProcessMeta map (IntpGroupId -> {server_tserver_host,server_tserver_port,...}).
+   * {@link #restore(BackupInput)} reads them back in the same order.
+   */
+  @Override
+  public void backup(BackupOutput writer) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("ClusterStateMachine.backup()");
+    }
+
+    writeMetaMap(writer, clusterMeta.get(ClusterMetaType.ServerMeta, ""));
+    writeMetaMap(writer, clusterMeta.get(ClusterMetaType.IntpProcessMeta, ""));
+  }
+
+  /**
+   * Replaces the current metadata with a snapshot written by {@link #backup(BackupOutput)}.
+   */
+  @Override
+  public void restore(BackupInput reader) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("ClusterStateMachine.restore()");
+    }
+
+    clusterMeta = new ClusterMeta();
+    // Must match the order used by backup().
+    readMetaMap(reader, ClusterMetaType.ServerMeta);
+    readMetaMap(reader, ClusterMetaType.IntpProcessMeta);
+  }
+
+  /**
+   * Writes one metadata map: the entry count, then for each entry its key, the number of
+   * kv pairs and every pair's key and value.
+   */
+  private static void writeMetaMap(BackupOutput writer,
+                                   Map<String, Map<String, Object>> metaMap) {
+    writer.writeInt(metaMap.size());
+    for (Map.Entry<String, Map<String, Object>> entry : metaMap.entrySet()) {
+      writer.writeString(entry.getKey());
+
+      Map<String, Object> kvPairs = entry.getValue();
+      writer.writeInt(kvPairs.size());
+      for (Map.Entry<String, Object> kv : kvPairs.entrySet()) {
+        writer.writeString(kv.getKey());
+        writer.writeObject(kv.getValue());
+      }
+    }
+  }
+
+  /**
+   * Reads one metadata map in the layout produced by writeMetaMap and stores every entry in
+   * clusterMeta under the given type.
+   */
+  private void readMetaMap(BackupInput reader, ClusterMetaType type) {
+    int nMeta = reader.readInt();
+    for (int i = 0; i < nMeta; i++) {
+      String metaKey = reader.readString();
+
+      int nKVpairs = reader.readInt();
+      // Collect all pairs into one mutable map: ClusterMeta.put expects a Map value, and a
+      // mutable one lets later partial updates merge into it.
+      Map<String, Object> kvPairs = Maps.newHashMap();
+      for (int j = 0; j < nKVpairs; j++) {
+        String key = reader.readString();
+        Object value = reader.readObject();
+        kvPairs.put(key, value);
+      }
+      clusterMeta.put(type, metaKey, kvPairs);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
new file mode 100644
index 0000000..6283813
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.listener;
+
+import io.atomix.cluster.ClusterMembershipEvent;
+import io.atomix.cluster.ClusterMembershipEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener for cluster membership events.
+ * Logs when a zeppelin server joins or leaves the cluster, when a member's metadata changes
+ * and when a member's reachability changes; it takes no other action.
+ */
+public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
+  private static final Logger logger
+      = LoggerFactory.getLogger(ZeppelinClusterMembershipEventListener.class);
+
+  @Override
+  public void event(ClusterMembershipEvent event) {
+    // Parameterized logging: the message text is only built when INFO is enabled.
+    switch (event.type()) {
+      case MEMBER_ADDED:
+        logger.info("{} joined the cluster.", event.subject().id());
+        break;
+      case MEMBER_REMOVED:
+        logger.info("{} left the cluster.", event.subject().id());
+        break;
+      case METADATA_CHANGED:
+        logger.info("{} meta data changed.", event.subject().id());
+        break;
+      case REACHABILITY_CHANGED:
+        logger.info("{} reachability changed.", event.subject().id());
+        break;
+      default:
+        // Any other event type is intentionally ignored.
+        break;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
new file mode 100644
index 0000000..b96e32b
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * In-memory store of cluster metadata.
+ * Entries are grouped by {@link ClusterMetaType}; within a type each key maps to a
+ * key-value map of metadata fields.
+ */
+public class ClusterMeta implements Serializable {
+  private static final Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
+
+  // Metadata field names. Declared final so they cannot be reassigned at runtime.
+  // zeppelin-server meta
+  public static final String SERVER_HOST = "SERVER_HOST";
+  public static final String SERVER_PORT = "SERVER_PORT";
+  public static final String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
+  public static final String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
+  public static final String SERVER_START_TIME = "SERVER_START_TIME";
+
+  // interperter-process meta
+  public static final String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
+  public static final String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
+  public static final String INTP_START_TIME = "INTP_START_TIME";
+
+  // zeppelin-server resource usage
+  public static final String CPU_CAPACITY = "CPU_CAPACITY";
+  public static final String CPU_USED = "CPU_USED";
+  public static final String MEMORY_CAPACITY = "MEMORY_CAPACITY";
+  public static final String MEMORY_USED = "MEMORY_USED";
+
+  public static final String HEARTBEAT = "HEARTBEAT";
+
+  // zeppelin-server or interperter-process status
+  public static final String STATUS = "STATUS";
+  public static final String ONLINE_STATUS = "ONLINE";
+  public static final String OFFLINE_STATUS = "OFFLINE";
+
+  // cluster_name = host:port
+  // Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
+  private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>();
+
+  // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
+  private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>();
+
+  public static Gson gson = new Gson();
+
+  /**
+   * Returns the backing map that holds entries of the given type, or null for a type this
+   * class does not store.
+   */
+  private Map<String, Map<String, Object>> metaMapOf(ClusterMetaType type) {
+    switch (type) {
+      case ServerMeta:
+        return mapServerMeta;
+      case IntpProcessMeta:
+        return mapInterpreterMeta;
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Stores metadata fields for a key, merging them into an existing entry if there is one.
+   *
+   * @param type  metadata type
+   * @param key   entry key (cluster name or interpreter group id)
+   * @param value a {@code Map<String, Object>} of fields; null is treated as no fields
+   * @throws ClassCastException if value is not a Map
+   */
+  @SuppressWarnings("unchecked")
+  public void put(ClusterMetaType type, String key, Object value) {
+    Map<String, Map<String, Object>> metaMap = metaMapOf(type);
+    if (null == metaMap) {
+      return;
+    }
+    Map<String, Object> mapValue = (Map<String, Object>) value;
+
+    // Because it may be partially updated metadata information
+    Map<String, Object> values = metaMap.get(key);
+    if (null != values) {
+      if (null != mapValue) {
+        values.putAll(mapValue);
+      }
+    } else {
+      // Store a private mutable copy. The caller's map may be immutable or may be reused,
+      // and a later partial update must be able to putAll into the stored map.
+      metaMap.put(key, null == mapValue ? new HashMap<>() : new HashMap<>(mapValue));
+    }
+  }
+
+  /**
+   * Looks up metadata.
+   *
+   * @param type metadata type
+   * @param key  entry key; null or empty returns the whole (live, mutable) map of the type
+   * @return for a non-empty key, a new map holding {@code key -> fields}; fields are null
+   *     (and a warning is logged) when the key is unknown
+   */
+  public Map<String, Map<String, Object>> get(ClusterMetaType type, String key) {
+    Map<String, Map<String, Object>> metaMap = metaMapOf(type);
+    Map<String, Object> values = null;
+
+    if (null != metaMap) {
+      if (StringUtils.isEmpty(key)) {
+        return metaMap;
+      }
+      if (metaMap.containsKey(key)) {
+        values = metaMap.get(key);
+      } else {
+        logger.warn("can not find key : {}", key);
+      }
+    }
+
+    Map<String, Map<String, Object>> result = new HashMap<>();
+    result.put(key, values);
+
+    return result;
+  }
+
+  /**
+   * Removes the entry for a key.
+   *
+   * @return the removed fields, or null (with a warning) when the key is unknown
+   */
+  public Map<String, Object> remove(ClusterMetaType type, String key) {
+    Map<String, Map<String, Object>> metaMap = metaMapOf(type);
+    if (null == metaMap) {
+      return null;
+    }
+    if (metaMap.containsKey(key)) {
+      return metaMap.remove(key);
+    }
+    logger.warn("can not find key : {}", key);
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
new file mode 100644
index 0000000..7a5afb0
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * One cluster-metadata request: the operation, the metadata type, the entry key and the
+ * field values (puts carry values; gets and deletes are built with none).
+ * The values are copied on construction, so later changes to the caller's map are not seen.
+ */
+public class ClusterMetaEntity implements Serializable {
+  private ClusterMetaOperation operation;
+  private ClusterMetaType type;
+  private String key;
+  private HashMap<String, Object> values = new HashMap<>();
+
+  /**
+   * @param operation requested operation
+   * @param type      metadata type the key belongs to
+   * @param key       entry key
+   * @param values    fields to carry; null leaves the values empty
+   */
+  public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
+                           String key, HashMap<String, Object> values) {
+    this.key = key;
+    this.type = type;
+    this.operation = operation;
+    if (values == null) {
+      return;
+    }
+    this.values.putAll(values);
+  }
+
+  /** @return the entry key */
+  public String getKey() {
+    return key;
+  }
+
+  /** @return the metadata type */
+  public ClusterMetaType getMetaType() {
+    return type;
+  }
+
+  /** @return the requested operation */
+  public ClusterMetaOperation getOperation() {
+    return operation;
+  }
+
+  /** @return the carried fields (the entity's own map, never null after construction) */
+  public HashMap<String, Object> getValues() {
+    return values;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
new file mode 100644
index 0000000..33c99c8
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Kind of request carried by a {@link ClusterMetaEntity}.
+ * NOTE(review): keep the declaration order; enum serializers commonly encode the ordinal,
+ * so reordering may break compatibility between cluster members (confirm before changing).
+ */
+public enum ClusterMetaOperation {
+  /** Read metadata. */
+  GET_OPERATION,
+  /** Store or merge metadata. */
+  PUT_OPERATION,
+  /** Remove metadata. */
+  DELETE_OPERATION;
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
new file mode 100644
index 0000000..c6229bd
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Category of cluster metadata; ClusterMeta keeps each category in its own map.
+ * NOTE(review): keep the declaration order; enum serializers commonly encode the ordinal.
+ */
+public enum ClusterMetaType {
+  /** zeppelin-server metadata, keyed by cluster name (host:port). */
+  ServerMeta,
+  /** Interpreter-process metadata, keyed by interpreter group id. */
+  IntpProcessMeta;
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
new file mode 100644
index 0000000..eb7a762
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * In-process Raft client protocol.
+ * Each request is serialized, passed directly to the target member's
+ * {@link LocalRaftServerProtocol} found in the shared server registry, and the reply is
+ * deserialized. When the target member is not registered, the returned future fails with
+ * {@link ConnectException}.
+ */
+public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
+  private final Map<Long, Consumer<PublishRequest>> publishListeners = Maps.newConcurrentMap();
+  private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler;
+
+  /** Creates the protocol and registers it in {@code clients} under {@code memberId}. */
+  public LocalRaftClientProtocol(MemberId memberId,
+                                 Serializer serializer,
+                                 Map<MemberId, LocalRaftServerProtocol> servers,
+                                 Map<MemberId, LocalRaftClientProtocol> clients) {
+    super(serializer, servers, clients);
+    clients.put(memberId, this);
+  }
+
+  /** Resolves the server of {@code memberId}, or a future failed with ConnectException. */
+  private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
+    LocalRaftServerProtocol target = server(memberId);
+    if (target == null) {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+    return Futures.completedFuture(target);
+  }
+
+  /**
+   * Sends a request to {@code memberId}'s server through {@code call} (which encodes the
+   * request) and decodes the serialized reply.
+   */
+  private <T> CompletableFuture<T> forward(
+      MemberId memberId, Function<LocalRaftServerProtocol, CompletableFuture<byte[]>> call) {
+    return getServer(memberId).thenCompose(call).thenApply(this::decode);
+  }
+
+  @Override
+  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
+                                                            OpenSessionRequest request) {
+    return forward(memberId, target -> target.openSession(encode(request)));
+  }
+
+  @Override
+  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
+                                                              CloseSessionRequest request) {
+    return forward(memberId, target -> target.closeSession(encode(request)));
+  }
+
+  @Override
+  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
+                                                        KeepAliveRequest request) {
+    return forward(memberId, target -> target.keepAlive(encode(request)));
+  }
+
+  @Override
+  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+    return forward(memberId, target -> target.query(encode(request)));
+  }
+
+  @Override
+  public CompletableFuture<CommandResponse> command(MemberId memberId,
+                                                    CommandRequest request) {
+    return forward(memberId, target -> target.command(encode(request)));
+  }
+
+  @Override
+  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
+                                                      MetadataRequest request) {
+    return forward(memberId, target -> target.metadata(encode(request)));
+  }
+
+  /**
+   * Delivers a serialized heartbeat to the registered handler and returns its serialized
+   * reply; fails with ConnectException when no handler is registered.
+   */
+  CompletableFuture<byte[]> heartbeat(byte[] request) {
+    if (heartbeatHandler == null) {
+      return Futures.exceptionalFuture(new ConnectException());
+    }
+    HeartbeatRequest decoded = decode(request);
+    return heartbeatHandler.apply(decoded).thenApply(this::encode);
+  }
+
+  @Override
+  public void registerHeartbeatHandler(Function<HeartbeatRequest,
+      CompletableFuture<HeartbeatResponse>> handler) {
+    this.heartbeatHandler = handler;
+  }
+
+  @Override
+  public void unregisterHeartbeatHandler() {
+    this.heartbeatHandler = null;
+  }
+
+  /** Sends the reset request to every listed member that has a registered server. */
+  @Override
+  public void reset(Set<MemberId> members, ResetRequest request) {
+    for (MemberId member : members) {
+      LocalRaftServerProtocol target = server(member);
+      if (target != null) {
+        target.reset(request.session(), encode(request));
+      }
+    }
+  }
+
+  /** Hands a serialized publish request to the listener of the session, if any. */
+  void publish(long sessionId, byte[] request) {
+    Consumer<PublishRequest> listener = publishListeners.get(sessionId);
+    if (listener == null) {
+      return;
+    }
+    listener.accept(decode(request));
+  }
+
+  /** Registers a listener whose callbacks are run on {@code executor}. */
+  @Override
+  public void registerPublishListener(SessionId sessionId,
+                                      Consumer<PublishRequest> listener, Executor executor) {
+    Consumer<PublishRequest> dispatcher =
+        request -> executor.execute(() -> listener.accept(request));
+    publishListeners.put(sessionId.id(), dispatcher);
+  }
+
+  @Override
+  public void unregisterPublishListener(SessionId sessionId) {
+    publishListeners.remove(sessionId.id());
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
new file mode 100644
index 0000000..c28047a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Shared base of the in-process Raft client and server protocols.
+ * Holds the serializer used for every message and the server/client registries through
+ * which protocols look each other up by {@link MemberId}.
+ */
+public abstract class LocalRaftProtocol {
+  private final Serializer serializer;
+  private final Map<MemberId, LocalRaftServerProtocol> servers;
+  private final Map<MemberId, LocalRaftClientProtocol> clients;
+
+  public LocalRaftProtocol(Serializer serializer,
+                           Map<MemberId, LocalRaftServerProtocol> servers,
+                           Map<MemberId, LocalRaftClientProtocol> clients) {
+    this.clients = clients;
+    this.servers = servers;
+    this.serializer = serializer;
+  }
+
+  /** Serializes {@code value}. */
+  byte[] encode(Object value) {
+    return serializer.encode(value);
+  }
+
+  /** Deserializes {@code bytes}. */
+  <T> T decode(byte[] bytes) {
+    return serializer.decode(bytes);
+  }
+
+  /** Returns a copy of {@code value} produced by a serialize/deserialize round trip. */
+  <T> T copy(T value) {
+    byte[] serialized = serializer.encode(value);
+    return serializer.decode(serialized);
+  }
+
+  /** Returns the server protocol registered for {@code memberId}, or null. */
+  LocalRaftServerProtocol server(MemberId memberId) {
+    return servers.get(memberId);
+  }
+
+  /** Returns the client protocol registered for {@code memberId}, or null. */
+  LocalRaftClientProtocol client(MemberId memberId) {
+    return clients.get(memberId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f28e9631/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
new file mode 100644
index 0000000..83d2502
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.protocol; + +import com.google.common.collect.Maps; +import io.atomix.cluster.MemberId; +import io.atomix.protocols.raft.protocol.RaftClientProtocol; +import io.atomix.protocols.raft.protocol.RaftServerProtocol; +import io.atomix.utils.serializer.Serializer; + +import java.util.Map; + +/** + * Cluster Raft protocol factory. + */ +public class LocalRaftProtocolFactory { + private final Serializer serializer; + private final Map<MemberId, LocalRaftServerProtocol> servers = Maps.newConcurrentMap(); + private final Map<MemberId, LocalRaftClientProtocol> clients = Maps.newConcurrentMap(); + + public LocalRaftProtocolFactory(Serializer serializer) { + this.serializer = serializer; + } + + /** + * Returns a new test client protocol. + * + * @param memberId the client member identifier + * @return a new test client protocol + */ + public RaftClientProtocol newClientProtocol(MemberId memberId) { + return new LocalRaftClientProtocol(memberId, serializer, servers, clients); + } + + /** + * Returns a new test server protocol. + * + * @param memberId the server member identifier + * @return a new test server protocol + */ + public RaftServerProtocol newServerProtocol(MemberId memberId) { + return new LocalRaftServerProtocol(memberId, serializer, servers, clients); + } +}