This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9a968e7253 [IOTDB-4550] Add NodeId to Peer (#7531)
9a968e7253 is described below
commit 9a968e72534275e4d3f0180eebc2321969b41368
Author: Itami Sho <[email protected]>
AuthorDate: Wed Oct 12 22:04:58 2022 +0800
[IOTDB-4550] Add NodeId to Peer (#7531)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 13 +++++
.../confignode/conf/SystemPropertiesUtils.java | 10 ++++
.../statemachine/PartitionRegionStateMachine.java | 24 ++++++---
.../iotdb/confignode/manager/ConfigManager.java | 61 ++++++++++++++--------
.../iotdb/confignode/manager/ConsensusManager.java | 19 +++++--
.../apache/iotdb/confignode/manager/IManager.java | 7 ++-
.../iotdb/confignode/manager/node/NodeManager.java | 38 ++++++++++++--
.../iotdb/confignode/persistence/NodeInfo.java | 2 +
.../iotdb/confignode/service/ConfigNode.java | 35 ++++++++++---
.../thrift/ConfigNodeRPCServiceProcessor.java | 9 ++--
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 4 +-
.../org/apache/iotdb/consensus/IStateMachine.java | 5 +-
.../org/apache/iotdb/consensus/common/Peer.java | 26 +++++++--
.../iotdb/consensus/config/ConsensusConfig.java | 22 ++++++--
.../multileader/MultiLeaderConsensus.java | 12 +++--
.../multileader/MultiLeaderServerImpl.java | 6 ++-
.../service/MultiLeaderRPCServiceProcessor.java | 4 +-
.../ratis/ApplicationStateMachineProxy.java | 2 +-
.../iotdb/consensus/ratis/RatisConsensus.java | 24 +++++----
.../org/apache/iotdb/consensus/ratis/Utils.java | 36 +++++++++----
.../consensus/standalone/StandAloneConsensus.java | 11 ++--
.../consensus/standalone/StandAloneServerImpl.java | 2 +-
.../multileader/MultiLeaderConsensusTest.java | 7 +--
.../iotdb/consensus/multileader/RecoveryTest.java | 5 +-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 7 +--
.../apache/iotdb/consensus/ratis/TestUtils.java | 7 +--
.../iotdb/consensus/standalone/RecoveryTest.java | 5 +-
.../standalone/StandAloneConsensusTest.java | 30 ++++++-----
.../apache/iotdb/commons/conf/IoTDBConstant.java | 2 +
.../apache/iotdb/db/client/ConfigNodeClient.java | 3 +-
.../db/consensus/DataRegionConsensusImpl.java | 1 +
.../db/consensus/SchemaRegionConsensusImpl.java | 1 +
.../iotdb/db/service/RegionMigrateService.java | 8 ++-
.../impl/DataNodeInternalRPCServiceImpl.java | 9 +++-
.../service/thrift/impl/DataNodeRegionManager.java | 4 +-
.../DataNodeInternalRPCServiceImplTest.java | 8 ++-
.../src/main/thrift/confignode.thrift | 7 ++-
.../src/main/thrift/mutlileader.thrift | 2 +
38 files changed, 345 insertions(+), 133 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 269ac05a5f..627ba6ccb5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -29,6 +29,11 @@ import java.io.File;
public class ConfigNodeConfig {
+ /**
+ * the config node id for cluster mode; the default value 0 (the Seed-ConfigNode id) should be
changed to the assigned id after joining the cluster
+ */
+ private int configNodeId = 0;
+
/** could set ip or hostname */
private String internalAddress = "0.0.0.0";
@@ -209,6 +214,14 @@ public class ConfigNodeConfig {
return dir;
}
+ public int getConfigNodeId() {
+ return configNodeId;
+ }
+
+ public void setConfigNodeId(int configNodeId) {
+ this.configNodeId = configNodeId;
+ }
+
public String getInternalAddress() {
return internalAddress;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index abff028239..9c166589d1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -239,6 +239,16 @@ public class SystemPropertiesUtils {
storeSystemProperties(systemProperties);
}
+ public static void storeConfigNodeId(int nodeId) throws IOException {
+ if (!systemPropertiesFile.exists()) {
+ return;
+ }
+
+ Properties systemProperties = getSystemProperties();
+ systemProperties.setProperty("config_node_id", String.valueOf(nodeId));
+ storeSystemProperties(systemProperties);
+ }
+
private static synchronized Properties getSystemProperties() throws
IOException {
// Create confignode-system.properties file if necessary
if (!systemPropertiesFile.exists()) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index b45c933671..a901e5d187 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.consensus.statemachine;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
@@ -46,12 +47,14 @@ public class PartitionRegionStateMachine
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionRegionStateMachine.class);
private final ConfigPlanExecutor executor;
private ConfigManager configManager;
- private final TEndPoint currentNode;
+ private final TEndPoint currentNodeTEndPoint;
+ private final int currentNodeId;
public PartitionRegionStateMachine(ConfigManager configManager,
ConfigPlanExecutor executor) {
this.executor = executor;
this.configManager = configManager;
- this.currentNode =
+ this.currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+ this.currentNodeTEndPoint =
new TEndPoint()
.setIp(ConfigNodeDescriptor.getInstance().getConf().getInternalAddress())
.setPort(ConfigNodeDescriptor.getInstance().getConf().getConsensusPort());
@@ -145,16 +148,25 @@ public class PartitionRegionStateMachine
}
@Override
- public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint
newLeader) {
- if (currentNode.equals(newLeader)) {
- LOGGER.info("Current node {} becomes Leader", newLeader);
+ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
+ TConfigNodeLocation newLeaderConfigNodeLocation =
+ configManager.getConfigNodeLocation(currentNodeId);
+ if (currentNodeId == newLeaderId) {
+ LOGGER.info(
+ "Current node [nodeId: {}, ip:port: {}] becomes Leader",
+ newLeaderId,
+ currentNodeTEndPoint);
configManager.getProcedureManager().shiftExecutor(true);
configManager.getLoadManager().startLoadBalancingService();
configManager.getNodeManager().startHeartbeatService();
configManager.getPartitionManager().startRegionCleaner();
} else {
LOGGER.info(
- "Current node {} is not longer the leader, the new leader is {}",
currentNode, newLeader);
+ "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the
new leader is [nodeId:{}, ip:port: {}]",
+ currentNodeId,
+ currentNodeTEndPoint,
+ newLeaderId,
+ newLeaderConfigNodeLocation.getInternalEndPoint());
configManager.getProcedureManager().shiftExecutor(false);
configManager.getLoadManager().stopLoadBalancingService();
configManager.getNodeManager().stopHeartbeatService();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index c1bde6dbc2..ad87b1f7a5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -87,6 +87,7 @@ import
org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -143,7 +144,7 @@ public class ConfigManager implements IManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigManager.class);
/** Manage PartitionTable read/write requests through the ConsensusLayer */
- private final ConsensusManager consensusManager;
+ private ConsensusManager consensusManager;
/** Manage cluster node */
private final NodeManager nodeManager;
@@ -170,6 +171,8 @@ public class ConfigManager implements IManager {
/** Sync */
private final SyncManager syncManager;
+ private final PartitionRegionStateMachine stateMachine;
+
public ConfigManager() throws IOException {
// Build the persistence module
NodeInfo nodeInfo = new NodeInfo();
@@ -192,7 +195,7 @@ public class ConfigManager implements IManager {
udfInfo,
triggerInfo,
syncInfo);
- PartitionRegionStateMachine stateMachine = new
PartitionRegionStateMachine(this, executor);
+ this.stateMachine = new PartitionRegionStateMachine(this, executor);
// Build the manager module
this.nodeManager = new NodeManager(this, nodeInfo);
@@ -204,10 +207,10 @@ public class ConfigManager implements IManager {
this.triggerManager = new TriggerManager(this, triggerInfo);
this.loadManager = new LoadManager(this);
this.syncManager = new SyncManager(this, syncInfo);
+ }
- // ConsensusManager must be initialized last, as it would load states from
disk and reinitialize
- // above managers
- this.consensusManager = new ConsensusManager(this, stateMachine);
+ public void initConsensusManager() throws IOException {
+ this.consensusManager = new ConsensusManager(this, this.stateMachine);
}
public void close() throws IOException {
@@ -665,24 +668,11 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus registerConfigNode(TConfigNodeRegisterReq req) {
- // Check global configuration
- TSStatus status = confirmLeader();
-
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- TSStatus errorStatus = checkConfigNodeGlobalConfig(req);
- if (errorStatus != null) {
- return errorStatus;
- }
-
- procedureManager.addConfigNode(req);
- return StatusUtils.OK;
- }
-
- return status;
+ public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq
req) {
+ return nodeManager.registerConfigNode(req);
}
- private TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
+ public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
final String errorPrefix = "Reject register, please ensure that the
parameter ";
final String errorSuffix = " is consistent with the Seed-ConfigNode.";
@@ -735,8 +725,23 @@ public class ConfigManager implements IManager {
@Override
public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation>
configNodeLocations) {
- consensusManager.createPeerForConsensusGroup(configNodeLocations);
- return StatusUtils.OK;
+ for (int i = 0; i < 30; i++) {
+ try {
+ if (consensusManager == null) {
+ Thread.sleep(1000);
+ } else {
+ consensusManager.createPeerForConsensusGroup(configNodeLocations);
+ return StatusUtils.OK;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption during retry creating peer for
consensus group");
+ } catch (Exception e) {
+ LOGGER.error("Failed to create peer for consensus group", e);
+ break;
+ }
+ }
+ return StatusUtils.INTERNAL_ERROR;
}
@Override
@@ -1163,4 +1168,14 @@ public class ConfigManager implements IManager {
}
return filteredRegionReplicaSets;
}
+
+ public TConfigNodeLocation getConfigNodeLocation(int nodeId) {
+ List<TConfigNodeLocation> configNodeLocations =
this.nodeManager.getRegisteredConfigNodes();
+ for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
+ if (configNodeLocation.getConfigNodeId() == nodeId) {
+ return configNodeLocation;
+ }
+ }
+ return null;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 0b35146163..99dd15d077 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -84,6 +84,7 @@ public class ConsensusManager {
ConsensusFactory.getConsensusImpl(
CONF.getConfigNodeConsensusProtocolClass(),
ConsensusConfig.newBuilder()
+ .setThisNodeId(CONF.getConfigNodeId())
.setThisNode(new TEndPoint(CONF.getInternalAddress(),
CONF.getConsensusPort()))
.setRatisConfig(
RatisConfig.newBuilder()
@@ -172,7 +173,11 @@ public class ConsensusManager {
List<Peer> peerList = new ArrayList<>();
for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
- peerList.add(new Peer(consensusGroupId,
configNodeLocation.getConsensusEndPoint()));
+ peerList.add(
+ new Peer(
+ consensusGroupId,
+ configNodeLocation.getConfigNodeId(),
+ configNodeLocation.getConsensusEndPoint()));
}
consensusImpl.createPeer(consensusGroupId, peerList);
}
@@ -188,7 +193,10 @@ public class ConsensusManager {
consensusImpl
.addPeer(
consensusGroupId,
- new Peer(consensusGroupId,
configNodeLocation.getConsensusEndPoint()))
+ new Peer(
+ consensusGroupId,
+ configNodeLocation.getConfigNodeId(),
+ configNodeLocation.getConsensusEndPoint()))
.isSuccess();
if (!result) {
@@ -207,7 +215,10 @@ public class ConsensusManager {
return consensusImpl
.removePeer(
consensusGroupId,
- new Peer(consensusGroupId,
tConfigNodeLocation.getConsensusEndPoint()))
+ new Peer(
+ consensusGroupId,
+ tConfigNodeLocation.getConfigNodeId(),
+ tConfigNodeLocation.getConsensusEndPoint()))
.isSuccess();
}
@@ -234,7 +245,7 @@ public class ConsensusManager {
getNodeManager().getRegisteredConfigNodes();
TConfigNodeLocation leaderLocation =
registeredConfigNodes.stream()
- .filter(leader ->
leader.getConsensusEndPoint().equals(leaderPeer.getEndpoint()))
+ .filter(leader -> leader.getConfigNodeId() ==
leaderPeer.getNodeId())
.findFirst()
.orElse(null);
if (leaderLocation != null) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 514cc03ba7..45707f8943 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -275,9 +276,9 @@ public interface IManager {
/**
* Register ConfigNode when it is first startup
*
- * @return TSStatus
+ * @return TConfigNodeRegisterResp
*/
- TSStatus registerConfigNode(TConfigNodeRegisterReq req);
+ TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req);
/**
* Create peer in new node to build consensus group.
@@ -461,4 +462,6 @@ public interface IManager {
TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan);
+
+ TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index df31b8c922..7cb7159060 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import
org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
@@ -60,6 +61,8 @@ import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.metric.NodeInfoMetrics;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
@@ -100,6 +103,9 @@ public class NodeManager {
public static final TEndPoint CURRENT_NODE =
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
+ // node id returned in the response when registering a new ConfigNode fails
+ private static final int ERROR_STATUS_NODE_ID = -1;
+
private final IManager configManager;
private final NodeInfo nodeInfo;
@@ -247,6 +253,28 @@ public class NodeManager {
return dataSet;
}
+ public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq
req) {
+ // Check global configuration
+ TSStatus status = configManager.getConsensusManager().confirmLeader();
+
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ TSStatus errorStatus = configManager.checkConfigNodeGlobalConfig(req);
+ if (errorStatus != null) {
+ return new TConfigNodeRegisterResp()
+ .setStatus(errorStatus)
+ .setConfigNodeId(ERROR_STATUS_NODE_ID);
+ }
+
+ int nodeId = generateNodeId();
+ req.getConfigNodeLocation().setConfigNodeId(nodeId);
+
+ configManager.getProcedureManager().addConfigNode(req);
+ return new
TConfigNodeRegisterResp().setStatus(StatusUtils.OK).setConfigNodeId(nodeId);
+ }
+
+ return new
TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
+ }
+
/**
* Get TDataNodeConfiguration
*
@@ -383,8 +411,6 @@ public class NodeManager {
* @param configNodeLocation The new ConfigNode
*/
public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
- // Generate new ConfigNode's index
- configNodeLocation.setConfigNodeId(nodeInfo.generateNextNodeId());
ApplyConfigNodePlan applyConfigNodePlan = new
ApplyConfigNodePlan(configNodeLocation);
getConsensusManager().write(applyConfigNodePlan);
}
@@ -447,7 +473,9 @@ public class NodeManager {
ConsensusGenericResponse resp =
getConsensusManager()
.getConsensusImpl()
- .transferLeader(groupId, new Peer(groupId,
newLeader.getConsensusEndPoint()));
+ .transferLeader(
+ groupId,
+ new Peer(groupId, newLeader.getConfigNodeId(),
newLeader.getConsensusEndPoint()));
if (!resp.isSuccess()) {
return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
.setMessage("Remove ConfigNode failed because transfer ConfigNode
leader failed.");
@@ -750,4 +778,8 @@ public class NodeManager {
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
+
+ public int generateNodeId() {
+ return nodeInfo.generateNextNodeId();
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 7adcf7e389..2d3876a3f3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -305,6 +305,8 @@ public class NodeInfo implements SnapshotProcessor {
registeredConfigNodes.add(applyConfigNodePlan.getConfigNodeLocation());
SystemPropertiesUtils.storeConfigNodeList(new
ArrayList<>(registeredConfigNodes));
+ SystemPropertiesUtils.storeConfigNodeId(
+ applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
LOGGER.info(
"Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
applyConfigNodePlan.getConfigNodeLocation(),
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 6d5411c47f..501bd7b5e1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
import
org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
import org.apache.iotdb.db.service.metrics.MetricService;
@@ -55,6 +56,10 @@ public class ConfigNode implements ConfigNodeMBean {
private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
+ private static final int SEED_CONFIG_NODE_ID = 0;
+
+ private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
+
private final String mbeanName =
String.format(
"%s:%s=%s",
@@ -90,13 +95,16 @@ public class ConfigNode implements ConfigNodeMBean {
/* Initial startup of Seed-ConfigNode */
if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+ configManager.initConsensusManager();
+
SystemPropertiesUtils.storeSystemParameters();
+ SystemPropertiesUtils.storeConfigNodeId(SEED_CONFIG_NODE_ID);
// Seed-ConfigNode should apply itself when first start
configManager
.getNodeManager()
.applyConfigNode(
new TConfigNodeLocation(
- 0,
+ SEED_CONFIG_NODE_ID,
new TEndPoint(CONF.getInternalAddress(),
CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(),
CONF.getConsensusPort())));
// We always set up Seed-ConfigNode's RPC service lastly to ensure that
@@ -185,7 +193,7 @@ public class ConfigNode implements ConfigNodeMBean {
TConfigNodeRegisterReq req =
new TConfigNodeRegisterReq(
new TConfigNodeLocation(
- -1,
+ INIT_NON_SEED_CONFIG_NODE_ID,
new TEndPoint(CONF.getInternalAddress(),
CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(),
CONF.getConsensusPort())),
CONF.getDataRegionConsensusProtocolClass(),
@@ -208,12 +216,25 @@ public class ConfigNode implements ConfigNodeMBean {
}
for (int retry = 0; retry < 3; retry++) {
- TSStatus status =
- (TSStatus)
- SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNodeWithRetry(
- targetConfigNode, req,
ConfigNodeRequestType.REGISTER_CONFIG_NODE);
+ TSStatus status;
+ TConfigNodeRegisterResp resp = null;
+ Object obj =
+ SyncConfigNodeClientPool.getInstance()
+ .sendSyncRequestToConfigNodeWithRetry(
+ targetConfigNode, req,
ConfigNodeRequestType.REGISTER_CONFIG_NODE);
+
+ if (obj instanceof TConfigNodeRegisterResp) {
+ resp = (TConfigNodeRegisterResp) obj;
+ status = resp.getStatus();
+ } else {
+ status = (TSStatus) obj;
+ }
+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ SystemPropertiesUtils.storeConfigNodeId(resp.getConfigNodeId());
+ CONF.setConfigNodeId(resp.getConfigNodeId());
+
+ configManager.initConsensusManager();
return;
} else if (status.getCode() ==
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
targetConfigNode = status.getRedirectNode();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8381d6e2cc..ace66de37e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
@@ -400,13 +401,13 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus registerConfigNode(TConfigNodeRegisterReq req) {
- TSStatus status = configManager.registerConfigNode(req);
+ public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq
req) throws TException {
+ TConfigNodeRegisterResp resp = configManager.registerConfigNode(req);
// Print log to record the ConfigNode that performs the
RegisterConfigNodeRequest
- LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req,
status);
+ LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req,
resp.getStatus());
- return status;
+ return resp;
}
@Override
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 1ce234806f..50a1e800d2 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -87,7 +87,9 @@ public class ConfigNodeRPCServiceProcessorTest {
@Before
public void before() throws IOException {
- processor = new ConfigNodeRPCServiceProcessor(new ConfigManager());
+ ConfigManager configManager = new ConfigManager();
+ configManager.initConsensusManager();
+ processor = new ConfigNodeRPCServiceProcessor(configManager);
processor.getConsensusManager().singleCopyMayWaitUntilLeaderReady();
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index f0b489e2ad..4641898a0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
@@ -132,9 +131,9 @@ public interface IStateMachine {
* can possibly be this server.
*
* @param groupId The id of this consensus group.
- * @param newLeader The id of the new leader.
+ * @param newLeaderId The id of the new leader node.
*/
- default void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint
newLeader) {}
+ default void notifyLeaderChanged(ConsensusGroupId groupId, int
newLeaderId) {}
/**
* Notify the {@link IStateMachine} a configuration change. This method
will be invoked when a
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index ad962c2525..c7466206f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -21,20 +21,28 @@ package org.apache.iotdb.consensus.common;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
// TODO Use a mature IDL framework such as Protobuf to manage this structure
public class Peer {
+ private final Logger LOGGER = LoggerFactory.getLogger(Peer.class);
private final ConsensusGroupId groupId;
private final TEndPoint endpoint;
+ private final int nodeId;
- public Peer(ConsensusGroupId groupId, TEndPoint endpoint) {
+ public Peer(ConsensusGroupId groupId, int nodeId, TEndPoint endpoint) {
this.groupId = groupId;
+ this.nodeId = nodeId;
this.endpoint = endpoint;
}
@@ -46,16 +54,26 @@ public class Peer {
return endpoint;
}
+ public int getNodeId() {
+ return nodeId;
+ }
+
public void serialize(DataOutputStream stream) {
- ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
- groupId.convertToTConsensusGroupId(), stream);
- ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, stream);
+ try {
+ ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+ groupId.convertToTConsensusGroupId(), stream);
+ BasicStructureSerDeUtil.write(nodeId, stream);
+ ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, stream);
+ } catch (IOException e) {
+ LOGGER.error("Failed to serialize Peer", e);
+ }
}
public static Peer deserialize(ByteBuffer buffer) {
return new Peer(
ConsensusGroupId.Factory.createFromTConsensusGroupId(
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer)),
+ BasicStructureSerDeUtil.readInt(buffer),
ThriftCommonsSerDeUtils.deserializeTEndPoint(buffer));
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index be3b4bf342..42527275ea 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -23,24 +23,31 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
public class ConsensusConfig {
- private final TEndPoint thisNode;
+ private final TEndPoint thisNodeEndPoint;
+ private final int thisNodeId;
private final String storageDir;
private final RatisConfig ratisConfig;
private final MultiLeaderConfig multiLeaderConfig;
private ConsensusConfig(
TEndPoint thisNode,
+ int thisNodeId,
String storageDir,
RatisConfig ratisConfig,
MultiLeaderConfig multiLeaderConfig) {
- this.thisNode = thisNode;
+ this.thisNodeEndPoint = thisNode;
+ this.thisNodeId = thisNodeId;
this.storageDir = storageDir;
this.ratisConfig = ratisConfig;
this.multiLeaderConfig = multiLeaderConfig;
}
- public TEndPoint getThisNode() {
- return thisNode;
+ public TEndPoint getThisNodeEndPoint() {
+ return thisNodeEndPoint;
+ }
+
+ public int getThisNodeId() {
+ return thisNodeId;
}
public String getStorageDir() {
@@ -62,6 +69,7 @@ public class ConsensusConfig {
public static class Builder {
private TEndPoint thisNode;
+ private int thisNodeId;
private String storageDir;
private RatisConfig ratisConfig;
private MultiLeaderConfig multiLeaderConfig;
@@ -69,6 +77,7 @@ public class ConsensusConfig {
public ConsensusConfig build() {
return new ConsensusConfig(
thisNode,
+ thisNodeId,
storageDir,
ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
multiLeaderConfig != null ? multiLeaderConfig :
MultiLeaderConfig.newBuilder().build());
@@ -79,6 +88,11 @@ public class ConsensusConfig {
return this;
}
+ public Builder setThisNodeId(int thisNodeId) {
+ this.thisNodeId = thisNodeId;
+ return this;
+ }
+
public Builder setStorageDir(String storageDir) {
this.storageDir = storageDir;
return this;
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 2f35bdfe71..a5a5c9d0d3 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -71,6 +71,7 @@ public class MultiLeaderConsensus implements IConsensus {
private final Logger logger =
LoggerFactory.getLogger(MultiLeaderConsensus.class);
private final TEndPoint thisNode;
+ private final int thisNodeId;
private final File storageDir;
private final IStateMachine.Registry registry;
private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
@@ -82,7 +83,8 @@ public class MultiLeaderConsensus implements IConsensus {
private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient>
syncClientManager;
public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
- this.thisNode = config.getThisNode();
+ this.thisNode = config.getThisNodeEndPoint();
+ this.thisNodeId = config.getThisNodeId();
this.storageDir = new File(config.getStorageDir());
this.config = config.getMultiLeaderConfig();
this.registry = registry;
@@ -126,7 +128,7 @@ public class MultiLeaderConsensus implements IConsensus {
MultiLeaderServerImpl consensus =
new MultiLeaderServerImpl(
path.toString(),
- new Peer(consensusGroupId, thisNode),
+ new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
registry.apply(consensusGroupId),
clientManager,
@@ -188,7 +190,7 @@ public class MultiLeaderConsensus implements IConsensus {
.setException(new IllegalPeerNumException(consensusGroupSize))
.build();
}
- if (!peers.contains(new Peer(groupId, thisNode))) {
+ if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
return ConsensusGenericResponse.newBuilder()
.setException(new IllegalPeerEndpointException(thisNode, peers))
.build();
@@ -206,7 +208,7 @@ public class MultiLeaderConsensus implements IConsensus {
MultiLeaderServerImpl impl =
new MultiLeaderServerImpl(
path,
- new Peer(groupId, thisNode),
+ new Peer(groupId, thisNodeId, thisNode),
peers,
registry.apply(groupId),
clientManager,
@@ -345,7 +347,7 @@ public class MultiLeaderConsensus implements IConsensus {
if (!stateMachineMap.containsKey(groupId)) {
return null;
}
- return new Peer(groupId, thisNode);
+ return new Peer(groupId, thisNodeId, thisNode);
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index c9c71aabca..aaeb63d788 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -355,7 +355,8 @@ public class MultiLeaderServerImpl {
client.buildSyncLogChannel(
new TBuildSyncLogChannelReq(
targetPeer.getGroupId().convertToTConsensusGroupId(),
- targetPeer.getEndpoint()));
+ targetPeer.getEndpoint(),
+ targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
throw new ConsensusGroupAddPeerException(
String.format("build sync log channel failed from %s to %s",
peer, targetPeer));
@@ -397,7 +398,8 @@ public class MultiLeaderServerImpl {
client.removeSyncLogChannel(
new TRemoveSyncLogChannelReq(
targetPeer.getGroupId().convertToTConsensusGroupId(),
- targetPeer.getEndpoint()));
+ targetPeer.getEndpoint(),
+ targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
throw new ConsensusGroupAddPeerException(
String.format("remove sync log channel failed from %s to %s",
peer, targetPeer));
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 2544451785..3f045e5e66 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -193,7 +193,7 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
}
TSStatus responseStatus;
try {
- impl.buildSyncLogChannel(new Peer(groupId, req.endPoint));
+ impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupAddPeerException e) {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -220,7 +220,7 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
}
TSStatus responseStatus;
try {
- impl.removeSyncLogChannel(new Peer(groupId, req.endPoint));
+ impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupAddPeerException e) {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 45fa3f5e7d..938a49b4c4 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -257,7 +257,7 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
.event()
.notifyLeaderChanged(
Utils.fromRaftGroupIdToConsensusGroupId(groupMemberId.getGroupId()),
- Utils.formRaftPeerIdToTEndPoint(newLeaderId));
+ Utils.fromRaftPeerIdToNodeId(newLeaderId));
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 61e61dd9e4..1e9eeda0dd 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -119,13 +119,15 @@ class RatisConsensus implements IConsensus {
public RatisConsensus(ConsensusConfig config, IStateMachine.Registry
registry)
throws IOException {
- myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(),
DEFAULT_PRIORITY);
+ myself =
+ Utils.fromNodeInfoAndPriorityToRaftPeer(
+ config.getThisNodeId(), config.getThisNodeEndPoint(),
DEFAULT_PRIORITY);
System.setProperty(
"org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads",
"false");
RaftServerConfigKeys.setStorageDir(
properties, Collections.singletonList(new
File(config.getStorageDir())));
- GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
+ GrpcConfigKeys.Server.setPort(properties,
config.getThisNodeEndPoint().getPort());
Utils.initRatisConfig(properties, config.getRatisConfig());
this.config = config.getRatisConfig();
@@ -267,7 +269,7 @@ class RatisConsensus implements IConsensus {
}
if (suggestedLeader != null) {
- TEndPoint leaderEndPoint =
Utils.formRaftPeerIdToTEndPoint(suggestedLeader.getId());
+ TEndPoint leaderEndPoint =
Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(),
leaderEndPoint.getPort()));
}
@@ -382,7 +384,7 @@ class RatisConsensus implements IConsensus {
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer)
{
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup group = getGroupInfo(raftGroupId);
- RaftPeer peerToAdd = Utils.fromTEndPointAndPriorityToRaftPeer(peer,
DEFAULT_PRIORITY);
+ RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer,
DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
@@ -417,7 +419,7 @@ class RatisConsensus implements IConsensus {
public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer
peer) {
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup group = getGroupInfo(raftGroupId);
- RaftPeer peerToRemove = Utils.fromTEndPointAndPriorityToRaftPeer(peer,
DEFAULT_PRIORITY);
+ RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer,
DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
@@ -487,7 +489,7 @@ class RatisConsensus implements IConsensus {
return failed(new ConsensusGroupNotExistException(groupId));
}
- RaftPeer newRaftLeader =
Utils.fromTEndPointAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
+ RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader,
LEADER_PRIORITY);
ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
for (RaftPeer raftPeer : raftGroup.getPeers()) {
@@ -496,8 +498,10 @@ class RatisConsensus implements IConsensus {
} else {
// degrade every other peer to default priority
newConfiguration.add(
- Utils.fromTEndPointAndPriorityToRaftPeer(
- Utils.formRaftPeerIdToTEndPoint(raftPeer.getId()),
DEFAULT_PRIORITY));
+ Utils.fromNodeInfoAndPriorityToRaftPeer(
+ Utils.fromRaftPeerIdToNodeId(raftPeer.getId()),
+ Utils.fromRaftPeerAddressToTEndPoint(raftPeer.getAddress()),
+ DEFAULT_PRIORITY));
}
}
@@ -603,8 +607,8 @@ class RatisConsensus implements IConsensus {
if (leaderId == null) {
return null;
}
- TEndPoint leaderEndpoint = Utils.formRaftPeerIdToTEndPoint(leaderId);
- return new Peer(groupId, leaderEndpoint);
+ int nodeId = Utils.fromRaftPeerIdToNodeId(leaderId);
+ return new Peer(groupId, nodeId, null);
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 582efd9f35..3754031fa0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -66,44 +66,58 @@ public class Utils {
return groupCode;
}
- public static RaftPeerId fromTEndPointToRaftPeerId(TEndPoint endpoint) {
- return RaftPeerId.valueOf(fromTEndPointToString(endpoint));
+ public static RaftPeerId fromNodeIdToRaftPeerId(int nodeId) {
+ return RaftPeerId.valueOf(String.valueOf(nodeId));
}
- public static TEndPoint formRaftPeerIdToTEndPoint(RaftPeerId id) {
- String[] items = id.toString().split("_");
+ public static TEndPoint fromRaftPeerAddressToTEndPoint(String address) {
+ String[] items = address.split(":");
return new TEndPoint(items[0], Integer.parseInt(items[1]));
}
- public static TEndPoint formRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
+ public static int fromRaftPeerIdToNodeId(RaftPeerId id) {
+ return Integer.parseInt(id.toString());
+ }
+
+ public static TEndPoint fromRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
String[] items = proto.getAddress().split(":");
return new TEndPoint(items[0], Integer.parseInt(items[1]));
}
// priority is used as ordinal of leader election
- public static RaftPeer fromTEndPointAndPriorityToRaftPeer(TEndPoint
endpoint, int priority) {
+ public static RaftPeer fromNodeInfoAndPriorityToRaftPeer(
+ int nodeId, TEndPoint endpoint, int priority) {
return RaftPeer.newBuilder()
- .setId(fromTEndPointToRaftPeerId(endpoint))
+ .setId(fromNodeIdToRaftPeerId(nodeId))
.setAddress(HostAddress(endpoint))
.setPriority(priority)
.build();
}
- public static RaftPeer fromTEndPointAndPriorityToRaftPeer(Peer peer, int
priority) {
- return fromTEndPointAndPriorityToRaftPeer(peer.getEndpoint(), priority);
+ public static RaftPeer fromPeerAndPriorityToRaftPeer(Peer peer, int
priority) {
+ return fromNodeInfoAndPriorityToRaftPeer(peer.getNodeId(),
peer.getEndpoint(), priority);
}
public static List<RaftPeer> fromPeersAndPriorityToRaftPeers(List<Peer>
peers, int priority) {
return peers.stream()
- .map(peer -> Utils.fromTEndPointAndPriorityToRaftPeer(peer, priority))
+ .map(peer -> Utils.fromPeerAndPriorityToRaftPeer(peer, priority))
.collect(Collectors.toList());
}
+ public static int fromRaftPeerProtoToNodeId(RaftPeerProto proto) {
+ return Integer.parseInt(proto.getId().toStringUtf8());
+ }
+
public static List<Peer> fromRaftProtoListAndRaftGroupIdToPeers(
List<RaftPeerProto> raftProtoList, RaftGroupId id) {
ConsensusGroupId consensusGroupId =
Utils.fromRaftGroupIdToConsensusGroupId(id);
return raftProtoList.stream()
- .map(peer -> new Peer(consensusGroupId,
Utils.formRaftPeerProtoToTEndPoint(peer)))
+ .map(
+ peer ->
+ new Peer(
+ consensusGroupId,
+ Utils.fromRaftPeerProtoToNodeId(peer),
+ Utils.fromRaftPeerProtoToTEndPoint(peer)))
.collect(Collectors.toList());
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 27d6679cdc..76bb103bc6 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -62,13 +62,15 @@ class StandAloneConsensus implements IConsensus {
private final Logger logger =
LoggerFactory.getLogger(StandAloneConsensus.class);
private final TEndPoint thisNode;
+ private final int thisNodeId;
private final File storageDir;
private final IStateMachine.Registry registry;
private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
new ConcurrentHashMap<>();
public StandAloneConsensus(ConsensusConfig config, Registry registry) {
- this.thisNode = config.getThisNode();
+ this.thisNode = config.getThisNodeEndPoint();
+ this.thisNodeId = config.getThisNodeId();
this.storageDir = new File(config.getStorageDir());
this.registry = registry;
}
@@ -92,7 +94,8 @@ class StandAloneConsensus implements IConsensus {
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
StandAloneServerImpl consensus =
new StandAloneServerImpl(
- new Peer(consensusGroupId, thisNode),
registry.apply(consensusGroupId));
+ new Peer(consensusGroupId, thisNodeId, thisNode),
+ registry.apply(consensusGroupId));
stateMachineMap.put(consensusGroupId, consensus);
consensus.start();
}
@@ -143,7 +146,7 @@ class StandAloneConsensus implements IConsensus {
.setException(new IllegalPeerNumException(consensusGroupSize))
.build();
}
- if (!peers.contains(new Peer(groupId, thisNode))) {
+ if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
return ConsensusGenericResponse.newBuilder()
.setException(new IllegalPeerEndpointException(thisNode, peers))
.build();
@@ -226,7 +229,7 @@ class StandAloneConsensus implements IConsensus {
if (!stateMachineMap.containsKey(groupId)) {
return null;
}
- return new Peer(groupId, thisNode);
+ return new Peer(groupId, thisNodeId, thisNode);
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index 494a12ee1e..b59f4e1f53 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -49,7 +49,7 @@ public class StandAloneServerImpl implements IStateMachine {
public void start() {
stateMachine.start();
// Notify itself as the leader
- stateMachine.event().notifyLeaderChanged(peer.getGroupId(),
peer.getEndpoint());
+ stateMachine.event().notifyLeaderChanged(peer.getGroupId(),
peer.getNodeId());
}
@Override
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index a170b5a269..ca17b52cd5 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -51,9 +51,9 @@ public class MultiLeaderConsensusTest {
private final List<Peer> peers =
Arrays.asList(
- new Peer(gid, new TEndPoint("127.0.0.1", 6000)),
- new Peer(gid, new TEndPoint("127.0.0.1", 6001)),
- new Peer(gid, new TEndPoint("127.0.0.1", 6002)));
+ new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)),
+ new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)),
+ new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
private final List<File> peersStorage =
Arrays.asList(
@@ -90,6 +90,7 @@ public class MultiLeaderConsensusTest {
ConsensusFactory.getConsensusImpl(
ConsensusFactory.MultiLeaderConsensus,
ConsensusConfig.newBuilder()
+ .setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
.setStorageDir(peersStorage.get(i).getAbsolutePath())
.build(),
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
index 9b5943a534..1c38644fbc 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
@@ -49,6 +49,7 @@ public class RecoveryTest {
ConsensusFactory.getConsensusImpl(
ConsensusFactory.MultiLeaderConsensus,
ConsensusConfig.newBuilder()
+ .setThisNodeId(1)
.setThisNode(new TEndPoint("0.0.0.0", 9000))
.setStorageDir("target" + java.io.File.separator +
"recovery")
.build(),
@@ -77,7 +78,7 @@ public class RecoveryTest {
public void recoveryTest() throws Exception {
consensusImpl.createPeer(
schemaRegionId,
- Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 9000))));
+ Collections.singletonList(new Peer(schemaRegionId, 1, new
TEndPoint("0.0.0.0", 9000))));
consensusImpl.deletePeer(schemaRegionId);
@@ -89,7 +90,7 @@ public class RecoveryTest {
ConsensusGenericResponse response =
consensusImpl.createPeer(
schemaRegionId,
- Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 9000))));
+ Collections.singletonList(new Peer(schemaRegionId, 1, new
TEndPoint("0.0.0.0", 9000))));
Assert.assertTrue(response.isSuccess());
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index e950fd753a..8d9d0615b5 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -75,6 +75,7 @@ public class RatisConsensusTest {
ConsensusFactory.getConsensusImpl(
ConsensusFactory.RatisConsensus,
ConsensusConfig.newBuilder()
+ .setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
.setRatisConfig(config)
.setStorageDir(peersStorage.get(i).getAbsolutePath())
@@ -94,9 +95,9 @@ public class RatisConsensusTest {
public void setUp() throws IOException {
gid = new DataRegionId(1);
peers = new ArrayList<>();
- peers.add(new Peer(gid, new TEndPoint("127.0.0.1", 6000)));
- peers.add(new Peer(gid, new TEndPoint("127.0.0.1", 6001)));
- peers.add(new Peer(gid, new TEndPoint("127.0.0.1", 6002)));
+ peers.add(new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)));
+ peers.add(new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)));
+ peers.add(new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
peersStorage = new ArrayList<>();
peersStorage.add(new File("target" + java.io.File.separator + "1"));
peersStorage.add(new File("target" + java.io.File.separator + "2"));
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index d383fe902b..0c66c38330 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -76,6 +76,7 @@ public class TestUtils {
private AtomicInteger integer;
private final Logger logger =
LoggerFactory.getLogger(IntegerCounter.class);
private TEndPoint leaderEndpoint;
+ private int leaderId;
private List<Peer> configuration;
@Override
@@ -131,11 +132,11 @@ public class TestUtils {
}
@Override
- public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint
newLeader) {
- this.leaderEndpoint = newLeader;
+ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId)
{
+ this.leaderId = newLeaderId;
System.out.println("---------newLeader-----------");
System.out.println(groupId);
- System.out.println(newLeader);
+ System.out.println(newLeaderId);
System.out.println("----------------------");
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index df88b7ffd2..6288439d12 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -48,6 +48,7 @@ public class RecoveryTest {
ConsensusFactory.getConsensusImpl(
ConsensusFactory.StandAloneConsensus,
ConsensusConfig.newBuilder()
+ .setThisNodeId(1)
.setThisNode(new TEndPoint("0.0.0.0", 9000))
.setStorageDir("target" + java.io.File.separator +
"recovery")
.build(),
@@ -76,7 +77,7 @@ public class RecoveryTest {
public void recoveryTest() throws Exception {
consensusImpl.createPeer(
schemaRegionId,
- Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 9000))));
+ Collections.singletonList(new Peer(schemaRegionId, 1, new
TEndPoint("0.0.0.0", 9000))));
consensusImpl.stop();
consensusImpl = null;
@@ -86,7 +87,7 @@ public class RecoveryTest {
ConsensusGenericResponse response =
consensusImpl.createPeer(
schemaRegionId,
- Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 9000))));
+ Collections.singletonList(new Peer(schemaRegionId, 1, new
TEndPoint("0.0.0.0", 9000))));
Assert.assertEquals(
response.getException().getMessage(),
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 47d58a8a7c..6a386c5e1a 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -128,6 +128,7 @@ public class StandAloneConsensusTest {
ConsensusFactory.getConsensusImpl(
ConsensusFactory.StandAloneConsensus,
ConsensusConfig.newBuilder()
+ .setThisNodeId(1)
.setThisNode(new TEndPoint("0.0.0.0", 6667))
.setStorageDir("target" + java.io.File.separator +
"standalone")
.build(),
@@ -160,14 +161,14 @@ public class StandAloneConsensusTest {
ConsensusGenericResponse response1 =
consensusImpl.createPeer(
dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertTrue(response1.isSuccess());
assertNull(response1.getException());
ConsensusGenericResponse response2 =
consensusImpl.createPeer(
dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertFalse(response2.isSuccess());
assertTrue(response2.getException() instanceof
ConsensusGroupAlreadyExistException);
@@ -175,22 +176,22 @@ public class StandAloneConsensusTest {
consensusImpl.createPeer(
dataRegionId,
Arrays.asList(
- new Peer(dataRegionId, new TEndPoint("0.0.0.0", 6667)),
- new Peer(dataRegionId, new TEndPoint("0.0.0.1", 6667))));
+ new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)),
+ new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667))));
assertFalse(response3.isSuccess());
assertTrue(response3.getException() instanceof IllegalPeerNumException);
ConsensusGenericResponse response4 =
consensusImpl.createPeer(
dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, new
TEndPoint("0.0.0.1", 6667))));
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.1", 6667))));
assertFalse(response4.isSuccess());
assertTrue(response4.getException() instanceof
IllegalPeerEndpointException);
ConsensusGenericResponse response5 =
consensusImpl.createPeer(
schemaRegionId,
- Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(schemaRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertTrue(response5.isSuccess());
assertNull(response5.getException());
}
@@ -204,7 +205,7 @@ public class StandAloneConsensusTest {
ConsensusGenericResponse response2 =
consensusImpl.createPeer(
dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertTrue(response2.isSuccess());
assertNull(response2.getException());
@@ -216,7 +217,8 @@ public class StandAloneConsensusTest {
@Test
public void addPeer() {
ConsensusGenericResponse response =
- consensusImpl.addPeer(dataRegionId, new Peer(dataRegionId, new
TEndPoint("0.0.0.0", 6667)));
+ consensusImpl.addPeer(
+ dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0",
6667)));
assertFalse(response.isSuccess());
}
@@ -224,7 +226,7 @@ public class StandAloneConsensusTest {
public void removePeer() {
ConsensusGenericResponse response =
consensusImpl.removePeer(
- dataRegionId, new Peer(dataRegionId, new TEndPoint("0.0.0.0",
6667)));
+ dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0",
6667)));
assertFalse(response.isSuccess());
}
@@ -233,7 +235,7 @@ public class StandAloneConsensusTest {
ConsensusGenericResponse response =
consensusImpl.changePeer(
dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertFalse(response.isSuccess());
}
@@ -241,7 +243,7 @@ public class StandAloneConsensusTest {
public void transferLeader() {
ConsensusGenericResponse response =
consensusImpl.transferLeader(
- dataRegionId, new Peer(dataRegionId, new TEndPoint("0.0.0.0",
6667)));
+ dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0",
6667)));
assertFalse(response.isSuccess());
}
@@ -256,21 +258,21 @@ public class StandAloneConsensusTest {
ConsensusGenericResponse response1 =
consensusImpl.createPeer(
dataRegionId,
- Collections.singletonList(new Peer(dataRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertTrue(response1.isSuccess());
assertNull(response1.getException());
ConsensusGenericResponse response2 =
consensusImpl.createPeer(
schemaRegionId,
- Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(schemaRegionId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertTrue(response2.isSuccess());
assertNull(response2.getException());
ConsensusGenericResponse response3 =
consensusImpl.createPeer(
configId,
- Collections.singletonList(new Peer(configId, new
TEndPoint("0.0.0.0", 6667))));
+ Collections.singletonList(new Peer(configId, 1, new
TEndPoint("0.0.0.0", 6667))));
assertTrue(response3.isSuccess());
assertNull(response3.getException());
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 292eebf642..5e6616e678 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -48,6 +48,8 @@ public class IoTDBConstant {
public static final String IOTDB_CONF = "IOTDB_CONF";
public static final String GLOBAL_DB_NAME = "IoTDB";
+ public static final String CONFIG_NODE_ID = "config_node_id";
+ public static final String DATA_NODE_ID = "data_node_id";
public static final String RPC_ADDRESS = "rpc_address";
public static final String RPC_PORT = "rpc_port";
public static final String INTERNAL_ADDRESS = "internal_address";
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 2ebff31347..9aeef0be07 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
@@ -657,7 +658,7 @@ public class ConfigNodeClient
}
@Override
- public TSStatus registerConfigNode(TConfigNodeRegisterReq req) throws
TException {
+ public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq
req) throws TException {
throw new TException("DataNode to ConfigNode client doesn't support
registerConfigNode.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index b611f442f8..05d5cb8831 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -60,6 +60,7 @@ public class DataRegionConsensusImpl {
ConsensusFactory.getConsensusImpl(
conf.getDataRegionConsensusProtocolClass(),
ConsensusConfig.newBuilder()
+ .setThisNodeId(conf.getDataNodeId())
.setThisNode(
new TEndPoint(
conf.getInternalAddress(),
conf.getDataRegionConsensusPort()))
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 4ae4b63c42..cd41d45325 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -58,6 +58,7 @@ public class SchemaRegionConsensusImpl {
ConsensusFactory.getConsensusImpl(
conf.getSchemaRegionConsensusProtocolClass(),
ConsensusConfig.newBuilder()
+ .setThisNodeId(conf.getDataNodeId())
.setThisNode(
new TEndPoint(
conf.getInternalAddress(),
conf.getSchemaRegionConsensusPort()))
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 09707a4e92..a1f73fdb14 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -271,7 +271,9 @@ public class RegionMigrateService implements IService {
if (!addPeerSucceed) {
Thread.sleep(SLEEP_MILLIS);
}
- resp = addRegionPeer(regionId, new Peer(regionId, newPeerNode));
+ resp =
+ addRegionPeer(
+ regionId, new Peer(regionId,
selectedDataNode.getDataNodeId(), newPeerNode));
addPeerSucceed = true;
} catch (Throwable e) {
addPeerSucceed = false;
@@ -378,7 +380,9 @@ public class RegionMigrateService implements IService {
if (!removePeerSucceed) {
Thread.sleep(SLEEP_MILLIS);
}
- resp = removeRegionPeer(regionId, new Peer(regionId, oldPeerNode));
+ resp =
+ removeRegionPeer(
+ regionId, new Peer(regionId,
selectedDataNode.getDataNodeId(), oldPeerNode));
removePeerSucceed = true;
} catch (Throwable e) {
removePeerSucceed = false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 63df1528e1..46f7b7fa0f 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -811,7 +811,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TConsensusGroupId tgId = req.getRegionId();
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
- Peer newLeaderPeer = new Peer(regionId, newNode);
+ Peer newLeaderPeer = new Peer(regionId,
req.getNewLeaderNode().getDataNodeId(), newNode);
if (!isLeader(regionId)) {
LOGGER.info("region {} is not leader, no need to change leader",
regionId);
return status;
@@ -859,7 +859,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
List<Peer> peers =
req.getRegionLocations().stream()
- .map(location -> new Peer(regionId, getConsensusEndPoint(location,
regionId)))
+ .map(
+ location ->
+ new Peer(
+ regionId,
+ location.getDataNodeId(),
+ getConsensusEndPoint(location, regionId)))
.collect(Collectors.toList());
TSStatus status = createNewRegion(regionId, req.getStorageGroup(),
req.getTtl());
if (!isSucceed(status)) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
index 3a9ecdbc0a..18b56fceb5 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
@@ -116,7 +116,7 @@ public class DataNodeRegionManager {
new TEndPoint(
dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(),
dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
- peers.add(new Peer(schemaRegionId, endpoint));
+ peers.add(new Peer(schemaRegionId, dataNodeLocation.getDataNodeId(),
endpoint));
}
ConsensusGenericResponse consensusGenericResponse =
SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId,
peers);
@@ -152,7 +152,7 @@ public class DataNodeRegionManager {
new TEndPoint(
dataNodeLocation.getDataRegionConsensusEndPoint().getIp(),
dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
- peers.add(new Peer(dataRegionId, endpoint));
+ peers.add(new Peer(dataRegionId, dataNodeLocation.getDataNodeId(),
endpoint));
}
ConsensusGenericResponse consensusGenericResponse =
DataRegionConsensusImpl.getInstance().createPeer(dataRegionId,
peers);
diff --git
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index e3008808d0..7d11039ef6 100644
---
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -69,9 +69,13 @@ public class DataNodeInternalRPCServiceImplTest {
private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
static LocalConfigNode configNode;
+ private static final int dataNodeId = 0;
@BeforeClass
public static void setUpBeforeClass() throws IOException, MetadataException {
+ // In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in
RatisConsensus
+ conf.setDataNodeId(dataNodeId);
+
IoTDB.configManager.init();
configNode = LocalConfigNode.getInstance();
configNode.getBelongedSchemaRegionIdWithAutoCreate(new
PartialPath("root.ln"));
@@ -332,7 +336,8 @@ public class DataNodeInternalRPCServiceImplTest {
// construct fragmentInstance
return new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0),
dataNodeList);
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion,
conf.getDataNodeId()),
+ dataNodeList);
}
private List<Peer> genSchemaRegionPeerList(TRegionReplicaSet
regionReplicaSet) {
@@ -341,6 +346,7 @@ public class DataNodeInternalRPCServiceImplTest {
peerList.add(
new Peer(
new SchemaRegionId(regionReplicaSet.getRegionId().getId()),
+ node.getDataNodeId(),
node.getSchemaRegionConsensusEndPoint()));
}
return peerList;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 4f4fc495d5..1b9581a710 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -274,6 +274,11 @@ struct TConfigNodeRegisterReq {
13: required double diskSpaceWarningThreshold
}
+struct TConfigNodeRegisterResp {
+ 1: required common.TSStatus status
+ 2: required i32 configNodeId
+}
+
struct TAddConsensusGroupReq {
1: required list<common.TConfigNodeLocation> configNodeList
}
@@ -675,7 +680,7 @@ service IConfigNodeRPCService {
* ERROR_GLOBAL_CONFIG if some global configurations in the
Non-Seed-ConfigNode
* are inconsistent with the ConfigNode-leader
*/
- common.TSStatus registerConfigNode(TConfigNodeRegisterReq req)
+ TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req)
/** The ConfigNode-leader will guide the Non-Seed-ConfigNode to join the
ConsensusGroup on its first startup */
common.TSStatus addConsensusGroup(TAddConsensusGroupReq req)
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 380f30afd1..345fc70900 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -56,6 +56,7 @@ struct TSyncLogRes {
struct TBuildSyncLogChannelReq {
1: required common.TConsensusGroupId consensusGroupId
2: required common.TEndPoint endPoint
+ 3: required i32 nodeId
}
struct TBuildSyncLogChannelRes {
@@ -65,6 +66,7 @@ struct TBuildSyncLogChannelRes {
struct TRemoveSyncLogChannelReq {
1: required common.TConsensusGroupId consensusGroupId
2: required common.TEndPoint endPoint
+ 3: required i32 nodeId
}
struct TRemoveSyncLogChannelRes {