This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch lta_example_akka_cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7d3febe3b61b11492bb557157b6ba5264e74c14e
Author: lta <[email protected]>
AuthorDate: Mon Mar 4 10:43:40 2019 +0800

    add akka cluster example
---
 iotdb/iotdb/conf/iotdb_cluster.conf                | 34 +++++++++++
 iotdb/pom.xml                                      | 20 +++++++
 .../org/apache/iotdb/db/cluster/IoTDBClustetr.java | 45 +++++++++++++++
 .../org/apache/iotdb/db/cluster/IoTDBNode.java     | 66 ++++++++++++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |  2 +
 5 files changed, 167 insertions(+)

diff --git a/iotdb/iotdb/conf/iotdb_cluster.conf b/iotdb/iotdb/conf/iotdb_cluster.conf
new file mode 100644
index 0000000..93a8fbe
--- /dev/null
+++ b/iotdb/iotdb/conf/iotdb_cluster.conf
@@ -0,0 +1,34 @@
+akka {
+  actor {
+    provider = "cluster"
+  }
+  remote {
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 2552
+    }
+
+    artery {
+      # change this to enabled=on to use Artery instead of netty
+      # see https://doc.akka.io/docs/akka/current/remoting-artery.html
+      enabled = off
+      transport = tcp
+      canonical.hostname = "127.0.0.1"
+      canonical.port = 0
+    }
+  }
+
+  cluster {
+    # Must name the ActorSystem created in IoTDBClustetr. Note - Artery uses akka:// addresses
+    seed-nodes = [
+      "akka.tcp://IoTDBClusterSystem@127.0.0.1:2552",
+      "akka.tcp://IoTDBClusterSystem@127.0.0.1:2551"]
+
+    # auto downing is NOT safe for production deployments.
+    # you may want to use it during development, read more about it in the docs.
+    auto-down-unreachable-after = 10s
+  }
+}
+
+# Enable metrics extension in akka-cluster-metrics.
+akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
diff --git a/iotdb/pom.xml b/iotdb/pom.xml
index 7ff37e9..b2c80c9 100644
--- a/iotdb/pom.xml
+++ b/iotdb/pom.xml
@@ -70,6 +70,26 @@
             <artifactId>commons-lang3</artifactId>
             <version>${common.lang3.version}</version>
         </dependency>
+        <!-- All Akka artifacts must use the same Scala binary suffix (_2.12) and Akka version. -->
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-tools_2.12</artifactId>
+            <version>2.5.20</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-metrics_2.12</artifactId>
+            <version>2.5.20</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-protobuf_2.12</artifactId>
+            <version>2.5.20</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-stream_2.12</artifactId>
+            <version>2.5.20</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBClustetr.java b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBClustetr.java
new file mode 100644
index 0000000..a0e0c6c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBClustetr.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.  See the License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.cluster;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+
+/**
+ * Command-line entry point that boots one node of the Akka-based IoTDB cluster example.
+ *
+ * <p>The configuration file named by {@link IoTDBConstant#CLUSTER_CONFIG} is looked up in the
+ * directory given by the {@link IoTDBConstant#IOTDB_CONF} system property or, when that property
+ * is not set, in the directory passed as the first program argument.
+ *
+ * @author lta
+ */
+public class IoTDBClustetr {
+
+  /**
+   * Loads the cluster configuration, creates the {@code IoTDBClusterSystem} actor system and
+   * registers an {@link IoTDBNode} actor named {@code clusterListener} in it.
+   *
+   * @param args optional; {@code args[0]} is used as the configuration directory when the
+   *     {@link IoTDBConstant#IOTDB_CONF} system property is not set
+   * @throws IllegalStateException if no configuration directory is supplied or the configuration
+   *     file does not exist in it
+   */
+  public static void main(String[] args) {
+    File configFile = new File(resolveConfDir(args), IoTDBConstant.CLUSTER_CONFIG);
+    // ConfigFactory.parseFile() returns an empty Config for a missing file, so check up front
+    // instead of starting an actor system that lacks the cluster settings.
+    if (!configFile.isFile()) {
+      throw new IllegalStateException(
+          "Cluster config file not found: " + configFile.getAbsolutePath());
+    }
+    Config config = ConfigFactory.parseFile(configFile);
+
+    // Create an Akka system
+    ActorSystem system = ActorSystem.create("IoTDBClusterSystem", config);
+
+    // Create an actor that handles cluster domain events
+    system.actorOf(IoTDBNode.props(), "clusterListener");
+  }
+
+  /**
+   * Picks the configuration directory: the system property wins, then the first argument.
+   *
+   * @param args the program arguments, may be {@code null} or empty
+   * @return the configuration directory path
+   * @throws IllegalStateException if neither source provides a directory
+   */
+  private static String resolveConfDir(String[] args) {
+    String confDir = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
+    if (confDir != null) {
+      return confDir;
+    }
+    if (args != null && args.length > 0) {
+      return args[0];
+    }
+    // No machine-specific default: a path that only exists on one developer's disk would
+    // silently produce an empty configuration everywhere else.
+    throw new IllegalStateException(
+        "No configuration directory given: set -D" + IoTDBConstant.IOTDB_CONF
+            + "=<conf dir> or pass the directory as the first argument");
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBNode.java b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBNode.java
new file mode 100644
index 0000000..e1dc5ec
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cluster/IoTDBNode.java
@@ -0,0 +1,66 @@
+package org.apache.iotdb.db.cluster;
+
+import akka.actor.AbstractActor;
+import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberEvent;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.service.IoTDB;
+
+/**
+ * Akka actor that ties an {@link IoTDB} instance to the actor's lifecycle and logs cluster
+ * membership changes.
+ *
+ * <p>On start it subscribes itself to cluster member and reachability events and activates the
+ * IoTDB instance; on stop it unsubscribes and stops that instance again.
+ *
+ * @author lta
+ */
+public class IoTDBNode extends AbstractActor {
+
+  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+  private final Cluster cluster = Cluster.get(getContext().system());
+  // Assigned in preStart(); stays null if the actor fails before it gets that far.
+  private IoTDB daemon;
+
+  public IoTDBNode() {
+  }
+
+  /**
+   * Creates the {@link Props} used to instantiate this actor.
+   *
+   * @return props that build a new {@code IoTDBNode}
+   */
+  public static Props props() {
+    return Props.create(IoTDBNode.class, () -> new IoTDBNode());
+  }
+
+  /**
+   * Subscribes to cluster changes (replaying the current state as events) and activates the
+   * IoTDB instance obtained from {@link IoTDB#getInstance()}.
+   */
+  @Override
+  public void preStart() {
+    cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
+        MemberEvent.class, UnreachableMember.class);
+    daemon = IoTDB.getInstance();
+    daemon.active();
+  }
+
+  /**
+   * Unsubscribes from cluster events and stops the IoTDB instance started in
+   * {@link #preStart()}.
+   *
+   * @throws FileNodeManagerException declared for the call to {@code daemon.stop()}
+   */
+  @Override
+  public void postStop() throws FileNodeManagerException {
+    cluster.unsubscribe(self());
+    // Guard against a failed preStart(): without it a NullPointerException here would hide
+    // the original start-up error.
+    if (daemon != null) {
+      daemon.stop();
+    }
+  }
+
+  /**
+   * Builds the message handler: member-up, unreachable and member-removed events are logged,
+   * every other member event is ignored.
+   *
+   * @return the receive behavior of this actor
+   */
+  @Override
+  public Receive createReceive() {
+    return receiveBuilder()
+        .match(MemberUp.class, mUp -> {
+          log.info("IoTDB node is Up: {}", mUp.member());
+        })
+        .match(UnreachableMember.class, mUnreachable -> {
+          log.info("IoTDB node detected as unreachable: {}", mUnreachable.member());
+        })
+        .match(MemberRemoved.class, mRemoved -> {
+          log.info("IoTDB node is Removed: {}", mRemoved.member());
+        })
+        // Catch-all for the remaining member events; keep it after the specific matches.
+        .match(MemberEvent.class, message -> {
+          // ignore
+        })
+        .build();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 5135e79..48dbd9b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -55,4 +55,6 @@ public class IoTDBConstant {
   public static final String MAX_TIME = "max_time";
   public static final String MIN_TIME = "min_time";
   public static final int MIN_SUPPORTED_JDK_VERSION = 8;
+
+  public static final String CLUSTER_CONFIG = "iotdb_cluster.conf"; // Akka cluster config file name
 }

Reply via email to