This is an automated email from the ASF dual-hosted git repository.
dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 0b7b70c finish hash functuin code
new 98342a4 Merge branch 'cluster' of github.com:apache/incubator-iotdb
into cluster
0b7b70c is described below
commit 0b7b70c213579460eaac21c5724085c422f00c6d
Author: [email protected] <[email protected]>
AuthorDate: Mon Mar 25 20:55:06 2019 +0800
finish hash functuin code
---
.../cluster/execption/ErrorConfigureExecption.java | 21 +++
.../org/apache/iotdb/cluster/utils/Router.java | 158 ---------------------
.../iotdb/cluster => }/utils/HashFunction.java | 2 +-
.../utils/HashFunction.java => utils/MD5Hash.java} | 31 +++-
.../apache/iotdb/cluster/utils/PhysicalNode.java | 65 +++++++++
.../org/apache/iotdb/cluster/utils/Router.java | 121 ++++++++++++++++
.../HashFunction.java => utils/VirtualNode.java} | 21 ++-
.../apache/iotdb/cluster/utils/MD5HashTest.java | 24 ++++
.../iotdb/cluster/utils/PhysicalNodeTest.java | 34 +++++
.../org/apache/iotdb/cluster/utils/RouterTest.java | 29 ++++
10 files changed, 341 insertions(+), 165 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/execption/ErrorConfigureExecption.java
b/cluster/src/main/java/org/apache/iotdb/cluster/execption/ErrorConfigureExecption.java
new file mode 100644
index 0000000..2c5d3e1
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/execption/ErrorConfigureExecption.java
@@ -0,0 +1,21 @@
+package org.apache.iotdb.cluster.execption;
+
+public class ErrorConfigureExecption extends RuntimeException {
+ private static final long serialVersionUID = 5530077196040763508L;
+
+ public ErrorConfigureExecption() {
+ super();
+ }
+
+ public ErrorConfigureExecption(Exception pathExcp) {
+ super(pathExcp.getMessage());
+ }
+
+ public ErrorConfigureExecption(String msg) {
+ super(msg);
+ }
+
+ public ErrorConfigureExecption(Throwable throwable) {
+ super(throwable.getMessage());
+ }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
b/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
deleted file mode 100644
index a1168dd..0000000
---
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Router {
- private List<PhysicalNode> nodes = new ArrayList<>();
- // Replication number
- private final int replicator;
- private final int numOfVirtulaNodes = 2;
- private HashFunction hashFunction = new MD5Hash();
- private final SortedMap<Integer, PhysicalNode> physicalRing = new
TreeMap<>();
- private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
-
-
- // A local cache to store Which nodes do a storage group correspond to
- private Map<String, PhysicalNode[]> router = new ConcurrentHashMap<>();
- private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new
HashMap<>();
-
- public Router(){
- // TODO get replicator form config file
- String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3",};
- this.replicator = 3;
- int port = 7777;
-
- for(String ip: ipList){
- nodes.add(new PhysicalNode(ip, port));
- }
- init();
- }
-
- private void init(){
- for(PhysicalNode node : nodes){
- addNode(node, this.numOfVirtulaNodes);
- }
-
- }
-
- private void addNode(PhysicalNode node, int virtualNum){
- physicalRing.put(hashFunction.hash(node.getKey()), node);
- for(int i = 0; i < virtualNum; i++){
- VirtualNode vNode = new VirtualNode(i, node);
- virtualRing.put(hashFunction.hash(vNode.getKey()), vNode);
- }
- }
-
- // For a storage group, compute the nearest physical node on the VRing
- private PhysicalNode routeNode(String objectKey){
-// if(router.containsKey(objectKey)){
-// return router.get(objectKey);
-// }
-// int hashVal = hashFunction.hash(objectKey);
-// SortedMap<Integer,VirtualNode> tailMap = virtualRing.tailMap(hashVal);
-// Integer nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() :
virtualRing.firstKey();
-// PhysicalNode node = virtualRing.get(nodeHashVal).getPhysicalNode();
-// router.put(objectKey, node);
-// return node;
- return null;
- }
-
- // Calculate the physical nodes corresponding to the replications where a
data point is located
- public PhysicalNode[] routeGroup(String objectKey){
- return router.get(objectKey);
- }
-
- public PhysicalNode[][] generateGroups(String ip, int port){
- return this.generateGroups(new PhysicalNode(ip, port));
- }
-
- // For a given physical, how many groups does it belong to
- private PhysicalNode[][] generateGroups(PhysicalNode node){
- return dataPartitionCache.get(node);
- }
-
-
-
- private class PhysicalNode{
- final String ip;
- final int port;
-
- PhysicalNode(String ip, int port){
- this.ip = ip;
- this.port = port;
- }
-
- String getKey(){
- return String.format("%s:%d", ip, port);
- }
-
- }
- private class VirtualNode {
- private final int replicaIndex;
- private final PhysicalNode physicalNode;
-
- VirtualNode(int replicaIndex, PhysicalNode physicalNode){
- this.replicaIndex = replicaIndex;
- this.physicalNode = physicalNode;
- }
-
- PhysicalNode getPhysicalNode(){
- return this.physicalNode;
- }
-
- String getKey(){
- return String.format("%s-%d", physicalNode.getKey(), replicaIndex);
- }
-
- }
-
- private class MD5Hash implements HashFunction{
- MessageDigest instance;
-
- MD5Hash() {
- try {
- instance = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- }
- }
-
- public synchronized int hash(String key) {
- instance.reset();
- instance.update(key.getBytes());
- byte[] digest = instance.digest();
-
- int h = 0;
- for (int i = 0; i < 4; i++) {
- h <<= 8;
- h |= ((int) digest[i]) & 0xFF;
- }
- return h;
- }
- }
-}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/HashFunction.java
similarity index 93%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/HashFunction.java
index 1101d1d..db6e7bd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/HashFunction.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+package org.apache.iotdb.cluster.utils;
public interface HashFunction {
public int hash(String str);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/MD5Hash.java
similarity index 56%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/MD5Hash.java
index 1101d1d..94567e8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/MD5Hash.java
@@ -16,8 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+package org.apache.iotdb.cluster.utils;
-public interface HashFunction {
- public int hash(String str);
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class MD5Hash implements HashFunction {
+
+ MessageDigest instance;
+
+ public MD5Hash() {
+ try {
+ instance = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ }
+ }
+
+ @Override
+ public synchronized int hash(String str) {
+ instance.reset();
+ instance.update(str.getBytes());
+ byte[] digest = instance.digest();
+
+ int h = 0;
+ for (int i = 0; i < 4; i++) {
+ h <<= 8;
+ h |= ((int) digest[i]) & 0xFF;
+ }
+ return h;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
new file mode 100644
index 0000000..57a5bea
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils;
+
+
+public class PhysicalNode {
+
+ final String ip;
+ final int port;
+
+ PhysicalNode(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ }
+
+ String getKey() {
+ return String.format("%s:%d", ip, port);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((ip == null) ? 0 : ip.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PhysicalNode other = (PhysicalNode) obj;
+ if (this.port != other.port) {
+ return false;
+ }
+ if (this.ip == null) {
+ return other.ip == null ? true : false;
+ }
+ return this.ip.equals(other.ip);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
new file mode 100644
index 0000000..573df45
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
@@ -0,0 +1,121 @@
+
+package org.apache.iotdb.cluster.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.iotdb.cluster.execption.ErrorConfigureExecption;
+
+public class Router {
+
+ private List<PhysicalNode> nodes = new ArrayList<>();
+ // Replication number
+ private int replicator;
+ private final int numOfVirtulaNodes = 2;
+ private HashFunction hashFunction = new MD5Hash();
+ private final SortedMap<Integer, PhysicalNode> physicalRing = new
TreeMap<>();
+ private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
+
+ // A local cache to store Which nodes do a storage group correspond to
+ private Map<String, PhysicalNode[]> router = new ConcurrentHashMap<>();
+ private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new
HashMap<>();
+
+ private static class RouterHolder {
+
+ private static final Router INSTANCE = new Router();
+ }
+
+ private Router() {
+ // TODO get form config file
+ // String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3"};
+ // this.replicator = replicator;
+ // int port = 7777;
+// for (String ip : ipList) {
+// nodes.add(new PhysicalNode(ip, port));
+// }
+ init();
+ }
+
+ public static final Router getInstance() {
+ return RouterHolder.INSTANCE;
+ }
+
+ private void init() {
+ reset();
+ for (PhysicalNode node : nodes) {
+ addNode(node, this.numOfVirtulaNodes);
+ }
+ PhysicalNode[] nodes = (PhysicalNode[]) physicalRing.values().toArray();
+ int len = nodes.length;
+ for (int i = 0; i < len; i++) {
+ PhysicalNode first = nodes[i];
+ if (len < replicator) {
+ throw new ErrorConfigureExecption(String.format("Replicator number %d
is greater "
+ + "than cluster number %d", replicator, len));
+ } else if (len == replicator) {
+ PhysicalNode[][] val = new PhysicalNode[1][len];
+ for (int j = 0; j < len; j++) {
+ val[0][j] = nodes[i + j % len];
+ }
+ dataPartitionCache.put(first, val);
+ } else {
+ PhysicalNode[][] val = new PhysicalNode[replicator][replicator];
+ for (int j = 0; j < replicator; j++) {
+ for (int k = 0; k < replicator; k++) {
+ val[j][k] = nodes[(i - j + k) % len];
+ }
+ }
+ dataPartitionCache.put(first, val);
+ }
+ }
+ }
+
+ // Calculate the physical nodes corresponding to the replications
+ // where a data point is located
+ public PhysicalNode[] routeGroup(String objectKey) {
+ if (router.containsKey(objectKey)) {
+ return router.get(objectKey);
+ }
+ PhysicalNode node = routeNode(objectKey);
+ PhysicalNode[] nodes = dataPartitionCache.get(node)[0];
+ router.put(objectKey, nodes);
+ return nodes;
+ }
+
+ public PhysicalNode[][] generateGroups(String ip, int port) {
+ return this.generateGroups(new PhysicalNode(ip, port));
+ }
+
+ private void addNode(PhysicalNode node, int virtualNum) {
+ physicalRing.put(hashFunction.hash(node.getKey()), node);
+ for (int i = 0; i < virtualNum; i++) {
+ VirtualNode vNode = new VirtualNode(i, node);
+ virtualRing.put(hashFunction.hash(vNode.getKey()), vNode);
+ }
+ }
+
+ // For a storage group, compute the nearest physical node on the VRing
+ private PhysicalNode routeNode(String objectKey) {
+ int hashVal = hashFunction.hash(objectKey);
+ SortedMap<Integer, VirtualNode> tailMap = virtualRing.tailMap(hashVal);
+ Integer nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() :
virtualRing.firstKey();
+ return virtualRing.get(nodeHashVal).getPhysicalNode();
+ }
+
+ // For a given physical, how many groups does it belong to
+ private PhysicalNode[][] generateGroups(PhysicalNode node) {
+ return dataPartitionCache.get(node);
+ }
+
+ private void reset(){
+ physicalRing.clear();
+ virtualRing.clear();
+ router.clear();
+ dataPartitionCache.clear();
+ }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/VirtualNode.java
similarity index 64%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/utils/VirtualNode.java
index 1101d1d..3ae5f2f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/VirtualNode.java
@@ -16,8 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+package org.apache.iotdb.cluster.utils;
-public interface HashFunction {
- public int hash(String str);
+public class VirtualNode {
+
+ private final int replicaIndex;
+ private final PhysicalNode physicalNode;
+
+ VirtualNode(int replicaIndex, PhysicalNode physicalNode) {
+ this.replicaIndex = replicaIndex;
+ this.physicalNode = physicalNode;
+ }
+
+ PhysicalNode getPhysicalNode() {
+ return this.physicalNode;
+ }
+
+ String getKey() {
+ return String.format("%s-%d", physicalNode.getKey(), replicaIndex);
+ }
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/MD5HashTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/MD5HashTest.java
new file mode 100644
index 0000000..e6c1c63
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/MD5HashTest.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MD5HashTest {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testHash() {
+ fail("Not yet implemented");
+ }
+
+}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/PhysicalNodeTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/PhysicalNodeTest.java
new file mode 100644
index 0000000..b006194
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/PhysicalNodeTest.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PhysicalNodeTest {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testHashCode() {
+ fail("Not yet implemented");
+ }
+
+ @Test
+ public void testGetKey() {
+ fail("Not yet implemented");
+ }
+
+ @Test
+ public void testEqualsObject() {
+ fail("Not yet implemented");
+ }
+
+}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
new file mode 100644
index 0000000..d6e13bf
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.cluster.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RouterTest {
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testRouteGroup() {
+ fail("Not yet implemented");
+ }
+
+ @Test
+ public void testGenerateGroups() {
+ fail("Not yet implemented");
+ }
+
+}