This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4609bc00ff6d808dd0dbac64fb7edba81a07d4c1 Author: lta <[email protected]> AuthorDate: Tue Feb 23 20:32:01 2021 +0800 This pr fix following bugs: 1. LogPlan serialize and deserialize bug 2. add shell scripts to remove nodes 3. enrich the function of node tool 4. fix some bugs of adding new nodes --- cluster/src/assembly/resources/sbin/add-node.bat | 2 +- cluster/src/assembly/resources/sbin/add-node.sh | 2 +- .../sbin/{start-node.bat => remove-node.bat} | 16 +-- .../resources/sbin/{add-node.sh => remove-node.sh} | 28 +++-- cluster/src/assembly/resources/sbin/start-node.bat | 2 +- .../java/org/apache/iotdb/cluster/ClusterMain.java | 11 +- .../iotdb/cluster/log/manage/RaftLogManager.java | 3 + .../iotdb/cluster/partition/PartitionGroup.java | 4 + .../partition/balancer/DefaultSlotBalancer.java | 2 + .../iotdb/cluster/query/ClusterPlanRouter.java | 2 +- .../org/apache/iotdb/cluster/server/Response.java | 2 + .../server/handlers/caller/NodeStatusHandler.java | 9 +- .../cluster/server/member/MetaGroupMember.java | 139 +++++++++++++-------- .../iotdb/cluster/server/member/RaftMember.java | 1 - .../cluster/utils/nodetool/ClusterMonitor.java | 2 +- .../utils/nodetool/ClusterMonitorMBean.java | 4 +- .../cluster/utils/nodetool/function/Host.java | 2 +- .../cluster/utils/nodetool/function/Status.java | 24 +++- .../apache/iotdb/cluster/log/LogParserTest.java | 14 +++ .../apache/iotdb/db/qp/physical/sys/LogPlan.java | 11 +- 20 files changed, 179 insertions(+), 101 deletions(-) diff --git a/cluster/src/assembly/resources/sbin/add-node.bat b/cluster/src/assembly/resources/sbin/add-node.bat index 958f16f..452f1c3 100755 --- a/cluster/src/assembly/resources/sbin/add-node.bat +++ b/cluster/src/assembly/resources/sbin/add-node.bat @@ -19,7 +19,7 @@ @echo off echo ```````````````````````` -echo Starting IoTDB +echo Starting IoTDB (Cluster Mode) echo ```````````````````````` PATH %PATH%;%JAVA_HOME%\bin\ diff --git a/cluster/src/assembly/resources/sbin/add-node.sh b/cluster/src/assembly/resources/sbin/add-node.sh index 807175b..935abde 100755 --- a/cluster/src/assembly/resources/sbin/add-node.sh +++ b/cluster/src/assembly/resources/sbin/add-node.sh @@ -20,7 +20,7 @@ echo --------------------- -echo Starting IoTDB +echo "Starting IoTDB (Cluster Mode)" echo --------------------- if [ -z "${IOTDB_HOME}" ]; then diff --git a/cluster/src/assembly/resources/sbin/start-node.bat b/cluster/src/assembly/resources/sbin/remove-node.bat similarity index 91% copy from cluster/src/assembly/resources/sbin/start-node.bat copy to cluster/src/assembly/resources/sbin/remove-node.bat index f9a7d1f..a2b0564 100755 --- a/cluster/src/assembly/resources/sbin/start-node.bat +++ b/cluster/src/assembly/resources/sbin/remove-node.bat @@ -19,7 +19,7 @@ @echo off echo ```````````````````````` -echo Starting IoTDB +echo Starting to remove a node (Cluster Mode) echo ```````````````````````` PATH %PATH%;%JAVA_HOME%\bin\ @@ -57,20 +57,8 @@ popd set IOTDB_CONF=%IOTDB_HOME%\conf set IOTDB_LOGS=%IOTDB_HOME%\logs - -IF EXIST "%IOTDB_CONF%\iotdb-env.bat" ( - IF "%1" == "printgc" ( - CALL "%IOTDB_CONF%\iotdb-env.bat" printgc - SHIFT - ) ELSE ( - CALL "%IOTDB_CONF%\iotdb-env.bat" - ) -) ELSE ( - echo "can't find %IOTDB_CONF%\iotdb-env.bat" -) - @setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS -set CONF_PARAMS=-s +set CONF_PARAMS=-r set is_conf_path=false for %%i in (%*) do ( IF "%%i" == "-c" ( diff --git a/cluster/src/assembly/resources/sbin/add-node.sh b/cluster/src/assembly/resources/sbin/remove-node.sh similarity index 82% copy from cluster/src/assembly/resources/sbin/add-node.sh copy to cluster/src/assembly/resources/sbin/remove-node.sh index 807175b..65ee58b 100755 --- a/cluster/src/assembly/resources/sbin/add-node.sh +++ b/cluster/src/assembly/resources/sbin/remove-node.sh @@ -20,7 +20,7 @@ echo --------------------- -echo Starting IoTDB +echo "Starting to remove a node(Cluster Mode)" echo --------------------- if [ -z "${IOTDB_HOME}" ]; then @@ -28,13 +28,24 @@ if [ -z "${IOTDB_HOME}" ]; then fi IOTDB_CONF=${IOTDB_HOME}/conf -# IOTDB_LOGS=${IOTDB_HOME}/logs -if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then - . "$IOTDB_CONF/iotdb-env.sh" -else - echo "can't find $IOTDB_CONF/iotdb-env.sh" -fi +is_conf_path=false +for arg do + shift + if [ "$arg" == "-c" ]; then + is_conf_path=true + continue + fi + + if [ $is_conf_path == true ]; then + IOTDB_CONF=$arg + is_conf_path=false + continue + fi + set -- "$@" "$arg" +done + +CONF_PARAMS="-r "$* if [ -n "$JAVA_HOME" ]; then for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do @@ -65,8 +76,9 @@ launch_service() iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}" iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}" iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}" + iotdb_parms="$iotdb_parms -DCLUSTER_CONF=${IOTDB_CONF}" iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB" - exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" -a + exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS return $? } diff --git a/cluster/src/assembly/resources/sbin/start-node.bat b/cluster/src/assembly/resources/sbin/start-node.bat index f9a7d1f..161cc2a 100755 --- a/cluster/src/assembly/resources/sbin/start-node.bat +++ b/cluster/src/assembly/resources/sbin/start-node.bat @@ -19,7 +19,7 @@ @echo off echo ```````````````````````` -echo Starting IoTDB +echo Starting IoTDB (Cluster Mode) echo ```````````````````````` PATH %PATH%;%JAVA_HOME%\bin\ diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java index 33d8a5d..48f8813 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java @@ -68,8 +68,8 @@ public class ClusterMain { + "[-internal_data_port <internal data port>] " + "[-cluster_rpc_port <cluster rpc port>] " + "[-seed_nodes <node1:meta_port:data_port:cluster_rpc_port," - + "node2:meta_port:data_port:cluster_rpc_port," - + "...,noden:meta_port:data_port:cluster_rpc_port>] " + + "node2:meta_port:data_port:cluster_rpc_port," + + "...,noden:meta_port:data_port:cluster_rpc_port>] " + "[-sc] " + "[-rpc_port <rpc port>]"); return; @@ -276,6 +276,9 @@ public class ClusterMain { logger.error("Cluster size is too small, cannot remove any node"); } else if (response == Response.RESPONSE_REJECT) { logger.error("Node {} is not found in the cluster, please check", nodeToRemove); + } else if (response == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) { + logger.warn( + "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later"); } else { logger.error("Unexpected response {}", response); } @@ -302,7 +305,7 @@ public class ClusterMain { public int calculateSlotByTime(String storageGroupName, long timestamp, int maxSlotNum) { int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k; if (sgSerialNum >= 0) { - return (int)(maxSlotNum / k * (sgSerialNum + 0.5)); + return (int) (maxSlotNum / k * (sgSerialNum + 0.5)); } else { return defaultStrategy.calculateSlotByTime(storageGroupName, timestamp, maxSlotNum); } @@ -313,7 +316,7 @@ public class ClusterMain { int maxSlotNum) { int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k; if (sgSerialNum >= 0) { - return (int)(maxSlotNum / k * (sgSerialNum + 0.5)); + return (int) (maxSlotNum / k * (sgSerialNum + 0.5)); } else { return defaultStrategy .calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index f774567..b08d327 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@ -641,6 +641,9 @@ public abstract class RaftLogManager { */ void applyEntries(List<Log> entries) { for (Log entry : entries) { + if (entry.isApplied()) { + continue; + } if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) { blockedUnappliedLogList.add(entry); continue; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java index b35cc10..e7d039c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java @@ -98,4 +98,8 @@ public class PartitionGroup extends ArrayList<Node> { return id; } + @Override + public String toString() { + return String.format("PartitionGroup{id=%d, header=%s}", id, get(0)); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java index eb1825f..ad90d65 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java @@ -79,6 +79,7 @@ public class DefaultSlotBalancer implements SlotBalancer { previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing)); slotNodes[slot] = curNode; } + slotsToMove.clear(); transferNum -= numToMove; if (transferNum > 0) { curNode = new RaftNode(newNode, ++raftId); @@ -89,6 +90,7 @@ public class DefaultSlotBalancer implements SlotBalancer { previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing)); slotNodes[slot] = curNode; } + slotsToMove.clear(); } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java index 65f66dc..8fbc772 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java @@ -161,7 +161,7 @@ public class ClusterPlanRouter { throw new UnsupportedPlanException(plan); } for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) { - result.put(plan, partitionGroup); + result.put(new LogPlan(plan), partitionGroup); } return result; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java index 373d535..8a9b710 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java @@ -49,6 +49,8 @@ public class Response { // the new node, which tries to join the cluster, contains conflicted parameters with the // cluster, so the operation is rejected. public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -10; + // add/remove node operations should one by one + public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -11; // the request is not executed locally anc should be forwarded public static final long RESPONSE_NULL = Long.MIN_VALUE; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java index f17bc6e..4a0c43f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java @@ -27,11 +27,11 @@ import org.apache.thrift.async.AsyncMethodCallback; public class NodeStatusHandler implements AsyncMethodCallback<Node> { - private Map<Node, Boolean> nodeStatusMap; + private Map<Node, Integer> nodeStatusMap; private AtomicInteger countResponse; - public NodeStatusHandler(Map<Node, Boolean> nodeStatusMap) { + public NodeStatusHandler(Map<Node, Integer> nodeStatusMap) { this.nodeStatusMap = nodeStatusMap; this.countResponse = new AtomicInteger(); } @@ -39,7 +39,10 @@ public class NodeStatusHandler implements AsyncMethodCallback<Node> { @Override public void onComplete(Node response) { synchronized (nodeStatusMap) { - nodeStatusMap.put(response, true); + if (response == null) { + return; + } + nodeStatusMap.put(response, 0); // except for this node itself if(countResponse.incrementAndGet() == nodeStatusMap.size() - 1){ nodeStatusMap.notifyAll(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 9dc5d67..dd5b419 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -19,7 +19,40 @@ package org.apache.iotdb.cluster.server.member; +import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC; +import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iotdb.cluster.client.DataClientProvider; import org.apache.iotdb.cluster.client.async.AsyncClientPool; import org.apache.iotdb.cluster.client.async.AsyncMetaClient; @@ -117,40 +150,6 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC; -import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; - @SuppressWarnings("java:S1135") public class MetaGroupMember extends RaftMember { @@ -472,6 +471,7 @@ public class MetaGroupMember extends RaftMember { partitionTable = new SlotPartitionTable(allNodes, thisNode); logger.info("Partition table is set up"); } + initIdNodeMap(); router = new ClusterPlanRouter(partitionTable); this.coordinator.setRouter(router); startSubServers(); @@ -604,6 +604,9 @@ public class MetaGroupMember extends RaftMember { setNodeIdentifier(genNodeIdentifier()); } else if (resp.getRespNum() == Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT) { handleConfigInconsistency(resp); + } else if (resp.getRespNum() == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) { + logger.warn( + "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later"); } else { logger .warn("Joining the cluster is rejected by {} for response {}", node, resp.getRespNum()); @@ -867,18 +870,27 @@ public class MetaGroupMember extends RaftMember { * immediately. If the identifier of "node" conflicts with an existing node, the request will be * turned down. * - * @param node cannot be the local node + * @param newNode cannot be the local node * @param startUpStatus the start up status of the new node * @param response the response that will be sent to "node" * @return true if the process is over, false if the request should be forwarded */ - private boolean processAddNodeLocally(Node node, StartUpStatus startUpStatus, + private boolean processAddNodeLocally(Node newNode, StartUpStatus startUpStatus, AddNodeResponse response) throws LogExecutionException { if (character != NodeCharacter.LEADER) { return false; } - if (allNodes.contains(node)) { - logger.debug("Node {} is already in the cluster", node); + boolean nodeExistInPartitionTable = false; + for (Node node : partitionTable.getAllNodes()) { + if (node.ip.equals(newNode.ip) && newNode.dataPort == node.dataPort + && newNode.metaPort == node.metaPort && newNode.clientPort == node.clientPort) { + newNode.nodeIdentifier = node.nodeIdentifier; + nodeExistInPartitionTable = true; + break; + } + } + if (allNodes.contains(newNode)) { + logger.debug("Node {} is already in the cluster", newNode); response.setRespNum((int) Response.RESPONSE_AGREE); synchronized (partitionTable) { response.setPartitionTableBytes(partitionTable.serialize()); @@ -886,9 +898,14 @@ public class MetaGroupMember extends RaftMember { return true; } - Node idConflictNode = idNodeMap.get(node.getNodeIdentifier()); + if (!nodeExistInPartitionTable && partitionTable.getAllNodes().size() != allNodes.size()) { + response.setRespNum((int) Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT); + return true; + } + + Node idConflictNode = idNodeMap.get(newNode.getNodeIdentifier()); if (idConflictNode != null) { - logger.debug("{}'s id conflicts with {}", node, idConflictNode); + logger.debug("{}'s id conflicts with {}", newNode, idConflictNode); response.setRespNum((int) Response.RESPONSE_IDENTIFIER_CONFLICT); return true; } @@ -901,7 +918,7 @@ public class MetaGroupMember extends RaftMember { // node adding is serialized to reduce potential concurrency problem synchronized (logManager) { // update partition table - partitionTable.addNode(node); + partitionTable.addNode(newNode); ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1); AddNodeLog addNodeLog = new AddNodeLog(); @@ -910,28 +927,28 @@ public class MetaGroupMember extends RaftMember { addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1); addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1); - addNodeLog.setNewNode(node); + addNodeLog.setNewNode(newNode); logManager.append(addNodeLog); int retryTime = 1; while (true) { logger - .info("Send the join request of {} to other nodes, retry time: {}", node, retryTime); + .info("Send the join request of {} to other nodes, retry time: {}", newNode, retryTime); AppendLogResult result = sendLogToFollowers(addNodeLog); switch (result) { case OK: commitLog(addNodeLog); - logger.info("Join request of {} is accepted", node); + logger.info("Join request of {} is accepted", newNode); synchronized (partitionTable) { response.setPartitionTableBytes(partitionTable.serialize()); } response.setRespNum((int) Response.RESPONSE_AGREE); - logger.info("Sending join response of {}", node); + logger.info("Sending join response of {}", newNode); return true; case TIME_OUT: - logger.info("Join request of {} timed out", node); + logger.info("Join request of {} timed out", newNode); retryTime++; continue; case LEADERSHIP_STALE: @@ -1606,6 +1623,7 @@ public class MetaGroupMember extends RaftMember { allRedirect = false; } if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + logger.error("Fail to send log {} to data group {}", entry.getKey(), entry.getValue()); // execution failed, record the error message errorCodePartitionGroups.add(String.format("[%s@%s:%s]", tmpStatus.getCode(), entry.getValue().getHeader(), @@ -1715,7 +1733,7 @@ public class MetaGroupMember extends RaftMember { private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header) throws IOException { - Client client = null; + Client client; try { client = getClientProvider().getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); @@ -1726,8 +1744,6 @@ public class MetaGroupMember extends RaftMember { } /** -======= ->>>>>>> master * Get the data groups that should be queried when querying "path" with "filter". First, the time * interval qualified by the filter will be extracted. If any side of the interval is open, query * all groups. Otherwise compute all involved groups w.r.t. the time partitioning. @@ -1776,14 +1792,14 @@ public class MetaGroupMember extends RaftMember { } @SuppressWarnings("java:S2274") - public Map<Node, Boolean> getAllNodeStatus() { + public Map<Node, Integer> getAllNodeStatus() { if (getPartitionTable() == null) { // the cluster is being built. return null; } - Map<Node, Boolean> nodeStatus = new HashMap<>(); + Map<Node, Integer> nodeStatus = new HashMap<>(); for (Node node : allNodes) { - nodeStatus.put(node, thisNode.equals(node)); + nodeStatus.put(node, thisNode.equals(node) ? 0 : 1); } try { @@ -1798,11 +1814,20 @@ public class MetaGroupMember extends RaftMember { Thread.currentThread().interrupt(); logger.warn("Cannot get the status of all nodes", e); } + + for (Node node: partitionTable.getAllNodes()) { + nodeStatus.putIfAbsent(node, 2); + } + for (Node node : allNodes) { + if (!partitionTable.getAllNodes().contains(node)) { + nodeStatus.put(node, 3); + } + } return nodeStatus; } @SuppressWarnings({"java:S2445", "java:S2274"}) - private void getNodeStatusAsync(Map<Node, Boolean> nodeStatus) + private void getNodeStatusAsync(Map<Node, Integer> nodeStatus) throws TException, InterruptedException { NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus); synchronized (nodeStatus) { @@ -1816,7 +1841,7 @@ public class MetaGroupMember extends RaftMember { } } - private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) { + private void getNodeStatusSync(Map<Node, Integer> nodeStatus) { NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus); for (Node node : allNodes) { SyncMetaClient client = (SyncMetaClient) getSyncClient(node); @@ -1902,10 +1927,14 @@ public class MetaGroupMember extends RaftMember { return Response.RESPONSE_REJECT; } + if (partitionTable.getAllNodes().contains(target) && partitionTable.getAllNodes().size() != allNodes.size()) { + return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT; + } + // node removal must be serialized to reduce potential concurrency problem synchronized (logManager) { // update partition table - partitionTable.addNode(node); + partitionTable.removeNode(node); ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1); RemoveNodeLog removeNodeLog = new RemoveNodeLog(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index d05ddd9..2cf5f77 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -907,7 +907,6 @@ public abstract class RaftMember { try { if (appendLogInGroup(log)) { - TSStatus res = StatusUtils.OK; return StatusUtils.OK; } } catch (LogExecutionException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java index 890b402..f1f46d2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java @@ -134,7 +134,7 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService { } @Override - public Map<Node, Boolean> getAllNodeStatus() { + public Map<Node, Integer> getAllNodeStatus() { MetaGroupMember metaGroupMember = getMetaGroupMember(); if (metaGroupMember == null) { return null; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java index ea52c28..cc3e7b7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java @@ -66,9 +66,9 @@ public interface ClusterMonitorMBean { /** * Get status of all nodes * - * @return key: node, value: live or not + * @return key: node, value: 0(live), 1(offline), 2(joining), 3(leaving) */ - Map<Node, Boolean> getAllNodeStatus(); + Map<Node, Integer> getAllNodeStatus(); /** * diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java index da4305d..d32b94d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java @@ -63,7 +63,7 @@ public class Host extends NodeToolCmd { for (int i = 1; i < raftGroup.size(); i++) { builder.append(", ").append(nodeToString(raftGroup.get(i))); } - builder.append(')'); + builder.append("),id=").append(raftGroup.getId()); msgPrintln(String.format("%-50s->%20s", builder.toString(), slotNum)); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java index 700990a..fe8eb0f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java @@ -22,6 +22,7 @@ import static org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln; import io.airlift.airline.Command; import java.util.Map; +import java.util.Map.Entry; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean; @@ -30,15 +31,26 @@ public class Status extends NodeToolCmd { @Override public void execute(ClusterMonitorMBean proxy) { - Map<Node, Boolean> statusMap = proxy.getAllNodeStatus(); - if(statusMap == null){ + Map<Node, Integer> statusMap = proxy.getAllNodeStatus(); + if (statusMap == null) { msgPrintln(BUILDING_CLUSTER_INFO); return; } msgPrintln(String.format("%-30s %10s", "Node", "Status")); - statusMap.forEach( - (node, status) -> msgPrintln(String.format("%-30s->%10s", nodeToString(node), - (Boolean.TRUE.equals(status) ? - "on" : "off")))); + for (Entry<Node, Integer> entry : statusMap.entrySet()) { + Node node = entry.getKey(); + Integer statusNum = entry.getValue(); + String status; + if (statusNum == 0) { + status = "on"; + } else if (statusNum == 1) { + status = "off"; + } else if (statusNum == 2) { + status = "joining"; + } else { + status = "leaving"; + } + msgPrintln(String.format("%-30s->%10s", nodeToString(node), status)); + } } } \ No newline at end of file diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java index 76efe5f..59874b4 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iotdb.cluster.common.TestUtils; import org.apache.iotdb.cluster.exception.UnknownLogTypeException; @@ -29,8 +30,12 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog; import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; +import org.apache.iotdb.cluster.partition.PartitionGroup; +import org.apache.iotdb.cluster.utils.PlanSerializer; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.junit.Test; @@ -98,4 +103,13 @@ public class LogParserTest { Log serialized = logParser.parse(byteBuffer); assertEquals(log, serialized); } + + @Test + public void testLogPlan() throws IOException, IllegalPathException, UnknownLogTypeException { + AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable, TestUtils.getNode(0)); + LogPlan logPlan = new LogPlan(log.serialize()); + ByteBuffer buffer = ByteBuffer.wrap(PlanSerializer.getInstance().serialize(logPlan)); + PhysicalPlan plan = PhysicalPlan.Factory.create(buffer); + LogParser.getINSTANCE().parse(((LogPlan) plan).getLog()); + } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java index bdc19c5..d0118db 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java @@ -43,6 +43,11 @@ public class LogPlan extends PhysicalPlan { this.log = log; } + public LogPlan(LogPlan plan) { + super(false); + this.log = plan.log; + } + public ByteBuffer getLog() { log.clear(); return log; @@ -65,8 +70,10 @@ public class LogPlan extends PhysicalPlan { } @Override - public void serialize(ByteBuffer buffer) { + public void deserialize(ByteBuffer buffer) { int len = buffer.getInt(); - log = ByteBuffer.wrap(buffer.array(), buffer.position(), len); + byte[] data = new byte[len]; + System.arraycopy(buffer.array(), buffer.position(), data, 0, len); + log = ByteBuffer.wrap(data); } }
