This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d05dcde13c [IOTDB-3391] && [IOTDB-3372] Judge ConfigNode status 
through heartbeat && show cluster did not return a correct result when stop a 
datanode or confignode (#6402)
d05dcde13c is described below

commit d05dcde13c908b4613c6214b6f10d7657f7b62ae
Author: 任宇华 <[email protected]>
AuthorDate: Tue Jun 28 15:42:17 2022 +0800

    [IOTDB-3391] && [IOTDB-3372] Judge ConfigNode status through heartbeat && 
show cluster did not return a correct result when stop a datanode or confignode 
(#6402)
---
 .../client/AsyncConfigNodeClientPool.java          | 72 ++++++++++++++++++++++
 .../confignode/client/AsyncDataNodeClientPool.java |  7 ++-
 ...andler.java => ConfigNodeHeartbeatHandler.java} | 34 +++++-----
 ...tHandler.java => DataNodeHeartbeatHandler.java} | 17 ++---
 .../iotdb/confignode/manager/ConfigManager.java    | 25 ++++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 +
 .../iotdb/confignode/manager/NodeManager.java      |  6 ++
 .../iotdb/confignode/manager/load/LoadManager.java | 64 +++++++++++++++----
 ...eatCache.java => ConfigNodeHeartbeatCache.java} | 11 +---
 ...tbeatCache.java => DataNodeHeartbeatCache.java} |  6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      | 20 ++----
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 13 ++++
 .../mpp/plan/execution/config/ShowClusterTask.java |  5 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  2 +-
 thrift-commons/src/main/thrift/common.thrift       | 10 +--
 .../src/main/thrift/confignode.thrift              |  4 ++
 thrift/src/main/thrift/mpp.thrift                  |  2 +-
 17 files changed, 227 insertions(+), 74 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
new file mode 100644
index 0000000000..00e37493dd
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncConfigNodeClientPool.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncConfigNodeClientPool {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AsyncConfigNodeClientPool.class);
+
+  private final IClientManager<TEndPoint, AsyncConfigNodeIServiceClient> 
clientManager;
+
+  private AsyncConfigNodeClientPool() {
+    clientManager =
+        new IClientManager.Factory<TEndPoint, AsyncConfigNodeIServiceClient>()
+            .createClientManager(
+                new 
DataNodeClientPoolFactory.AsyncConfigNodeIServiceClientPoolFactory());
+  }
+
+  /**
+   * Only used in LoadManager
+   *
+   * @param endPoint The specific ConfigNode
+   */
+  public void getConfigNodeHeartBeat(
+      TEndPoint endPoint, long timestamp, ConfigNodeHeartbeatHandler handler) {
+    AsyncConfigNodeIServiceClient client;
+    try {
+      client = clientManager.borrowClient(endPoint);
+      client.getConfigNodeHeartBeat(timestamp, handler);
+    } catch (Exception e) {
+      LOGGER.error("Asking ConfigNode: {}, for heartbeat failed", endPoint, e);
+    }
+  }
+
+  // TODO: Is the ClientPool must be a singleton?
+  private static class AsyncConfigNodeClientPoolHolder {
+
+    private static final AsyncConfigNodeClientPool INSTANCE = new 
AsyncConfigNodeClientPool();
+
+    private AsyncConfigNodeClientPoolHolder() {
+      // Empty constructor
+    }
+  }
+
+  public static AsyncConfigNodeClientPool getInstance() {
+    return AsyncConfigNodeClientPool.AsyncConfigNodeClientPoolHolder.INSTANCE;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 9ecab37d8a..aa5b679902 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -26,9 +26,9 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.client.handlers.FlushHandler;
 import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
-import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
@@ -219,11 +219,12 @@ public class AsyncDataNodeClientPool {
    *
    * @param endPoint The specific DataNode
    */
-  public void getHeartBeat(TEndPoint endPoint, THeartbeatReq req, 
HeartbeatHandler handler) {
+  public void getDataNodeHeartBeat(
+      TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) 
{
     AsyncDataNodeInternalServiceClient client;
     try {
       client = clientManager.borrowClient(endPoint);
-      client.getHeartBeat(req, handler);
+      client.getDataNodeHeartBeat(req, handler);
     } catch (Exception e) {
       LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
     }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
similarity index 52%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
index e4a4a27d55..4a3b6b94f7 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
@@ -18,40 +18,40 @@
  */
 package org.apache.iotdb.confignode.client.handlers;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatHandler.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigNodeHeartbeatHandler.class);
 
-  // Update HeartbeatCache when success
-  private final TDataNodeLocation dataNodeLocation;
-  private final HeartbeatCache heartbeatCache;
+  // Update ConfigNodeHeartbeatCache when success
+  private final TConfigNodeLocation configNodeLocation;
+  private final ConfigNodeHeartbeatCache configNodeHeartbeatCache;
 
-  public HeartbeatHandler(TDataNodeLocation dataNodeLocation, HeartbeatCache 
heartbeatCache) {
-    this.dataNodeLocation = dataNodeLocation;
-    this.heartbeatCache = heartbeatCache;
+  public ConfigNodeHeartbeatHandler(
+      TConfigNodeLocation configNodeLocation, ConfigNodeHeartbeatCache 
configNodeHeartbeatCache) {
+    this.configNodeLocation = configNodeLocation;
+    this.configNodeHeartbeatCache = configNodeHeartbeatCache;
   }
 
   @Override
-  public void onComplete(THeartbeatResp tHeartbeatResp) {
-    heartbeatCache.cacheHeartBeat(
-        new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(), 
System.currentTimeMillis()));
+  public void onComplete(Long timestamp) {
+    configNodeHeartbeatCache.cacheHeartBeat(
+        new HeartbeatPackage(timestamp, System.currentTimeMillis()));
   }
 
   @Override
   public void onError(Exception e) {
     LOGGER.warn(
-        "Heartbeat error on DataNode: {id={}, internalEndPoint={}}",
-        dataNodeLocation.getDataNodeId(),
-        dataNodeLocation.getInternalEndPoint(),
+        "Heartbeat error on ConfigNode: {id={}, internalEndPoint={}}",
+        configNodeLocation.getConfigNodeId(),
+        configNodeLocation.getInternalEndPoint(),
         e);
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
similarity index 73%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
index e4a4a27d55..0da0022626 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.confignode.client.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
@@ -27,22 +27,23 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<THeartbeatResp> {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatHandler.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeHeartbeatHandler.class);
 
-  // Update HeartbeatCache when success
+  // Update DataNodeHeartbeatCache when success
   private final TDataNodeLocation dataNodeLocation;
-  private final HeartbeatCache heartbeatCache;
+  private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
 
-  public HeartbeatHandler(TDataNodeLocation dataNodeLocation, HeartbeatCache 
heartbeatCache) {
+  public DataNodeHeartbeatHandler(
+      TDataNodeLocation dataNodeLocation, DataNodeHeartbeatCache 
dataNodeHeartbeatCache) {
     this.dataNodeLocation = dataNodeLocation;
-    this.heartbeatCache = heartbeatCache;
+    this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
   }
 
   @Override
   public void onComplete(THeartbeatResp tHeartbeatResp) {
-    heartbeatCache.cacheHeartBeat(
+    dataNodeHeartbeatCache.cacheHeartBeat(
         new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(), 
System.currentTimeMillis()));
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 88db72bf78..dd972bbbac 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -67,6 +69,7 @@ import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
@@ -185,6 +188,28 @@ public class ConfigManager implements IManager {
     }
   }
 
+  @Override
+  public TClusterNodeInfos getAllClusterNodeInfos() {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      List<TConfigNodeLocation> configNodeLocations = 
getNodeManager().getOnlineConfigNodes();
+      List<TDataNodeLocation> dataNodeInfoLocations =
+          getNodeManager().getOnlineDataNodes(-1).stream()
+              .map(TDataNodeInfo::getLocation)
+              .collect(Collectors.toList());
+      Map<Integer, String> nodeStatus = new HashMap<>();
+      getLoadManager()
+          .getHeartbeatCacheMap()
+          .forEach(
+              (nodeId, heartbeatCache) -> {
+                nodeStatus.put(nodeId, 
heartbeatCache.getNodeStatus().getStatus());
+              });
+      return new TClusterNodeInfos(status, configNodeLocations, 
dataNodeInfoLocations, nodeStatus);
+    } else {
+      return new TClusterNodeInfos(status, new ArrayList<>(), new 
ArrayList<>(), new HashMap<>());
+    }
+  }
+
   @Override
   public TSStatus setTTL(SetTTLReq setTTLReq) {
     TSStatus status = confirmLeader();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 65ce70ade3..f6af3efe3d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
 import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
 import 
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
@@ -117,6 +118,8 @@ public interface IManager {
    */
   DataSet getDataNodeInfo(GetDataNodeInfoReq getDataNodeInfoReq);
 
+  TClusterNodeInfos getAllClusterNodeInfos();
+
   TSStatus setTTL(SetTTLReq configRequest);
 
   TSStatus setSchemaReplicationFactor(SetSchemaReplicationFactorReq 
configRequest);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b448d10cca..ea644481ba 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -229,6 +229,10 @@ public class NodeManager {
 
         // Execute removePeer
         if (getConsensusManager().removeConfigNodePeer(removeConfigNodeReq)) {
+          configManager
+              .getLoadManager()
+              .removeNodeHeartbeatHandCache(
+                  
removeConfigNodeReq.getConfigNodeLocation().getConfigNodeId());
           return getConsensusManager().write(removeConfigNodeReq).getStatus();
         } else {
           return new 
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
@@ -306,6 +310,8 @@ public class NodeManager {
 
     @Override
     public void removeDataNode(TDataNodeLocation dataNodeInfo) {
+      // TODO: When removing a datanode, do the following
+      //  
configManager.getLoadManager().removeNodeHeartbeatHandCache(dataNodeId);
       serverChanged();
     }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 3d76fea103..198085009e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.manager.load;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
@@ -28,8 +29,10 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.confignode.client.AsyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
+import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
@@ -42,7 +45,8 @@ import org.apache.iotdb.confignode.manager.PartitionManager;
 import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.IHeartbeatStatistic;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 
@@ -69,8 +73,8 @@ public class LoadManager {
 
   private final long heartbeatInterval =
       ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
-  // Map<NodeId, HeartbeatCache>
-  private final Map<Integer, HeartbeatCache> heartbeatCacheMap;
+  // Map<NodeId, IHeartbeatStatistic>
+  private final Map<Integer, IHeartbeatStatistic> heartbeatCacheMap;
 
   // Balancers
   private final RegionBalancer regionBalancer;
@@ -201,8 +205,8 @@ public class LoadManager {
     if (getConsensusManager().isLeader()) {
       // Send heartbeat requests to all the online DataNodes
       pingOnlineDataNodes(getNodeManager().getOnlineDataNodes(-1));
-      // TODO: Send heartbeat requests to all the online ConfigNodes
-
+      // Send heartbeat requests to all the online ConfigNodes
+      pingOnlineConfigNodes(getNodeManager().getOnlineConfigNodes());
       // Do load balancing
       doLoadBalancing(balanceCount);
       balanceCount += 1;
@@ -232,17 +236,51 @@ public class LoadManager {
   private void pingOnlineDataNodes(List<TDataNodeInfo> onlineDataNodes) {
     // Send heartbeat requests
     for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
-      HeartbeatHandler handler =
-          new HeartbeatHandler(
+      DataNodeHeartbeatHandler handler =
+          new DataNodeHeartbeatHandler(
               dataNodeInfo.getLocation(),
-              heartbeatCacheMap.computeIfAbsent(
-                  dataNodeInfo.getLocation().getDataNodeId(), empty -> new 
HeartbeatCache()));
+              (DataNodeHeartbeatCache)
+                  heartbeatCacheMap.computeIfAbsent(
+                      dataNodeInfo.getLocation().getDataNodeId(),
+                      empty -> new DataNodeHeartbeatCache()));
       AsyncDataNodeClientPool.getInstance()
-          .getHeartBeat(
+          .getDataNodeHeartBeat(
               dataNodeInfo.getLocation().getInternalEndPoint(), 
genHeartbeatReq(), handler);
     }
   }
 
+  /**
+   * Send heartbeat requests to all the online ConfigNodes
+   *
+   * @param onlineConfigNodes ConfigNodes that currently online
+   */
+  private void pingOnlineConfigNodes(List<TConfigNodeLocation> 
onlineConfigNodes) {
+    // Send heartbeat requests
+    for (TConfigNodeLocation configNodeLocation : onlineConfigNodes) {
+      ConfigNodeHeartbeatHandler handler =
+          new ConfigNodeHeartbeatHandler(
+              configNodeLocation,
+              (ConfigNodeHeartbeatCache)
+                  heartbeatCacheMap.computeIfAbsent(
+                      configNodeLocation.getConfigNodeId(),
+                      empty -> new ConfigNodeHeartbeatCache()));
+      AsyncConfigNodeClientPool.getInstance()
+          .getConfigNodeHeartBeat(
+              configNodeLocation.getInternalEndPoint(),
+              genHeartbeatReq().getHeartbeatTimestamp(),
+              handler);
+    }
+  }
+
+  /**
+   * When a node is removed, clear the node's cache
+   *
+   * @param nodeId removed node id
+   */
+  public void removeNodeHeartbeatHandCache(Integer nodeId) {
+    heartbeatCacheMap.remove(nodeId);
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
@@ -258,4 +296,8 @@ public class LoadManager {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  public Map<Integer, IHeartbeatStatistic> getHeartbeatCacheMap() {
+    return heartbeatCacheMap;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
similarity index 88%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 734d186833..039dda5583 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -22,22 +22,18 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 
 import java.util.LinkedList;
 
-/** HeartbeatCache caches and maintains all the heartbeat data */
-public class HeartbeatCache implements IHeartbeatStatistic {
+public class ConfigNodeHeartbeatCache implements IHeartbeatStatistic {
 
   // Cache heartbeat samples
   private static final int maximumWindowSize = 100;
   private final LinkedList<HeartbeatPackage> slidingWindow;
 
-  // For guiding queries, the higher the score the higher the load
-  private volatile float loadScore;
   // For showing cluster
   private volatile NodeStatus status;
 
-  public HeartbeatCache() {
+  public ConfigNodeHeartbeatCache() {
     this.slidingWindow = new LinkedList<>();
 
-    this.loadScore = 0;
     this.status = NodeStatus.Running;
   }
 
@@ -67,7 +63,6 @@ public class HeartbeatCache implements IHeartbeatStatistic {
     }
 
     // TODO: Optimize
-    loadScore = -lastSendTime;
     if (System.currentTimeMillis() - lastSendTime > 20_000) {
       status = NodeStatus.Unknown;
     } else {
@@ -80,7 +75,7 @@ public class HeartbeatCache implements IHeartbeatStatistic {
     // Return a copy of loadScore
     switch (status) {
       case Running:
-        return loadScore;
+        return 0;
       case Unknown:
       default:
         // The Unknown Node will get the highest loadScore
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
similarity index 94%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 734d186833..3ea8803705 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 
 import java.util.LinkedList;
 
-/** HeartbeatCache caches and maintains all the heartbeat data */
-public class HeartbeatCache implements IHeartbeatStatistic {
+/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
+public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
 
   // Cache heartbeat samples
   private static final int maximumWindowSize = 100;
@@ -34,7 +34,7 @@ public class HeartbeatCache implements IHeartbeatStatistic {
   // For showing cluster
   private volatile NodeStatus status;
 
-  public HeartbeatCache() {
+  public DataNodeHeartbeatCache() {
     this.slidingWindow = new LinkedList<>();
 
     this.loadScore = 0;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 3527afa643..30c7ebf814 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -19,8 +19,6 @@
 package org.apache.iotdb.confignode.service.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
@@ -101,7 +99,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode 
*/
 public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
@@ -157,17 +154,7 @@ public class ConfigNodeRPCServiceProcessor implements 
ConfigIService.Iface {
 
   @Override
   public TClusterNodeInfos getAllClusterNodeInfos() throws TException {
-    List<TConfigNodeLocation> configNodeLocations =
-        configManager.getNodeManager().getOnlineConfigNodes();
-    List<TDataNodeLocation> dataNodeInfoLocations =
-        configManager.getNodeManager().getOnlineDataNodes(-1).stream()
-            .map(TDataNodeInfo::getLocation)
-            .collect(Collectors.toList());
-
-    return new TClusterNodeInfos(
-        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-        configNodeLocations,
-        dataNodeInfoLocations);
+    return configManager.getAllClusterNodeInfos();
   }
 
   @Override
@@ -473,6 +460,11 @@ public class ConfigNodeRPCServiceProcessor implements 
ConfigIService.Iface {
     return showRegionResp;
   }
 
+  @Override
+  public long getConfigNodeHeartBeat(long timestamp) throws TException {
+    return timestamp;
+  }
+
   public void handleClientExit() {}
 
   // TODO: Interfaces for data operations
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 63d521621f..4a303b6ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -717,6 +717,19 @@ public class ConfigNodeClient implements 
ConfigIService.Iface, SyncThriftClient,
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public long getConfigNodeHeartBeat(long timestamp) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        return client.getConfigNodeHeartBeat(timestamp);
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TSStatus dropFunction(TDropFunctionReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
index ddc1d78fc5..2b1c810e7d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowClusterTask.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
-import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_STATUS_RUNNING;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.NODE_TYPE_CONFIG_NODE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_TYPE_DATA_NODE;
 
@@ -74,7 +73,7 @@ public class ShowClusterTask implements IConfigTask {
                     builder,
                     e.getConfigNodeId(),
                     NODE_TYPE_CONFIG_NODE,
-                    NODE_STATUS_RUNNING,
+                    clusterNodeInfos.getNodeStatus().get(e.getConfigNodeId()),
                     e.getInternalEndPoint().getIp(),
                     e.getInternalEndPoint().getPort()));
 
@@ -86,7 +85,7 @@ public class ShowClusterTask implements IConfigTask {
                     builder,
                     e.getDataNodeId(),
                     NODE_TYPE_DATA_NODE,
-                    NODE_STATUS_RUNNING,
+                    clusterNodeInfos.getNodeStatus().get(e.getDataNodeId()),
                     e.getInternalEndPoint().getIp(),
                     e.getInternalEndPoint().getPort()));
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 31a6eccaba..1fc8acf022 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -315,7 +315,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   }
 
   @Override
-  public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException {
+  public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws 
TException {
     THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp(), 
getJudgedLeaders());
     Random whetherToGetMetric = new Random();
     if 
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
diff --git a/thrift-commons/src/main/thrift/common.thrift 
b/thrift-commons/src/main/thrift/common.thrift
index a9c365656b..b0c0dc8923 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -80,11 +80,11 @@ struct TDataNodeLocation {
 struct TRegionInfo {
   1: required TConsensusGroupId consensusGroupId
   2: required string storageGroup
-  3: required i32 dataNodeId;
-  4: required string clientRpcIp;
-  5: required i32 clientRpcPort;
-  6: required i64 slots;
-  7: optional string status;
+  3: required i32 dataNodeId
+  4: required string clientRpcIp
+  5: required i32 clientRpcPort
+  6: required i64 slots
+  7: optional string status
 }
 
 struct TDataNodeInfo {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index c573bc2639..66af9851ae 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -216,6 +216,7 @@ struct TClusterNodeInfos {
   1: required common.TSStatus status
   2: required list<common.TConfigNodeLocation> configNodeList
   3: required list<common.TDataNodeLocation> dataNodeList
+  4: required map<i32, string> nodeStatus
 }
 
 // UDF
@@ -322,5 +323,8 @@ service ConfigIService {
 
   TShowRegionResp showRegion(TShowRegionReq req)
 
+  /* Get confignode heartbeat */
+  i64 getConfigNodeHeartBeat(i64 timestamp)
+
 }
 
diff --git a/thrift/src/main/thrift/mpp.thrift 
b/thrift/src/main/thrift/mpp.thrift
index 7a60ea2ff3..02866b7f64 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -245,7 +245,7 @@ service InternalService {
   *
   * @param ConfigNode will send the latest config_node_list and load balancing 
policies in THeartbeatReq
   **/
-  THeartbeatResp getHeartBeat(THeartbeatReq req)
+  THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req)
 
   /**
    * Config node will create a function on a list of data nodes.

Reply via email to