This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b179eeb3ab To adapt activation (#11463)
3b179eeb3ab is described below
commit 3b179eeb3ab18322266309db84146a131e2b173b
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Nov 3 14:10:07 2023 +0800
To adapt activation (#11463)
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 18 +++++++++++++++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 8 +++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../async/AsyncConfigNodeHeartbeatClientPool.java | 7 ++++--
.../heartbeat/ConfigNodeHeartbeatHandler.java | 14 ++++++-----
.../iotdb/confignode/manager/ConfigManager.java | 22 ++++++++----------
.../confignode/manager/load/cache/LoadCache.java | 8 ++++---
.../load/cache/node/NodeHeartbeatSample.java | 8 +++++++
.../manager/load/service/HeartbeatService.java | 24 +++++++++++--------
.../iotdb/confignode/manager/node/NodeManager.java | 4 ++--
.../iotdb/confignode/service/ConfigNode.java | 10 +++++---
.../confignode/service/ConfigNodeCommandLine.java | 8 +++++--
.../confignode/service/ConfigNodeShutdownHook.java | 6 ++++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 8 +++++--
.../iotdb/db/protocol/client/ConfigNodeClient.java | 5 +++-
.../impl/DataNodeInternalRPCServiceImpl.java | 11 ++++-----
.../execution/config/metadata/ShowClusterTask.java | 1 +
.../java/org/apache/iotdb/db/service/DataNode.java | 4 ++--
.../org/apache/iotdb/db/utils/DateTimeUtils.java | 27 ++++++++++++++++++++++
.../thrift-commons/src/main/thrift/common.thrift | 11 +++++++++
.../src/main/thrift/confignode.thrift | 27 +++++++++++++++++++++-
.../src/main/thrift/datanode.thrift | 8 +++++++
22 files changed, 187 insertions(+), 53 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 6edfe0122ff..88a99162281 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -599,6 +599,24 @@ public abstract class AbstractEnv implements BaseEnv {
}
}
+ @Override
+ public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws
Exception {
+ Exception lastException = null;
+ ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index);
+ for (int i = 0; i < 30; i++) {
+ try {
+ return clientManager.borrowClient(
+ new TEndPoint(configNodeWrapper.getIp(),
configNodeWrapper.getPort()));
+ } catch (Exception e) {
+ lastException = e;
+ }
+ // Sleep 1s before next retry
+ TimeUnit.SECONDS.sleep(1);
+ }
+ throw new IOException(
+ "Failed to get connection to this ConfigNode. Last error: " +
lastException);
+ }
+
@Override
public int getLeaderConfigNodeIndex() throws IOException,
InterruptedException {
Exception lastException = null;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 5e3f18f450f..08a99c936b2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -47,6 +47,10 @@ public interface BaseEnv {
*/
void initClusterEnvironment(int configNodesNum, int dataNodesNum);
+ default void addClusterDataNodes(int dataNodesNum) throws IOException,
InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Init a cluster with the specified number of ConfigNodes and DataNodes.
*
@@ -94,6 +98,10 @@ public interface BaseEnv {
IConfigNodeRPCService.Iface getLeaderConfigNodeConnection()
throws ClientManagerException, IOException, InterruptedException;
+ default IConfigNodeRPCService.Iface getConfigNodeConnection(int index)
throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
ISessionPool getSessionPool(int maxSize);
ISession getSessionConnection() throws IoTDBConnectionException;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 0283acd556b..eb87684e3b3 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -46,6 +46,7 @@ public enum TSStatusCode {
OVERLAP_WITH_EXISTING_TASK(304),
INTERNAL_SERVER_ERROR(305),
DISPATCH_ERROR(306),
+ LICENSE_ERROR(307),
// Client,
REDIRECTION_RECOMMEND(400),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index d11ab569ff8..fdcd3b0d1e2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
public class AsyncConfigNodeHeartbeatClientPool {
@@ -42,9 +43,11 @@ public class AsyncConfigNodeHeartbeatClientPool {
* @param endPoint The specific ConfigNode
*/
public void getConfigNodeHeartBeat(
- TEndPoint endPoint, long timestamp, ConfigNodeHeartbeatHandler handler) {
+ TEndPoint endPoint,
+ TConfigNodeHeartbeatReq heartbeatReq,
+ ConfigNodeHeartbeatHandler handler) {
try {
- clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(timestamp,
handler);
+
clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(heartbeatReq,
handler);
} catch (Exception ignore) {
// Just ignore
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 69f72df9702..869fa471d5c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -22,26 +22,28 @@ package
org.apache.iotdb.confignode.client.async.handlers.heartbeat;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
import org.apache.thrift.async.AsyncMethodCallback;
-public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
+public class ConfigNodeHeartbeatHandler implements
AsyncMethodCallback<TConfigNodeHeartbeatResp> {
+ private final IManager configManager;
private final int nodeId;
private final LoadCache loadCache;
- public ConfigNodeHeartbeatHandler(int nodeId, LoadCache loadCache) {
+ public ConfigNodeHeartbeatHandler(IManager configManager, int nodeId,
LoadCache loadCache) {
+ this.configManager = configManager;
this.nodeId = nodeId;
this.loadCache = loadCache;
}
@Override
- public void onComplete(Long timestamp) {
- long receiveTime = System.currentTimeMillis();
- loadCache.cacheConfigNodeHeartbeatSample(
- nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
+ public void onComplete(TConfigNodeHeartbeatResp resp) {
+ loadCache.cacheConfigNodeHeartbeatSample(nodeId, resp);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 94d93bc3124..18eaf51456e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -428,25 +428,23 @@ public class ConfigManager implements IManager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<TConfigNodeLocation> configNodeLocations =
getNodeManager().getRegisteredConfigNodes();
configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
- List<TDataNodeLocation> dataNodeInfoLocations =
+ List<TDataNodeLocation> dataNodeLocations =
getNodeManager().getRegisteredDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)
.sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
.collect(Collectors.toList());
Map<Integer, TNodeVersionInfo> nodeVersionInfo =
getNodeManager().getNodeVersionInfo();
Map<Integer, String> nodeStatus =
getLoadManager().getNodeStatusWithReason();
- for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
- if (!nodeStatus.containsKey(configNodeLocation.getConfigNodeId())) {
- nodeStatus.put(configNodeLocation.getConfigNodeId(),
NodeStatus.Unknown.toString());
- }
- }
- for (TDataNodeLocation dataNodeLocation : dataNodeInfoLocations) {
- if (!nodeStatus.containsKey(dataNodeLocation.getDataNodeId())) {
- nodeStatus.put(dataNodeLocation.getDataNodeId(),
NodeStatus.Unknown.toString());
- }
- }
+ configNodeLocations.forEach(
+ configNodeLocation ->
+ nodeStatus.putIfAbsent(
+ configNodeLocation.getConfigNodeId(),
NodeStatus.Unknown.toString()));
+ dataNodeLocations.forEach(
+ dataNodeLocation ->
+ nodeStatus.putIfAbsent(
+ dataNodeLocation.getDataNodeId(),
NodeStatus.Unknown.toString()));
return new TShowClusterResp(
- status, configNodeLocations, dataNodeInfoLocations, nodeStatus,
nodeVersionInfo);
+ status, configNodeLocations, dataNodeLocations, nodeStatus,
nodeVersionInfo);
} else {
return new TShowClusterResp(
status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new
HashMap<>());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index bb559135dec..00cc834909a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatisti
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.route.RegionRouteCache;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -135,12 +136,13 @@ public class LoadCache {
* Cache the latest heartbeat sample of a ConfigNode.
*
* @param nodeId the id of the ConfigNode
- * @param sample the latest heartbeat sample
+ * @param resp the heartbeat response
*/
- public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample
sample) {
+ public void cacheConfigNodeHeartbeatSample(int nodeId,
TConfigNodeHeartbeatResp resp) {
+ long receiveTime = System.currentTimeMillis();
nodeCacheMap
.computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
- .cacheHeartbeatSample(sample);
+ .cacheHeartbeatSample(new NodeHeartbeatSample(resp, receiveTime));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
index c188978e2c3..f1db9e82a48 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
@@ -55,6 +56,13 @@ public class NodeHeartbeatSample {
}
}
+ public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp, long
receiveTimestamp) {
+ this.sendTimestamp = heartbeatResp.getTimestamp();
+ this.receiveTimestamp = receiveTimestamp;
+ this.status = NodeStatus.Running;
+ this.statusReason = null;
+ }
+
public long getSendTimestamp() {
return sendTimestamp;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 284a4d85bbd..97de40fb314 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import
org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -108,13 +109,12 @@ public class HeartbeatService {
.ifPresent(
consensusManager -> {
if (getConsensusManager().isLeader()) {
- // Generate HeartbeatReq
- THeartbeatReq heartbeatReq = genHeartbeatReq();
// Send heartbeat requests to all the registered ConfigNodes
pingRegisteredConfigNodes(
- heartbeatReq, getNodeManager().getRegisteredConfigNodes());
+ genConfigNodeHeartbeatReq(),
getNodeManager().getRegisteredConfigNodes());
// Send heartbeat requests to all the registered DataNodes
- pingRegisteredDataNodes(heartbeatReq,
getNodeManager().getRegisteredDataNodes());
+ pingRegisteredDataNodes(
+ genHeartbeatReq(),
getNodeManager().getRegisteredDataNodes());
}
});
}
@@ -150,13 +150,19 @@ public class HeartbeatService {
return heartbeatReq;
}
+ private TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() {
+ TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq();
+ req.setTimestamp(System.currentTimeMillis());
+ return req;
+ }
+
/**
* Send heartbeat requests to all the Registered ConfigNodes.
*
* @param registeredConfigNodes ConfigNodes that registered in cluster
*/
private void pingRegisteredConfigNodes(
- THeartbeatReq heartbeatReq, List<TConfigNodeLocation>
registeredConfigNodes) {
+ TConfigNodeHeartbeatReq heartbeatReq, List<TConfigNodeLocation>
registeredConfigNodes) {
// Send heartbeat requests
for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
if (configNodeLocation.getConfigNodeId() ==
ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
@@ -165,12 +171,10 @@ public class HeartbeatService {
}
ConfigNodeHeartbeatHandler handler =
- new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(),
loadCache);
+ new ConfigNodeHeartbeatHandler(
+ configManager, configNodeLocation.getConfigNodeId(), loadCache);
AsyncConfigNodeHeartbeatClientPool.getInstance()
- .getConfigNodeHeartBeat(
- configNodeLocation.getInternalEndPoint(),
- heartbeatReq.getHeartbeatTimestamp(),
- handler);
+ .getConfigNodeHeartBeat(configNodeLocation.getInternalEndPoint(),
heartbeatReq, handler);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 41ef5ca9ce5..a340df8f352 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -476,7 +476,7 @@ public class NodeManager {
List<TDataNodeConfiguration> registeredDataNodes =
this.getRegisteredDataNodes();
if (registeredDataNodes != null) {
registeredDataNodes.forEach(
- (registeredDataNode) -> {
+ registeredDataNode -> {
TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
dataNodeInfo.setDataNodeId(dataNodeId);
@@ -544,7 +544,7 @@ public class NodeManager {
List<TConfigNodeLocation> registeredConfigNodes =
this.getRegisteredConfigNodes();
if (registeredConfigNodes != null) {
registeredConfigNodes.forEach(
- (configNodeLocation) -> {
+ configNodeLocation -> {
TConfigNodeInfo info = new TConfigNodeInfo();
int configNodeId = configNodeLocation.getConfigNodeId();
info.setConfigNodeId(configNodeId);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 0a2a9fd9c5e..d086e53fdd9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -92,7 +92,7 @@ public class ConfigNode implements ConfigNodeMBean {
ServiceType.CONFIG_NODE.getJmxName());
private final RegisterManager registerManager = new RegisterManager();
- private ConfigManager configManager;
+ protected ConfigManager configManager;
private ConfigNode() {
// We do not init anything here, so that we can re-initialize the instance
in IT.
@@ -116,7 +116,7 @@ public class ConfigNode implements ConfigNodeMBean {
try {
processPid();
// Add shutdown hook
- Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
+ addShutDownHook();
// Set up internal services
setUpInternalServices();
// Init ConfigManager
@@ -291,7 +291,7 @@ public class ConfigNode implements ConfigNodeMBean {
x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
}
- private void initConfigManager() {
+ void initConfigManager() {
try {
configManager = new ConfigManager();
} catch (IOException e) {
@@ -420,6 +420,10 @@ public class ConfigNode implements ConfigNodeMBean {
configManager.addMetrics();
}
+ protected void addShutDownHook() {
+ Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
+ }
+
@TestOnly
public void setConfigManager(ConfigManager configManager) {
this.configManager = configManager;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index 3eee551f50f..21ccbab3421 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -76,7 +76,7 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
LOGGER.error("Meet error when doing start checking", e);
return -1;
}
- ConfigNode.getInstance().active();
+ activeConfigNodeInstance();
} else if (MODE_REMOVE.equals(mode)) {
// remove ConfigNode
try {
@@ -93,7 +93,11 @@ public class ConfigNodeCommandLine extends ServerCommandLine
{
return 0;
}
- private void doRemoveConfigNode(String[] args) throws IOException {
+ protected void activeConfigNodeInstance() {
+ ConfigNode.getInstance().active();
+ }
+
+ protected void doRemoveConfigNode(String[] args) throws IOException {
if (args.length != 2) {
LOGGER.info(REMOVE_CONFIGNODE_USAGE);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
index bd8ea079690..bf3d0d6d8a4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -46,7 +46,7 @@ public class ConfigNodeShutdownHook extends Thread {
@Override
public void run() {
- boolean isLeader =
ConfigNode.getInstance().getConfigManager().getConsensusManager().isLeader();
+ boolean isLeader =
getConfigNodeInstance().getConfigManager().getConsensusManager().isLeader();
try {
ConfigNode.getInstance().deactivate();
@@ -93,4 +93,8 @@ public class ConfigNodeShutdownHook extends Thread {
Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()));
}
}
+
+ protected ConfigNode getConfigNodeInstance() {
+ return ConfigNode.getInstance();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 047629d477d..3c084e62fa3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -73,6 +73,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
@@ -819,8 +821,10 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public long getConfigNodeHeartBeat(long timestamp) {
- return timestamp;
+ public TConfigNodeHeartbeatResp
getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) {
+ TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp();
+ resp.setTimestamp(System.currentTimeMillis());
+ return resp;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index dba48f6e09c..7ee76d0d97a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -41,6 +41,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
@@ -682,7 +684,8 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
}
@Override
- public long getConfigNodeHeartBeat(long timestamp) throws TException {
+ public TConfigNodeHeartbeatResp
getConfigNodeHeartBeat(TConfigNodeHeartbeatReq req)
+ throws TException {
throw new TException("DataNode to ConfigNode client doesn't support
getConfigNodeHeartBeat.");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 1f49bae4ecd..9260e39d22b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -257,6 +257,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final DataNodeThrottleQuotaManager throttleQuotaManager =
DataNodeThrottleQuotaManager.getInstance();
+ private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+
private static final String SYSTEM = "system";
public DataNodeInternalRPCServiceImpl() {
@@ -1156,9 +1158,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
AuthorityChecker.getAuthorityFetcher().refreshToken();
resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
-
resp.setStatus(CommonDescriptor.getInstance().getConfig().getNodeStatus().getStatus());
- if (CommonDescriptor.getInstance().getConfig().getStatusReason() != null) {
-
resp.setStatusReason(CommonDescriptor.getInstance().getConfig().getStatusReason());
+ resp.setStatus(commonConfig.getNodeStatus().getStatus());
+ if (commonConfig.getStatusReason() != null) {
+ resp.setStatusReason(commonConfig.getStatusReason());
}
if (req.getSchemaRegionIds() != null) {
spaceQuotaManager.updateSpaceQuotaUsage(req.getSpaceQuotaUsage());
@@ -1240,8 +1242,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
private void sampleDiskLoad(TLoadSample loadSample) {
- final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
-
double availableDisk =
MetricService.getInstance()
.getAutoGauge(
@@ -1335,7 +1335,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus setSystemStatus(String status) throws TException {
try {
- final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
commonConfig.setNodeStatus(NodeStatus.parse(status));
if (commonConfig.getNodeStatus().equals(NodeStatus.Removing)) {
PipeAgent.runtime().stop();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterTask.java
index e71130ac2d5..8a2662b6d99 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterTask.java
@@ -98,6 +98,7 @@ public class ShowClusterTask implements IConfigTask {
.getColumnBuilder(6)
.writeBinary(new Binary(versionInfo.getBuildInfo(),
TSFileConfig.STRING_CHARSET));
}
+
builder.declarePosition();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3f8ee4f3c90..0b802f3b8fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -614,11 +614,11 @@ public class DataNode implements DataNodeMBean {
}
/**
- * Generate dataNodeConfiguration.
+ * Generate dataNodeConfiguration. Warning: Don't private this method !!!
*
* @return TDataNodeConfiguration
*/
- private TDataNodeConfiguration generateDataNodeConfiguration() {
+ public TDataNodeConfiguration generateDataNodeConfiguration() {
// Set DataNodeLocation
TDataNodeLocation location = generateDataNodeLocation();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 7c74cb44842..2bc5afddcdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import java.time.DateTimeException;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -695,6 +696,32 @@ public class DateTimeUtils {
.toString();
}
+ public static String convertMillisecondToDurationStr(long millisecond) {
+ Duration duration = Duration.ofMillis(millisecond);
+ long days = duration.toDays();
+ long years = days / 365;
+ days = days % 365;
+ long months = days / 30;
+ days %= 30;
+ long hours = duration.toHours() % 24;
+ long minutes = duration.toMinutes() % 60;
+ long seconds = duration.getSeconds() % 60;
+ StringBuilder result = new StringBuilder();
+ if (years > 0) {
+ result.append(years).append(" years ");
+ }
+ if (months > 0) {
+ result.append(months).append(" months ");
+ }
+ if (days > 0) {
+ result.append(days).append(" days ");
+ }
+ result.append(hours).append(" hours ");
+ result.append(minutes).append(" minutes ");
+ result.append(seconds).append(" seconds");
+ return result.toString();
+ }
+
public static ZoneOffset toZoneOffset(ZoneId zoneId) {
return zoneId.getRules().getOffset(Instant.now());
}
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index 7a3564630c2..bb4e7135509 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -165,6 +165,17 @@ struct TSetThrottleQuotaReq {
2: required TThrottleQuota throttleQuota
}
+struct TLicense {
+ 1: required i64 licenseIssueTimestamp
+ 2: required i64 expireTimestamp
+ 4: required i16 dataNodeNumLimit
+ 5: required i32 cpuCoreNumLimit
+ 6: required i64 deviceNumLimit
+ 7: required i64 sensorNumLimit
+ 8: required i64 disconnectionFromActiveNodeTimeLimit
+ 9: required i16 mlNodeNumLimit
+}
+
enum TAggregationType {
COUNT,
AVG,
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index df9e090f859..c7629909bea 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -40,6 +40,7 @@ struct TGlobalConfig {
8: optional string timestampPrecision
9: optional string schemaEngineMode
10: optional i32 tagAttributeTotalSize
+ 11: optional bool isEnterprise
}
struct TRatisConfig {
@@ -406,6 +407,17 @@ struct TConfigNodeRegisterResp {
2: optional i32 configNodeId
}
+struct TConfigNodeHeartbeatReq {
+ 1: required i64 timestamp
+ 2: optional common.TLicense licence
+}
+
+struct TConfigNodeHeartbeatResp {
+ 1: required i64 timestamp
+ 2: optional string activateStatus
+ 3: optional common.TLicense license
+}
+
struct TAddConsensusGroupReq {
1: required list<common.TConfigNodeLocation> configNodeList
}
@@ -519,6 +531,10 @@ struct TNodeVersionInfo {
2: required string buildInfo;
}
+struct TNodeActivateInfo {
+ 1: required string status
+}
+
struct TShowVariablesResp {
1: required common.TSStatus status
2: optional TClusterParameters clusterParameters
@@ -781,6 +797,15 @@ struct TShowThrottleReq{
1: optional string userName;
}
+// ====================================================
+// Activation
+// ====================================================
+struct TLicenseContentResp {
+ 1: required common.TSStatus status
+ 2: optional common.TLicense licenseContent
+}
+
+
service IConfigNodeRPCService {
// ======================================================
@@ -1055,7 +1080,7 @@ service IConfigNodeRPCService {
common.TSStatus stopConfigNode(common.TConfigNodeLocation configNodeLocation)
/** The ConfigNode-leader will ping other ConfigNodes periodically */
- i64 getConfigNodeHeartBeat(i64 timestamp)
+ TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq req)
// ======================================================
// UDF
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index d1502b7f401..c3333fe4d1c 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -250,6 +250,13 @@ struct THeartbeatReq {
7: optional map<string, common.TSpaceQuota> spaceQuotaUsage
8: optional bool needPipeMetaList
9: optional i64 deviceQuotaRemain
+ 10: optional TDataNodeActivation activation
+}
+
+struct TDataNodeActivation {
+ 1: required bool activated
+ 2: required i64 deviceNumRemain
+ 3: required i64 sensorNumRemain
}
struct THeartbeatResp {
@@ -264,6 +271,7 @@ struct THeartbeatResp {
// TODO: schemaLimitLevel is not used from 1.3.0, keep it for compatibility
9: optional TSchemaLimitLevel schemaLimitLevel
10: optional list<binary> pipeMetaList
+ 11: optional string activateStatus
}
struct TPipeHeartbeatReq {