This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch lta_example_akka_cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7d3febe3b61b11492bb557157b6ba5264e74c14e Author: lta <[email protected]> AuthorDate: Mon Mar 4 10:43:40 2019 +0800 add akka cluster example --- iotdb/iotdb/conf/iotdb_cluster.conf | 34 +++++++++++ iotdb/pom.xml | 20 +++++++ .../org/apache/iotdb/db/cluster/IoTDBClustetr.java | 45 +++++++++++++++ .../org/apache/iotdb/db/cluster/IoTDBNode.java | 66 ++++++++++++++++++++++ .../org/apache/iotdb/db/conf/IoTDBConstant.java | 2 + 5 files changed, 167 insertions(+) diff --git a/iotdb/iotdb/conf/iotdb_cluster.conf b/iotdb/iotdb/conf/iotdb_cluster.conf new file mode 100644 index 0000000..93a8fbe --- /dev/null +++ b/iotdb/iotdb/conf/iotdb_cluster.conf @@ -0,0 +1,34 @@ +akka { + actor { + provider = "cluster" + } + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 2552 + } + + artery { + # change this to enabled=on to use Artery instead of netty + # see https://doc.akka.io/docs/akka/current/remoting-artery.html + enabled = off + transport = tcp + canonical.hostname = "127.0.0.1" + canonical.port = 0 + } + } + + cluster { + # Note - Artery uses akka:// addresses + seed-nodes = [ + "akka.tcp://[email protected]:2552", + "akka.tcp://[email protected]:2551"] + + # auto downing is NOT safe for production deployments. + # you may want to use it during development, read more about it in the docs. + auto-down-unreachable-after = 10s + } +} + +# Enable metrics extension in akka-cluster-metrics. +akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"] diff --git a/iotdb/pom.xml b/iotdb/pom.xml index 7ff37e9..b2c80c9 100644 --- a/iotdb/pom.xml +++ b/iotdb/pom.xml @@ -70,6 +70,26 @@ <artifactId>commons-lang3</artifactId> <version>${common.lang3.version}</version> </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-tools_2.12</artifactId> + <version>2.5.20</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-metrics_2.12</artifactId> + <version>2.5.20</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-protobuf_2.11</artifactId> + <version>2.5.20</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-stream_2.11</artifactId> + <version>2.5.20</version> + </dependency> </dependencies> <build> <plugins> diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBClustetr.java b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBClustetr.java new file mode 100644 index 0000000..a0e0c6c --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBClustetr.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.cluster; + +import akka.actor.ActorSystem; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.io.File; +import org.apache.iotdb.db.conf.IoTDBConstant; + +/** + * @author lta + */ +public class IoTDBClustetr { + + public static void main(String[] args) { + String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null); + Config config; + if (url != null) { + config = ConfigFactory.parseFile(new File(url, IoTDBConstant.CLUSTER_CONFIG)); + } else { + config = ConfigFactory.parseFile(new File( + "/Users/litianan/workspace/java/IDEA/IoTDBProject/incubator-iotdb/iotdb/iotdb/conf", + IoTDBConstant.CLUSTER_CONFIG)); + } + + // Create an Akka system + ActorSystem system = ActorSystem.create("IoTDBClusterSystem", config); + + // Create an actor that handles cluster domain events + system.actorOf(IoTDBNode.props(), "clusterListener"); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBNode.java b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBNode.java new file mode 100644 index 0000000..e1dc5ec --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBNode.java @@ -0,0 +1,66 @@ +package org.apache.iotdb.db.cluster; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberEvent; +import akka.cluster.ClusterEvent.MemberRemoved; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.UnreachableMember; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.service.IoTDB; + +/** + * @author lta + */ +public class IoTDBNode extends AbstractActor { + + private LoggingAdapter log = Logging.getLogger(getContext().system(), this); + private Cluster cluster = Cluster.get(getContext().system()); + private IoTDB daemon; + + static public Props props() { + return Props.create(IoTDBNode.class, () -> new IoTDBNode()); + } + + //subscribe to cluster changes + @Override + public void preStart() { + cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(), + MemberEvent.class, UnreachableMember.class); + daemon = IoTDB.getInstance(); + daemon.active(); + } + + //re-subscribe when restart + @Override + public void postStop() throws FileNodeManagerException { + cluster.unsubscribe(self()); + daemon.stop(); + } + + public IoTDBNode() { + } + + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(MemberUp.class, mUp -> { + log.info("IoTDB node is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + log.info("IoTDB node detected as unreachable: {}", mUnreachable.member()); + }) + .match(MemberRemoved.class, mRemoved -> { + log.info("IoTDB node is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, message -> { + // ignore + }) + .build(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index 5135e79..48dbd9b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -55,4 +55,6 @@ public class IoTDBConstant { public static final String MAX_TIME = "max_time"; public static final String MIN_TIME = "min_time"; public static final int MIN_SUPPORTED_JDK_VERSION = 8; + + public static final String CLUSTER_CONFIG = "iotdb_cluster.conf"; }
