This is an automated email from the ASF dual-hosted git repository.

dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 0b7b70c  finish hash functuin code
     new 98342a4  Merge branch 'cluster' of github.com:apache/incubator-iotdb 
into cluster
0b7b70c is described below

commit 0b7b70c213579460eaac21c5724085c422f00c6d
Author: [email protected] <[email protected]>
AuthorDate: Mon Mar 25 20:55:06 2019 +0800

    finish hash functuin code
---
 .../cluster/execption/ErrorConfigureExecption.java |  21 +++
 .../org/apache/iotdb/cluster/utils/Router.java     | 158 ---------------------
 .../iotdb/cluster => }/utils/HashFunction.java     |   2 +-
 .../utils/HashFunction.java => utils/MD5Hash.java} |  31 +++-
 .../apache/iotdb/cluster/utils/PhysicalNode.java   |  65 +++++++++
 .../org/apache/iotdb/cluster/utils/Router.java     | 121 ++++++++++++++++
 .../HashFunction.java => utils/VirtualNode.java}   |  21 ++-
 .../apache/iotdb/cluster/utils/MD5HashTest.java    |  24 ++++
 .../iotdb/cluster/utils/PhysicalNodeTest.java      |  34 +++++
 .../org/apache/iotdb/cluster/utils/RouterTest.java |  29 ++++
 10 files changed, 341 insertions(+), 165 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/execption/ErrorConfigureExecption.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/execption/ErrorConfigureExecption.java
new file mode 100644
index 0000000..2c5d3e1
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/execption/ErrorConfigureExecption.java
@@ -0,0 +1,21 @@
+package org.apache.iotdb.cluster.execption;
+
+public class ErrorConfigureExecption extends RuntimeException {
+       private static final long serialVersionUID = 5530077196040763508L;
+
+       public ErrorConfigureExecption() {
+               super();
+       }
+
+       public ErrorConfigureExecption(Exception pathExcp) {
+               super(pathExcp.getMessage());
+       }
+
+       public ErrorConfigureExecption(String msg) {
+               super(msg);
+       }
+
+       public ErrorConfigureExecption(Throwable throwable) {
+               super(throwable.getMessage());
+       }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
deleted file mode 100644
index a1168dd..0000000
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Router {
-  private List<PhysicalNode> nodes = new ArrayList<>();
-  // Replication number
-  private final int replicator;
-  private final int numOfVirtulaNodes = 2;
-  private HashFunction hashFunction = new MD5Hash();
-  private final SortedMap<Integer, PhysicalNode> physicalRing = new 
TreeMap<>();
-  private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
-
-
-  // A local cache to store Which nodes do a storage group correspond to
-  private Map<String, PhysicalNode[]> router = new ConcurrentHashMap<>();
-  private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new 
HashMap<>();
-
-  public Router(){
-    // TODO get replicator form config file
-    String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3",};
-    this.replicator = 3;
-    int port = 7777;
-
-    for(String ip: ipList){
-      nodes.add(new PhysicalNode(ip, port));
-    }
-    init();
-  }
-
-  private void init(){
-    for(PhysicalNode node : nodes){
-      addNode(node, this.numOfVirtulaNodes);
-    }
-
-  }
-
-  private void addNode(PhysicalNode node, int virtualNum){
-    physicalRing.put(hashFunction.hash(node.getKey()), node);
-    for(int i = 0; i < virtualNum; i++){
-      VirtualNode vNode = new VirtualNode(i, node);
-      virtualRing.put(hashFunction.hash(vNode.getKey()), vNode);
-    }
-  }
-
-  // For a storage group, compute the nearest physical node on the VRing
-  private PhysicalNode routeNode(String objectKey){
-//    if(router.containsKey(objectKey)){
-//      return router.get(objectKey);
-//    }
-//    int hashVal = hashFunction.hash(objectKey);
-//    SortedMap<Integer,VirtualNode> tailMap = virtualRing.tailMap(hashVal);
-//    Integer nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : 
virtualRing.firstKey();
-//    PhysicalNode node = virtualRing.get(nodeHashVal).getPhysicalNode();
-//    router.put(objectKey, node);
-//    return node;
-    return null;
-  }
-
-  // Calculate the physical nodes corresponding to the replications where a 
data point is located
-  public PhysicalNode[] routeGroup(String objectKey){
-    return router.get(objectKey);
-  }
-
-  public PhysicalNode[][] generateGroups(String ip, int port){
-    return this.generateGroups(new PhysicalNode(ip, port));
-  }
-
-  // For a given physical, how many groups does it belong to
-  private PhysicalNode[][] generateGroups(PhysicalNode node){
-    return dataPartitionCache.get(node);
-  }
-
-
-
-  private class PhysicalNode{
-    final String ip;
-    final int port;
-
-    PhysicalNode(String ip, int port){
-      this.ip = ip;
-      this.port = port;
-    }
-
-    String getKey(){
-      return String.format("%s:%d", ip, port);
-    }
-
-  }
-  private class VirtualNode {
-    private final int replicaIndex;
-    private final PhysicalNode physicalNode;
-
-    VirtualNode(int replicaIndex, PhysicalNode physicalNode){
-      this.replicaIndex = replicaIndex;
-      this.physicalNode = physicalNode;
-    }
-
-    PhysicalNode getPhysicalNode(){
-      return this.physicalNode;
-    }
-
-    String getKey(){
-      return String.format("%s-%d", physicalNode.getKey(), replicaIndex);
-    }
-
-  }
-
-  private class MD5Hash implements HashFunction{
-    MessageDigest instance;
-
-    MD5Hash() {
-      try {
-        instance = MessageDigest.getInstance("MD5");
-      } catch (NoSuchAlgorithmException e) {
-      }
-    }
-
-    public synchronized int hash(String key) {
-      instance.reset();
-      instance.update(key.getBytes());
-      byte[] digest = instance.digest();
-
-      int h = 0;
-      for (int i = 0; i < 4; i++) {
-        h <<= 8;
-        h |= ((int) digest[i]) & 0xFF;
-      }
-      return h;
-    }
-  }
-}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
 b/cluster/src/main/java/org/apache/iotdb/cluster/utils/HashFunction.java
similarity index 93%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/HashFunction.java
index 1101d1d..db6e7bd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/HashFunction.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+package org.apache.iotdb.cluster.utils;
 
 public interface HashFunction {
   public int hash(String str);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
 b/cluster/src/main/java/org/apache/iotdb/cluster/utils/MD5Hash.java
similarity index 56%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/MD5Hash.java
index 1101d1d..94567e8 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/MD5Hash.java
@@ -16,8 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+package org.apache.iotdb.cluster.utils;
 
-public interface HashFunction {
-  public int hash(String str);
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class MD5Hash implements HashFunction {
+
+  MessageDigest instance;
+
+  public MD5Hash() {
+    try {
+      instance = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+    }
+  }
+
+  @Override
+  public synchronized int hash(String str) {
+    instance.reset();
+    instance.update(str.getBytes());
+    byte[] digest = instance.digest();
+
+    int h = 0;
+    for (int i = 0; i < 4; i++) {
+      h <<= 8;
+      h |= ((int) digest[i]) & 0xFF;
+    }
+    return h;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
new file mode 100644
index 0000000..57a5bea
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils;
+
+
+public class PhysicalNode {
+
+  final String ip;
+  final int port;
+
+  PhysicalNode(String ip, int port) {
+    this.ip = ip;
+    this.port = port;
+  }
+
+  String getKey() {
+    return String.format("%s:%d", ip, port);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((ip == null) ? 0 : ip.hashCode());
+    result = prime * result + port;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    PhysicalNode other = (PhysicalNode) obj;
+    if (this.port != other.port) {
+      return false;
+    }
+    if (this.ip == null) {
+      return other.ip == null ? true : false;
+    }
+    return this.ip.equals(other.ip);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
new file mode 100644
index 0000000..573df45
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
@@ -0,0 +1,121 @@
+
+package org.apache.iotdb.cluster.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.iotdb.cluster.execption.ErrorConfigureExecption;
+
+public class Router {
+
+  private List<PhysicalNode> nodes = new ArrayList<>();
+  // Replication number
+  private int replicator;
+  private final int numOfVirtulaNodes = 2;
+  private HashFunction hashFunction = new MD5Hash();
+  private final SortedMap<Integer, PhysicalNode> physicalRing = new 
TreeMap<>();
+  private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
+
+  // A local cache to store Which nodes do a storage group correspond to
+  private Map<String, PhysicalNode[]> router = new ConcurrentHashMap<>();
+  private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new 
HashMap<>();
+
+  private static class RouterHolder {
+
+    private static final Router INSTANCE = new Router();
+  }
+
+  private Router() {
+    // TODO get form config file
+    // String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3"};
+    // this.replicator = replicator;
+    // int port = 7777;
+//    for (String ip : ipList) {
+//      nodes.add(new PhysicalNode(ip, port));
+//    }
+    init();
+  }
+
+  public static final Router getInstance() {
+    return RouterHolder.INSTANCE;
+  }
+
+  private void init() {
+    reset();
+    for (PhysicalNode node : nodes) {
+      addNode(node, this.numOfVirtulaNodes);
+    }
+    PhysicalNode[] nodes = (PhysicalNode[]) physicalRing.values().toArray();
+    int len = nodes.length;
+    for (int i = 0; i < len; i++) {
+      PhysicalNode first = nodes[i];
+      if (len < replicator) {
+        throw new ErrorConfigureExecption(String.format("Replicator number %d 
is greater "
+            + "than cluster number %d", replicator, len));
+      } else if (len == replicator) {
+        PhysicalNode[][] val = new PhysicalNode[1][len];
+        for (int j = 0; j < len; j++) {
+          val[0][j] = nodes[i + j % len];
+        }
+        dataPartitionCache.put(first, val);
+      } else {
+        PhysicalNode[][] val = new PhysicalNode[replicator][replicator];
+        for (int j = 0; j < replicator; j++) {
+          for (int k = 0; k < replicator; k++) {
+            val[j][k] = nodes[(i - j + k) % len];
+          }
+        }
+        dataPartitionCache.put(first, val);
+      }
+    }
+  }
+
+  // Calculate the physical nodes corresponding to the replications 
+  // where a data point is located
+  public PhysicalNode[] routeGroup(String objectKey) {
+    if (router.containsKey(objectKey)) {
+      return router.get(objectKey);
+    }
+    PhysicalNode node = routeNode(objectKey);
+    PhysicalNode[] nodes = dataPartitionCache.get(node)[0];
+    router.put(objectKey, nodes);
+    return nodes;
+  }
+
+  public PhysicalNode[][] generateGroups(String ip, int port) {
+    return this.generateGroups(new PhysicalNode(ip, port));
+  }
+
+  private void addNode(PhysicalNode node, int virtualNum) {
+    physicalRing.put(hashFunction.hash(node.getKey()), node);
+    for (int i = 0; i < virtualNum; i++) {
+      VirtualNode vNode = new VirtualNode(i, node);
+      virtualRing.put(hashFunction.hash(vNode.getKey()), vNode);
+    }
+  }
+
+  // For a storage group, compute the nearest physical node on the VRing
+  private PhysicalNode routeNode(String objectKey) {
+    int hashVal = hashFunction.hash(objectKey);
+    SortedMap<Integer, VirtualNode> tailMap = virtualRing.tailMap(hashVal);
+    Integer nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : 
virtualRing.firstKey();
+    return virtualRing.get(nodeHashVal).getPhysicalNode();
+  }
+
+  // For a given physical, how many groups does it belong to
+  private PhysicalNode[][] generateGroups(PhysicalNode node) {
+    return dataPartitionCache.get(node);
+  }
+
+  private void reset(){
+    physicalRing.clear();
+    virtualRing.clear();
+    router.clear();
+    dataPartitionCache.clear();
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
 b/cluster/src/main/java/org/apache/iotdb/cluster/utils/VirtualNode.java
similarity index 64%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/utils/VirtualNode.java
index 1101d1d..3ae5f2f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/VirtualNode.java
@@ -16,8 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+package org.apache.iotdb.cluster.utils;
 
-public interface HashFunction {
-  public int hash(String str);
+public class VirtualNode {
+
+  private final int replicaIndex;
+  private final PhysicalNode physicalNode;
+
+  VirtualNode(int replicaIndex, PhysicalNode physicalNode) {
+    this.replicaIndex = replicaIndex;
+    this.physicalNode = physicalNode;
+  }
+
+  PhysicalNode getPhysicalNode() {
+    return this.physicalNode;
+  }
+
+  String getKey() {
+    return String.format("%s-%d", physicalNode.getKey(), replicaIndex);
+  }
 }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/MD5HashTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/MD5HashTest.java
new file mode 100644
index 0000000..e6c1c63
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/MD5HashTest.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MD5HashTest {
+
+       @Before
+       public void setUp() throws Exception {
+       }
+
+       @After
+       public void tearDown() throws Exception {
+       }
+
+       @Test
+       public void testHash() {
+               fail("Not yet implemented");
+       }
+
+}
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/PhysicalNodeTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/PhysicalNodeTest.java
new file mode 100644
index 0000000..b006194
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/PhysicalNodeTest.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PhysicalNodeTest {
+
+       @Before
+       public void setUp() throws Exception {
+       }
+
+       @After
+       public void tearDown() throws Exception {
+       }
+
+       @Test
+       public void testHashCode() {
+               fail("Not yet implemented");
+       }
+
+       @Test
+       public void testGetKey() {
+               fail("Not yet implemented");
+       }
+
+       @Test
+       public void testEqualsObject() {
+               fail("Not yet implemented");
+       }
+
+}
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
new file mode 100644
index 0000000..d6e13bf
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RouterTest {
+
+       @Before
+       public void setUp() throws Exception {
+       }
+
+       @After
+       public void tearDown() throws Exception {
+       }
+
+       @Test
+       public void testRouteGroup() {
+               fail("Not yet implemented");
+       }
+
+       @Test
+       public void testGenerateGroups() {
+               fail("Not yet implemented");
+       }
+
+}

Reply via email to