This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d05dcde13c [IOTDB-3391] && [IOTDB-3372] Judge ConfigNode status
through heartbeat && show cluster did not return a correct result when stop a
datanode or confignode (#6402)
d05dcde13c is described below
commit d05dcde13c908b4613c6214b6f10d7657f7b62ae
Author: 任宇华 <[email protected]>
AuthorDate: Tue Jun 28 15:42:17 2022 +0800
[IOTDB-3391] && [IOTDB-3372] Judge ConfigNode status through heartbeat &&
show cluster did not return a correct result when stop a datanode or confignode
(#6402)
---
.../client/AsyncConfigNodeClientPool.java | 72 ++++++++++++++++++++++
.../confignode/client/AsyncDataNodeClientPool.java | 7 ++-
...andler.java => ConfigNodeHeartbeatHandler.java} | 34 +++++-----
...tHandler.java => DataNodeHeartbeatHandler.java} | 17 ++---
.../iotdb/confignode/manager/ConfigManager.java | 25 ++++++++
.../apache/iotdb/confignode/manager/IManager.java | 3 +
.../iotdb/confignode/manager/NodeManager.java | 6 ++
.../iotdb/confignode/manager/load/LoadManager.java | 64 +++++++++++++++----
...eatCache.java => ConfigNodeHeartbeatCache.java} | 11 +---
...tbeatCache.java => DataNodeHeartbeatCache.java} | 6 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 20 ++----
.../apache/iotdb/db/client/ConfigNodeClient.java | 13 ++++
.../mpp/plan/execution/config/ShowClusterTask.java | 5 +-
.../service/thrift/impl/InternalServiceImpl.java | 2 +-
thrift-commons/src/main/thrift/common.thrift | 10 +--
.../src/main/thrift/confignode.thrift | 4 ++
thrift/src/main/thrift/mpp.thrift | 2 +-
17 files changed, 227 insertions(+), 74 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
new file mode 100644
index 0000000000..00e37493dd
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncConfigNodeClientPool {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AsyncConfigNodeClientPool.class);
+
+ private final IClientManager<TEndPoint, AsyncConfigNodeIServiceClient>
clientManager;
+
+ private AsyncConfigNodeClientPool() {
+ clientManager =
+ new IClientManager.Factory<TEndPoint, AsyncConfigNodeIServiceClient>()
+ .createClientManager(
+ new
DataNodeClientPoolFactory.AsyncConfigNodeIServiceClientPoolFactory());
+ }
+
+ /**
+ * Only used in LoadManager
+ *
+ * @param endPoint The specific ConfigNode
+ */
+ public void getConfigNodeHeartBeat(
+ TEndPoint endPoint, long timestamp, ConfigNodeHeartbeatHandler handler) {
+ AsyncConfigNodeIServiceClient client;
+ try {
+ client = clientManager.borrowClient(endPoint);
+ client.getConfigNodeHeartBeat(timestamp, handler);
+ } catch (Exception e) {
+ LOGGER.error("Asking ConfigNode: {}, for heartbeat failed", endPoint, e);
+ }
+ }
+
+ // TODO: Must the ClientPool be a singleton?
+ private static class AsyncConfigNodeClientPoolHolder {
+
+ private static final AsyncConfigNodeClientPool INSTANCE = new
AsyncConfigNodeClientPool();
+
+ private AsyncConfigNodeClientPoolHolder() {
+ // Empty constructor
+ }
+ }
+
+ public static AsyncConfigNodeClientPool getInstance() {
+ return AsyncConfigNodeClientPool.AsyncConfigNodeClientPoolHolder.INSTANCE;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 9ecab37d8a..aa5b679902 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -26,9 +26,9 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.handlers.FlushHandler;
import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
-import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
@@ -219,11 +219,12 @@ public class AsyncDataNodeClientPool {
*
* @param endPoint The specific DataNode
*/
- public void getHeartBeat(TEndPoint endPoint, THeartbeatReq req,
HeartbeatHandler handler) {
+ public void getDataNodeHeartBeat(
+ TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler)
{
AsyncDataNodeInternalServiceClient client;
try {
client = clientManager.borrowClient(endPoint);
- client.getHeartBeat(req, handler);
+ client.getDataNodeHeartBeat(req, handler);
} catch (Exception e) {
LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
similarity index 52%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
index e4a4a27d55..4a3b6b94f7 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
@@ -18,40 +18,40 @@
*/
package org.apache.iotdb.confignode.client.handlers;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatHandler.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigNodeHeartbeatHandler.class);
- // Update HeartbeatCache when success
- private final TDataNodeLocation dataNodeLocation;
- private final HeartbeatCache heartbeatCache;
+ // Update ConfigNodeHeartbeatCache when success
+ private final TConfigNodeLocation configNodeLocation;
+ private final ConfigNodeHeartbeatCache configNodeHeartbeatCache;
- public HeartbeatHandler(TDataNodeLocation dataNodeLocation, HeartbeatCache
heartbeatCache) {
- this.dataNodeLocation = dataNodeLocation;
- this.heartbeatCache = heartbeatCache;
+ public ConfigNodeHeartbeatHandler(
+ TConfigNodeLocation configNodeLocation, ConfigNodeHeartbeatCache
configNodeHeartbeatCache) {
+ this.configNodeLocation = configNodeLocation;
+ this.configNodeHeartbeatCache = configNodeHeartbeatCache;
}
@Override
- public void onComplete(THeartbeatResp tHeartbeatResp) {
- heartbeatCache.cacheHeartBeat(
- new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(),
System.currentTimeMillis()));
+ public void onComplete(Long timestamp) {
+ configNodeHeartbeatCache.cacheHeartBeat(
+ new HeartbeatPackage(timestamp, System.currentTimeMillis()));
}
@Override
public void onError(Exception e) {
LOGGER.warn(
- "Heartbeat error on DataNode: {id={}, internalEndPoint={}}",
- dataNodeLocation.getDataNodeId(),
- dataNodeLocation.getInternalEndPoint(),
+ "Heartbeat error on ConfigNode: {id={}, internalEndPoint={}}",
+ configNodeLocation.getConfigNodeId(),
+ configNodeLocation.getInternalEndPoint(),
e);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
similarity index 73%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
index e4a4a27d55..0da0022626 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.client.handlers;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
@@ -27,22 +27,23 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatResp> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatHandler.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeHeartbeatHandler.class);
- // Update HeartbeatCache when success
+ // Update DataNodeHeartbeatCache when success
private final TDataNodeLocation dataNodeLocation;
- private final HeartbeatCache heartbeatCache;
+ private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
- public HeartbeatHandler(TDataNodeLocation dataNodeLocation, HeartbeatCache
heartbeatCache) {
+ public DataNodeHeartbeatHandler(
+ TDataNodeLocation dataNodeLocation, DataNodeHeartbeatCache
dataNodeHeartbeatCache) {
this.dataNodeLocation = dataNodeLocation;
- this.heartbeatCache = heartbeatCache;
+ this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
}
@Override
public void onComplete(THeartbeatResp tHeartbeatResp) {
- heartbeatCache.cacheHeartBeat(
+ dataNodeHeartbeatCache.cacheHeartBeat(
new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(),
System.currentTimeMillis()));
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 88db72bf78..dd972bbbac 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -67,6 +69,7 @@ import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
@@ -185,6 +188,28 @@ public class ConfigManager implements IManager {
}
}
+ @Override
+ public TClusterNodeInfos getAllClusterNodeInfos() {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ List<TConfigNodeLocation> configNodeLocations =
getNodeManager().getOnlineConfigNodes();
+ List<TDataNodeLocation> dataNodeInfoLocations =
+ getNodeManager().getOnlineDataNodes(-1).stream()
+ .map(TDataNodeInfo::getLocation)
+ .collect(Collectors.toList());
+ Map<Integer, String> nodeStatus = new HashMap<>();
+ getLoadManager()
+ .getHeartbeatCacheMap()
+ .forEach(
+ (nodeId, heartbeatCache) -> {
+ nodeStatus.put(nodeId,
heartbeatCache.getNodeStatus().getStatus());
+ });
+ return new TClusterNodeInfos(status, configNodeLocations,
dataNodeInfoLocations, nodeStatus);
+ } else {
+ return new TClusterNodeInfos(status, new ArrayList<>(), new
ArrayList<>(), new HashMap<>());
+ }
+ }
+
@Override
public TSStatus setTTL(SetTTLReq setTTLReq) {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 65ce70ade3..f6af3efe3d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
@@ -117,6 +118,8 @@ public interface IManager {
*/
DataSet getDataNodeInfo(GetDataNodeInfoReq getDataNodeInfoReq);
+ TClusterNodeInfos getAllClusterNodeInfos();
+
TSStatus setTTL(SetTTLReq configRequest);
TSStatus setSchemaReplicationFactor(SetSchemaReplicationFactorReq
configRequest);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b448d10cca..ea644481ba 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -229,6 +229,10 @@ public class NodeManager {
// Execute removePeer
if (getConsensusManager().removeConfigNodePeer(removeConfigNodeReq)) {
+ configManager
+ .getLoadManager()
+ .removeNodeHeartbeatHandCache(
+
removeConfigNodeReq.getConfigNodeLocation().getConfigNodeId());
return getConsensusManager().write(removeConfigNodeReq).getStatus();
} else {
return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
@@ -306,6 +310,8 @@ public class NodeManager {
@Override
public void removeDataNode(TDataNodeLocation dataNodeInfo) {
+ // TODO: When removing a datanode, do the following
+ //
configManager.getLoadManager().removeNodeHeartbeatHandCache(dataNodeId);
serverChanged();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 3d76fea103..198085009e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.manager.load;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
@@ -28,8 +29,10 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.confignode.client.AsyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
+import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
@@ -42,7 +45,8 @@ import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
+import
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.IHeartbeatStatistic;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
@@ -69,8 +73,8 @@ public class LoadManager {
private final long heartbeatInterval =
ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
- // Map<NodeId, HeartbeatCache>
- private final Map<Integer, HeartbeatCache> heartbeatCacheMap;
+ // Map<NodeId, IHeartbeatStatistic>
+ private final Map<Integer, IHeartbeatStatistic> heartbeatCacheMap;
// Balancers
private final RegionBalancer regionBalancer;
@@ -201,8 +205,8 @@ public class LoadManager {
if (getConsensusManager().isLeader()) {
// Send heartbeat requests to all the online DataNodes
pingOnlineDataNodes(getNodeManager().getOnlineDataNodes(-1));
- // TODO: Send heartbeat requests to all the online ConfigNodes
-
+ // Send heartbeat requests to all the online ConfigNodes
+ pingOnlineConfigNodes(getNodeManager().getOnlineConfigNodes());
// Do load balancing
doLoadBalancing(balanceCount);
balanceCount += 1;
@@ -232,17 +236,51 @@ public class LoadManager {
private void pingOnlineDataNodes(List<TDataNodeInfo> onlineDataNodes) {
// Send heartbeat requests
for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
- HeartbeatHandler handler =
- new HeartbeatHandler(
+ DataNodeHeartbeatHandler handler =
+ new DataNodeHeartbeatHandler(
dataNodeInfo.getLocation(),
- heartbeatCacheMap.computeIfAbsent(
- dataNodeInfo.getLocation().getDataNodeId(), empty -> new
HeartbeatCache()));
+ (DataNodeHeartbeatCache)
+ heartbeatCacheMap.computeIfAbsent(
+ dataNodeInfo.getLocation().getDataNodeId(),
+ empty -> new DataNodeHeartbeatCache()));
AsyncDataNodeClientPool.getInstance()
- .getHeartBeat(
+ .getDataNodeHeartBeat(
dataNodeInfo.getLocation().getInternalEndPoint(),
genHeartbeatReq(), handler);
}
}
+ /**
+ * Send heartbeat requests to all the online ConfigNodes
+ *
+ * @param onlineConfigNodes ConfigNodes that are currently online
+ */
+ private void pingOnlineConfigNodes(List<TConfigNodeLocation>
onlineConfigNodes) {
+ // Send heartbeat requests
+ for (TConfigNodeLocation configNodeLocation : onlineConfigNodes) {
+ ConfigNodeHeartbeatHandler handler =
+ new ConfigNodeHeartbeatHandler(
+ configNodeLocation,
+ (ConfigNodeHeartbeatCache)
+ heartbeatCacheMap.computeIfAbsent(
+ configNodeLocation.getConfigNodeId(),
+ empty -> new ConfigNodeHeartbeatCache()));
+ AsyncConfigNodeClientPool.getInstance()
+ .getConfigNodeHeartBeat(
+ configNodeLocation.getInternalEndPoint(),
+ genHeartbeatReq().getHeartbeatTimestamp(),
+ handler);
+ }
+ }
+
+ /**
+ * When a node is removed, clear the node's cache
+ *
+ * @param nodeId removed node id
+ */
+ public void removeNodeHeartbeatHandCache(Integer nodeId) {
+ heartbeatCacheMap.remove(nodeId);
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
@@ -258,4 +296,8 @@ public class LoadManager {
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
+
+ public Map<Integer, IHeartbeatStatistic> getHeartbeatCacheMap() {
+ return heartbeatCacheMap;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
similarity index 88%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 734d186833..039dda5583 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -22,22 +22,18 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import java.util.LinkedList;
-/** HeartbeatCache caches and maintains all the heartbeat data */
-public class HeartbeatCache implements IHeartbeatStatistic {
+public class ConfigNodeHeartbeatCache implements IHeartbeatStatistic {
// Cache heartbeat samples
private static final int maximumWindowSize = 100;
private final LinkedList<HeartbeatPackage> slidingWindow;
- // For guiding queries, the higher the score the higher the load
- private volatile float loadScore;
// For showing cluster
private volatile NodeStatus status;
- public HeartbeatCache() {
+ public ConfigNodeHeartbeatCache() {
this.slidingWindow = new LinkedList<>();
- this.loadScore = 0;
this.status = NodeStatus.Running;
}
@@ -67,7 +63,6 @@ public class HeartbeatCache implements IHeartbeatStatistic {
}
// TODO: Optimize
- loadScore = -lastSendTime;
if (System.currentTimeMillis() - lastSendTime > 20_000) {
status = NodeStatus.Unknown;
} else {
@@ -80,7 +75,7 @@ public class HeartbeatCache implements IHeartbeatStatistic {
// Return a copy of loadScore
switch (status) {
case Running:
- return loadScore;
+ return 0;
case Unknown:
default:
// The Unknown Node will get the highest loadScore
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
similarity index 94%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 734d186833..3ea8803705 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import java.util.LinkedList;
-/** HeartbeatCache caches and maintains all the heartbeat data */
-public class HeartbeatCache implements IHeartbeatStatistic {
+/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
+public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
// Cache heartbeat samples
private static final int maximumWindowSize = 100;
@@ -34,7 +34,7 @@ public class HeartbeatCache implements IHeartbeatStatistic {
// For showing cluster
private volatile NodeStatus status;
- public HeartbeatCache() {
+ public DataNodeHeartbeatCache() {
this.slidingWindow = new LinkedList<>();
this.loadScore = 0;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 3527afa643..30c7ebf814 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
@@ -101,7 +99,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode
*/
public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
@@ -157,17 +154,7 @@ public class ConfigNodeRPCServiceProcessor implements
ConfigIService.Iface {
@Override
public TClusterNodeInfos getAllClusterNodeInfos() throws TException {
- List<TConfigNodeLocation> configNodeLocations =
- configManager.getNodeManager().getOnlineConfigNodes();
- List<TDataNodeLocation> dataNodeInfoLocations =
- configManager.getNodeManager().getOnlineDataNodes(-1).stream()
- .map(TDataNodeInfo::getLocation)
- .collect(Collectors.toList());
-
- return new TClusterNodeInfos(
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
- configNodeLocations,
- dataNodeInfoLocations);
+ return configManager.getAllClusterNodeInfos();
}
@Override
@@ -473,6 +460,11 @@ public class ConfigNodeRPCServiceProcessor implements
ConfigIService.Iface {
return showRegionResp;
}
+ @Override
+ public long getConfigNodeHeartBeat(long timestamp) throws TException {
+ return timestamp;
+ }
+
public void handleClientExit() {}
// TODO: Interfaces for data operations
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 63d521621f..4a303b6ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -717,6 +717,19 @@ public class ConfigNodeClient implements
ConfigIService.Iface, SyncThriftClient,
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public long getConfigNodeHeartBeat(long timestamp) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ return client.getConfigNodeHeartBeat(timestamp);
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus dropFunction(TDropFunctionReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
index ddc1d78fc5..2b1c810e7d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_STATUS_RUNNING;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.NODE_TYPE_CONFIG_NODE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_TYPE_DATA_NODE;
@@ -74,7 +73,7 @@ public class ShowClusterTask implements IConfigTask {
builder,
e.getConfigNodeId(),
NODE_TYPE_CONFIG_NODE,
- NODE_STATUS_RUNNING,
+ clusterNodeInfos.getNodeStatus().get(e.getConfigNodeId()),
e.getInternalEndPoint().getIp(),
e.getInternalEndPoint().getPort()));
@@ -86,7 +85,7 @@ public class ShowClusterTask implements IConfigTask {
builder,
e.getDataNodeId(),
NODE_TYPE_DATA_NODE,
- NODE_STATUS_RUNNING,
+ clusterNodeInfos.getNodeStatus().get(e.getDataNodeId()),
e.getInternalEndPoint().getIp(),
e.getInternalEndPoint().getPort()));
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 31a6eccaba..1fc8acf022 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -315,7 +315,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
}
@Override
- public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException {
+ public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws
TException {
THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp(),
getJudgedLeaders());
Random whetherToGetMetric = new Random();
if
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
diff --git a/thrift-commons/src/main/thrift/common.thrift
b/thrift-commons/src/main/thrift/common.thrift
index a9c365656b..b0c0dc8923 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -80,11 +80,11 @@ struct TDataNodeLocation {
struct TRegionInfo {
1: required TConsensusGroupId consensusGroupId
2: required string storageGroup
- 3: required i32 dataNodeId;
- 4: required string clientRpcIp;
- 5: required i32 clientRpcPort;
- 6: required i64 slots;
- 7: optional string status;
+ 3: required i32 dataNodeId
+ 4: required string clientRpcIp
+ 5: required i32 clientRpcPort
+ 6: required i64 slots
+ 7: optional string status
}
struct TDataNodeInfo {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index c573bc2639..66af9851ae 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -216,6 +216,7 @@ struct TClusterNodeInfos {
1: required common.TSStatus status
2: required list<common.TConfigNodeLocation> configNodeList
3: required list<common.TDataNodeLocation> dataNodeList
+ 4: required map<i32, string> nodeStatus
}
// UDF
@@ -322,5 +323,8 @@ service ConfigIService {
TShowRegionResp showRegion(TShowRegionReq req)
+ /* Get confignode heartbeat */
+ i64 getConfigNodeHeartBeat(i64 timestamp)
+
}
diff --git a/thrift/src/main/thrift/mpp.thrift
b/thrift/src/main/thrift/mpp.thrift
index 7a60ea2ff3..02866b7f64 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -245,7 +245,7 @@ service InternalService {
*
* @param ConfigNode will send the latest config_node_list and load balancing
policies in THeartbeatReq
**/
- THeartbeatResp getHeartBeat(THeartbeatReq req)
+ THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req)
/**
* Config node will create a function on a list of data nodes.