This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a968e7253  [IOTDB-4550] Add NodeId to Peer (#7531)
9a968e7253 is described below

commit 9a968e72534275e4d3f0180eebc2321969b41368
Author: Itami Sho <[email protected]>
AuthorDate: Wed Oct 12 22:04:58 2022 +0800

     [IOTDB-4550] Add NodeId to Peer (#7531)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 13 +++++
 .../confignode/conf/SystemPropertiesUtils.java     | 10 ++++
 .../statemachine/PartitionRegionStateMachine.java  | 24 ++++++---
 .../iotdb/confignode/manager/ConfigManager.java    | 61 ++++++++++++++--------
 .../iotdb/confignode/manager/ConsensusManager.java | 19 +++++--
 .../apache/iotdb/confignode/manager/IManager.java  |  7 ++-
 .../iotdb/confignode/manager/node/NodeManager.java | 38 ++++++++++++--
 .../iotdb/confignode/persistence/NodeInfo.java     |  2 +
 .../iotdb/confignode/service/ConfigNode.java       | 35 ++++++++++---
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  9 ++--
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |  4 +-
 .../org/apache/iotdb/consensus/IStateMachine.java  |  5 +-
 .../org/apache/iotdb/consensus/common/Peer.java    | 26 +++++++--
 .../iotdb/consensus/config/ConsensusConfig.java    | 22 ++++++--
 .../multileader/MultiLeaderConsensus.java          | 12 +++--
 .../multileader/MultiLeaderServerImpl.java         |  6 ++-
 .../service/MultiLeaderRPCServiceProcessor.java    |  4 +-
 .../ratis/ApplicationStateMachineProxy.java        |  2 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      | 24 +++++----
 .../org/apache/iotdb/consensus/ratis/Utils.java    | 36 +++++++++----
 .../consensus/standalone/StandAloneConsensus.java  | 11 ++--
 .../consensus/standalone/StandAloneServerImpl.java |  2 +-
 .../multileader/MultiLeaderConsensusTest.java      |  7 +--
 .../iotdb/consensus/multileader/RecoveryTest.java  |  5 +-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  7 +--
 .../apache/iotdb/consensus/ratis/TestUtils.java    |  7 +--
 .../iotdb/consensus/standalone/RecoveryTest.java   |  5 +-
 .../standalone/StandAloneConsensusTest.java        | 30 ++++++-----
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  2 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  3 +-
 .../db/consensus/DataRegionConsensusImpl.java      |  1 +
 .../db/consensus/SchemaRegionConsensusImpl.java    |  1 +
 .../iotdb/db/service/RegionMigrateService.java     |  8 ++-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  9 +++-
 .../service/thrift/impl/DataNodeRegionManager.java |  4 +-
 .../DataNodeInternalRPCServiceImplTest.java        |  8 ++-
 .../src/main/thrift/confignode.thrift              |  7 ++-
 .../src/main/thrift/mutlileader.thrift             |  2 +
 38 files changed, 345 insertions(+), 133 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 269ac05a5f..627ba6ccb5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -29,6 +29,11 @@ import java.io.File;
 
 public class ConfigNodeConfig {
 
+  /**
+   * the config node id for cluster mode; a non-seed node replaces the default 0 after joining
+   */
+  private int configNodeId = 0;
+
   /** could set ip or hostname */
   private String internalAddress = "0.0.0.0";
 
@@ -209,6 +214,14 @@ public class ConfigNodeConfig {
     return dir;
   }
 
+  public int getConfigNodeId() {
+    return configNodeId;
+  }
+
+  public void setConfigNodeId(int configNodeId) {
+    this.configNodeId = configNodeId;
+  }
+
   public String getInternalAddress() {
     return internalAddress;
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index abff028239..9c166589d1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -239,6 +239,16 @@ public class SystemPropertiesUtils {
     storeSystemProperties(systemProperties);
   }
 
+  public static void storeConfigNodeId(int nodeId) throws IOException {
+    if (!systemPropertiesFile.exists()) {
+      return;
+    }
+
+    Properties systemProperties = getSystemProperties();
+    systemProperties.setProperty("config_node_id", String.valueOf(nodeId));
+    storeSystemProperties(systemProperties);
+  }
+
   private static synchronized Properties getSystemProperties() throws 
IOException {
     // Create confignode-system.properties file if necessary
     if (!systemPropertiesFile.exists()) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index b45c933671..a901e5d187 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.consensus.statemachine;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
@@ -46,12 +47,14 @@ public class PartitionRegionStateMachine
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionRegionStateMachine.class);
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
-  private final TEndPoint currentNode;
+  private final TEndPoint currentNodeTEndPoint;
+  private final int currentNodeId;
 
   public PartitionRegionStateMachine(ConfigManager configManager, 
ConfigPlanExecutor executor) {
     this.executor = executor;
     this.configManager = configManager;
-    this.currentNode =
+    this.currentNodeId = 
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+    this.currentNodeTEndPoint =
         new TEndPoint()
             
.setIp(ConfigNodeDescriptor.getInstance().getConf().getInternalAddress())
             
.setPort(ConfigNodeDescriptor.getInstance().getConf().getConsensusPort());
@@ -145,16 +148,25 @@ public class PartitionRegionStateMachine
   }
 
   @Override
-  public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint 
newLeader) {
-    if (currentNode.equals(newLeader)) {
-      LOGGER.info("Current node {} becomes Leader", newLeader);
+  public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
+    TConfigNodeLocation newLeaderConfigNodeLocation =
+        configManager.getConfigNodeLocation(currentNodeId);
+    if (currentNodeId == newLeaderId) {
+      LOGGER.info(
+          "Current node [nodeId: {}, ip:port: {}] becomes Leader",
+          newLeaderId,
+          currentNodeTEndPoint);
       configManager.getProcedureManager().shiftExecutor(true);
       configManager.getLoadManager().startLoadBalancingService();
       configManager.getNodeManager().startHeartbeatService();
       configManager.getPartitionManager().startRegionCleaner();
     } else {
       LOGGER.info(
-          "Current node {} is not longer the leader, the new leader is {}", 
currentNode, newLeader);
+          "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the 
new leader is [nodeId:{}, ip:port: {}]",
+          currentNodeId,
+          currentNodeTEndPoint,
+          newLeaderId,
+          newLeaderConfigNodeLocation.getInternalEndPoint());
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getLoadManager().stopLoadBalancingService();
       configManager.getNodeManager().stopHeartbeatService();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index c1bde6dbc2..ad87b1f7a5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -87,6 +87,7 @@ import 
org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -143,7 +144,7 @@ public class ConfigManager implements IManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigManager.class);
 
   /** Manage PartitionTable read/write requests through the ConsensusLayer */
-  private final ConsensusManager consensusManager;
+  private ConsensusManager consensusManager;
 
   /** Manage cluster node */
   private final NodeManager nodeManager;
@@ -170,6 +171,8 @@ public class ConfigManager implements IManager {
   /** Sync */
   private final SyncManager syncManager;
 
+  private final PartitionRegionStateMachine stateMachine;
+
   public ConfigManager() throws IOException {
     // Build the persistence module
     NodeInfo nodeInfo = new NodeInfo();
@@ -192,7 +195,7 @@ public class ConfigManager implements IManager {
             udfInfo,
             triggerInfo,
             syncInfo);
-    PartitionRegionStateMachine stateMachine = new 
PartitionRegionStateMachine(this, executor);
+    this.stateMachine = new PartitionRegionStateMachine(this, executor);
 
     // Build the manager module
     this.nodeManager = new NodeManager(this, nodeInfo);
@@ -204,10 +207,10 @@ public class ConfigManager implements IManager {
     this.triggerManager = new TriggerManager(this, triggerInfo);
     this.loadManager = new LoadManager(this);
     this.syncManager = new SyncManager(this, syncInfo);
+  }
 
-    // ConsensusManager must be initialized last, as it would load states from 
disk and reinitialize
-    // above managers
-    this.consensusManager = new ConsensusManager(this, stateMachine);
+  public void initConsensusManager() throws IOException {
+    this.consensusManager = new ConsensusManager(this, this.stateMachine);
   }
 
   public void close() throws IOException {
@@ -665,24 +668,11 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus registerConfigNode(TConfigNodeRegisterReq req) {
-    // Check global configuration
-    TSStatus status = confirmLeader();
-
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      TSStatus errorStatus = checkConfigNodeGlobalConfig(req);
-      if (errorStatus != null) {
-        return errorStatus;
-      }
-
-      procedureManager.addConfigNode(req);
-      return StatusUtils.OK;
-    }
-
-    return status;
+  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq 
req) {
+    return nodeManager.registerConfigNode(req);
   }
 
-  private TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
+  public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
     final String errorPrefix = "Reject register, please ensure that the 
parameter ";
     final String errorSuffix = " is consistent with the Seed-ConfigNode.";
 
@@ -735,8 +725,23 @@ public class ConfigManager implements IManager {
 
   @Override
   public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> 
configNodeLocations) {
-    consensusManager.createPeerForConsensusGroup(configNodeLocations);
-    return StatusUtils.OK;
+    for (int i = 0; i < 30; i++) {
+      try {
+        if (consensusManager == null) {
+          Thread.sleep(1000);
+        } else {
+          consensusManager.createPeerForConsensusGroup(configNodeLocations);
+          return StatusUtils.OK;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("Unexpected interruption during retry creating peer for 
consensus group");
+      } catch (Exception e) {
+        LOGGER.error("Failed to create peer for consensus group", e);
+        break;
+      }
+    }
+    return StatusUtils.INTERNAL_ERROR;
   }
 
   @Override
@@ -1163,4 +1168,14 @@ public class ConfigManager implements IManager {
     }
     return filteredRegionReplicaSets;
   }
+
+  public TConfigNodeLocation getConfigNodeLocation(int nodeId) {
+    List<TConfigNodeLocation> configNodeLocations = 
this.nodeManager.getRegisteredConfigNodes();
+    for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
+      if (configNodeLocation.getConfigNodeId() == nodeId) {
+        return configNodeLocation;
+      }
+    }
+    return null;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 0b35146163..99dd15d077 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -84,6 +84,7 @@ public class ConsensusManager {
         ConsensusFactory.getConsensusImpl(
                 CONF.getConfigNodeConsensusProtocolClass(),
                 ConsensusConfig.newBuilder()
+                    .setThisNodeId(CONF.getConfigNodeId())
                     .setThisNode(new TEndPoint(CONF.getInternalAddress(), 
CONF.getConsensusPort()))
                     .setRatisConfig(
                         RatisConfig.newBuilder()
@@ -172,7 +173,11 @@ public class ConsensusManager {
 
     List<Peer> peerList = new ArrayList<>();
     for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
-      peerList.add(new Peer(consensusGroupId, 
configNodeLocation.getConsensusEndPoint()));
+      peerList.add(
+          new Peer(
+              consensusGroupId,
+              configNodeLocation.getConfigNodeId(),
+              configNodeLocation.getConsensusEndPoint()));
     }
     consensusImpl.createPeer(consensusGroupId, peerList);
   }
@@ -188,7 +193,10 @@ public class ConsensusManager {
         consensusImpl
             .addPeer(
                 consensusGroupId,
-                new Peer(consensusGroupId, 
configNodeLocation.getConsensusEndPoint()))
+                new Peer(
+                    consensusGroupId,
+                    configNodeLocation.getConfigNodeId(),
+                    configNodeLocation.getConsensusEndPoint()))
             .isSuccess();
 
     if (!result) {
@@ -207,7 +215,10 @@ public class ConsensusManager {
     return consensusImpl
         .removePeer(
             consensusGroupId,
-            new Peer(consensusGroupId, 
tConfigNodeLocation.getConsensusEndPoint()))
+            new Peer(
+                consensusGroupId,
+                tConfigNodeLocation.getConfigNodeId(),
+                tConfigNodeLocation.getConsensusEndPoint()))
         .isSuccess();
   }
 
@@ -234,7 +245,7 @@ public class ConsensusManager {
             getNodeManager().getRegisteredConfigNodes();
         TConfigNodeLocation leaderLocation =
             registeredConfigNodes.stream()
-                .filter(leader -> 
leader.getConsensusEndPoint().equals(leaderPeer.getEndpoint()))
+                .filter(leader -> leader.getConfigNodeId() == 
leaderPeer.getNodeId())
                 .findFirst()
                 .orElse(null);
         if (leaderLocation != null) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 514cc03ba7..45707f8943 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -275,9 +276,9 @@ public interface IManager {
   /**
    * Register ConfigNode when it is first startup
    *
-   * @return TSStatus
+   * @return TConfigNodeRegisterResp
    */
-  TSStatus registerConfigNode(TConfigNodeRegisterReq req);
+  TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req);
 
   /**
    * Create peer in new node to build consensus group.
@@ -461,4 +462,6 @@ public interface IManager {
   TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
 
   TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan);
+
+  TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req);
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index df31b8c922..7cb7159060 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import 
org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
@@ -60,6 +61,8 @@ import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.metric.NodeInfoMetrics;
 import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
@@ -100,6 +103,9 @@ public class NodeManager {
   public static final TEndPoint CURRENT_NODE =
       new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
 
+  // node id returned in the response when registering a new ConfigNode fails
+  private static final int ERROR_STATUS_NODE_ID = -1;
+
   private final IManager configManager;
   private final NodeInfo nodeInfo;
 
@@ -247,6 +253,28 @@ public class NodeManager {
     return dataSet;
   }
 
+  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq 
req) {
+    // Check global configuration
+    TSStatus status = configManager.getConsensusManager().confirmLeader();
+
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      TSStatus errorStatus = configManager.checkConfigNodeGlobalConfig(req);
+      if (errorStatus != null) {
+        return new TConfigNodeRegisterResp()
+            .setStatus(errorStatus)
+            .setConfigNodeId(ERROR_STATUS_NODE_ID);
+      }
+
+      int nodeId = generateNodeId();
+      req.getConfigNodeLocation().setConfigNodeId(nodeId);
+
+      configManager.getProcedureManager().addConfigNode(req);
+      return new 
TConfigNodeRegisterResp().setStatus(StatusUtils.OK).setConfigNodeId(nodeId);
+    }
+
+    return new 
TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
+  }
+
   /**
    * Get TDataNodeConfiguration
    *
@@ -383,8 +411,6 @@ public class NodeManager {
    * @param configNodeLocation The new ConfigNode
    */
   public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
-    // Generate new ConfigNode's index
-    configNodeLocation.setConfigNodeId(nodeInfo.generateNextNodeId());
     ApplyConfigNodePlan applyConfigNodePlan = new 
ApplyConfigNodePlan(configNodeLocation);
     getConsensusManager().write(applyConfigNodePlan);
   }
@@ -447,7 +473,9 @@ public class NodeManager {
     ConsensusGenericResponse resp =
         getConsensusManager()
             .getConsensusImpl()
-            .transferLeader(groupId, new Peer(groupId, 
newLeader.getConsensusEndPoint()));
+            .transferLeader(
+                groupId,
+                new Peer(groupId, newLeader.getConfigNodeId(), 
newLeader.getConsensusEndPoint()));
     if (!resp.isSuccess()) {
       return new 
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
           .setMessage("Remove ConfigNode failed because transfer ConfigNode 
leader failed.");
@@ -750,4 +778,8 @@ public class NodeManager {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  public int generateNodeId() {
+    return nodeInfo.generateNextNodeId();
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 7adcf7e389..2d3876a3f3 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -305,6 +305,8 @@ public class NodeInfo implements SnapshotProcessor {
 
       registeredConfigNodes.add(applyConfigNodePlan.getConfigNodeLocation());
       SystemPropertiesUtils.storeConfigNodeList(new 
ArrayList<>(registeredConfigNodes));
+      SystemPropertiesUtils.storeConfigNodeId(
+          applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
       LOGGER.info(
           "Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
           applyConfigNodePlan.getConfigNodeLocation(),
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 6d5411c47f..501bd7b5e1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
 import 
org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
 import org.apache.iotdb.db.service.metrics.MetricService;
@@ -55,6 +56,10 @@ public class ConfigNode implements ConfigNodeMBean {
 
   private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
 
+  private static final int SEED_CONFIG_NODE_ID = 0;
+
+  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
+
   private final String mbeanName =
       String.format(
           "%s:%s=%s",
@@ -90,13 +95,16 @@ public class ConfigNode implements ConfigNodeMBean {
 
       /* Initial startup of Seed-ConfigNode */
       if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+        configManager.initConsensusManager();
+
         SystemPropertiesUtils.storeSystemParameters();
+        SystemPropertiesUtils.storeConfigNodeId(SEED_CONFIG_NODE_ID);
         // Seed-ConfigNode should apply itself when first start
         configManager
             .getNodeManager()
             .applyConfigNode(
                 new TConfigNodeLocation(
-                    0,
+                    SEED_CONFIG_NODE_ID,
                     new TEndPoint(CONF.getInternalAddress(), 
CONF.getInternalPort()),
                     new TEndPoint(CONF.getInternalAddress(), 
CONF.getConsensusPort())));
         // We always set up Seed-ConfigNode's RPC service lastly to ensure that
@@ -185,7 +193,7 @@ public class ConfigNode implements ConfigNodeMBean {
     TConfigNodeRegisterReq req =
         new TConfigNodeRegisterReq(
             new TConfigNodeLocation(
-                -1,
+                INIT_NON_SEED_CONFIG_NODE_ID,
                 new TEndPoint(CONF.getInternalAddress(), 
CONF.getInternalPort()),
                 new TEndPoint(CONF.getInternalAddress(), 
CONF.getConsensusPort())),
             CONF.getDataRegionConsensusProtocolClass(),
@@ -208,12 +216,25 @@ public class ConfigNode implements ConfigNodeMBean {
     }
 
     for (int retry = 0; retry < 3; retry++) {
-      TSStatus status =
-          (TSStatus)
-              SyncConfigNodeClientPool.getInstance()
-                  .sendSyncRequestToConfigNodeWithRetry(
-                      targetConfigNode, req, 
ConfigNodeRequestType.REGISTER_CONFIG_NODE);
+      TSStatus status;
+      TConfigNodeRegisterResp resp = null;
+      Object obj =
+          SyncConfigNodeClientPool.getInstance()
+              .sendSyncRequestToConfigNodeWithRetry(
+                  targetConfigNode, req, 
ConfigNodeRequestType.REGISTER_CONFIG_NODE);
+
+      if (obj instanceof TConfigNodeRegisterResp) {
+        resp = (TConfigNodeRegisterResp) obj;
+        status = resp.getStatus();
+      } else {
+        status = (TSStatus) obj;
+      }
+
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        SystemPropertiesUtils.storeConfigNodeId(resp.getConfigNodeId());
+        CONF.setConfigNodeId(resp.getConfigNodeId());
+
+        configManager.initConsensusManager();
         return;
       } else if (status.getCode() == 
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
         targetConfigNode = status.getRedirectNode();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8381d6e2cc..ace66de37e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
@@ -400,13 +401,13 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   }
 
   @Override
-  public TSStatus registerConfigNode(TConfigNodeRegisterReq req) {
-    TSStatus status = configManager.registerConfigNode(req);
+  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq 
req) throws TException {
+    TConfigNodeRegisterResp resp = configManager.registerConfigNode(req);
 
     // Print log to record the ConfigNode that performs the 
RegisterConfigNodeRequest
-    LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req, 
status);
+    LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req, 
resp.getStatus());
 
-    return status;
+    return resp;
   }
 
   @Override
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 1ce234806f..50a1e800d2 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -87,7 +87,9 @@ public class ConfigNodeRPCServiceProcessorTest {
 
   @Before
   public void before() throws IOException {
-    processor = new ConfigNodeRPCServiceProcessor(new ConfigManager());
+    ConfigManager configManager = new ConfigManager();
+    configManager.initConsensusManager();
+    processor = new ConfigNodeRPCServiceProcessor(configManager);
     processor.getConsensusManager().singleCopyMayWaitUntilLeaderReady();
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index f0b489e2ad..4641898a0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -132,9 +131,9 @@ public interface IStateMachine {
      * can possibly be this server.
      *
      * @param groupId The id of this consensus group.
-     * @param newLeader The id of the new leader.
+     * @param newLeaderId The id of the new leader node.
      */
-    default void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint 
newLeader) {}
+    default void notifyLeaderChanged(ConsensusGroupId groupId, int 
newLeaderId) {}
 
     /**
      * Notify the {@link IStateMachine} a configuration change. This method 
will be invoked when a
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index ad962c2525..c7466206f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -21,20 +21,28 @@ package org.apache.iotdb.consensus.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
 // TODO Use a mature IDL framework such as Protobuf to manage this structure
 public class Peer {
 
+  private final Logger LOGGER = LoggerFactory.getLogger(Peer.class);
   private final ConsensusGroupId groupId;
   private final TEndPoint endpoint;
+  private final int nodeId;
 
-  public Peer(ConsensusGroupId groupId, TEndPoint endpoint) {
+  public Peer(ConsensusGroupId groupId, int nodeId, TEndPoint endpoint) {
     this.groupId = groupId;
+    this.nodeId = nodeId;
     this.endpoint = endpoint;
   }
 
@@ -46,16 +54,26 @@ public class Peer {
     return endpoint;
   }
 
+  public int getNodeId() {
+    return nodeId;
+  }
+
   public void serialize(DataOutputStream stream) {
-    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
-        groupId.convertToTConsensusGroupId(), stream);
-    ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, stream);
+    try {
+      ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
+          groupId.convertToTConsensusGroupId(), stream);
+      BasicStructureSerDeUtil.write(nodeId, stream);
+      ThriftCommonsSerDeUtils.serializeTEndPoint(endpoint, stream);
+    } catch (IOException e) {
+      LOGGER.error("Failed to serialize Peer", e);
+    }
   }
 
   public static Peer deserialize(ByteBuffer buffer) {
     return new Peer(
         ConsensusGroupId.Factory.createFromTConsensusGroupId(
             ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer)),
+        BasicStructureSerDeUtil.readInt(buffer),
         ThriftCommonsSerDeUtils.deserializeTEndPoint(buffer));
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index be3b4bf342..42527275ea 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -23,24 +23,31 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 public class ConsensusConfig {
 
-  private final TEndPoint thisNode;
+  private final TEndPoint thisNodeEndPoint;
+  private final int thisNodeId;
   private final String storageDir;
   private final RatisConfig ratisConfig;
   private final MultiLeaderConfig multiLeaderConfig;
 
   private ConsensusConfig(
       TEndPoint thisNode,
+      int thisNodeId,
       String storageDir,
       RatisConfig ratisConfig,
       MultiLeaderConfig multiLeaderConfig) {
-    this.thisNode = thisNode;
+    this.thisNodeEndPoint = thisNode;
+    this.thisNodeId = thisNodeId;
     this.storageDir = storageDir;
     this.ratisConfig = ratisConfig;
     this.multiLeaderConfig = multiLeaderConfig;
   }
 
-  public TEndPoint getThisNode() {
-    return thisNode;
+  public TEndPoint getThisNodeEndPoint() {
+    return thisNodeEndPoint;
+  }
+
+  public int getThisNodeId() {
+    return thisNodeId;
   }
 
   public String getStorageDir() {
@@ -62,6 +69,7 @@ public class ConsensusConfig {
   public static class Builder {
 
     private TEndPoint thisNode;
+    private int thisNodeId;
     private String storageDir;
     private RatisConfig ratisConfig;
     private MultiLeaderConfig multiLeaderConfig;
@@ -69,6 +77,7 @@ public class ConsensusConfig {
     public ConsensusConfig build() {
       return new ConsensusConfig(
           thisNode,
+          thisNodeId,
           storageDir,
           ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
           multiLeaderConfig != null ? multiLeaderConfig : 
MultiLeaderConfig.newBuilder().build());
@@ -79,6 +88,11 @@ public class ConsensusConfig {
       return this;
     }
 
+    public Builder setThisNodeId(int thisNodeId) {
+      this.thisNodeId = thisNodeId;
+      return this;
+    }
+
     public Builder setStorageDir(String storageDir) {
       this.storageDir = storageDir;
       return this;
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 2f35bdfe71..a5a5c9d0d3 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -71,6 +71,7 @@ public class MultiLeaderConsensus implements IConsensus {
   private final Logger logger = 
LoggerFactory.getLogger(MultiLeaderConsensus.class);
 
   private final TEndPoint thisNode;
+  private final int thisNodeId;
   private final File storageDir;
   private final IStateMachine.Registry registry;
   private final Map<ConsensusGroupId, MultiLeaderServerImpl> stateMachineMap =
@@ -82,7 +83,8 @@ public class MultiLeaderConsensus implements IConsensus {
   private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> 
syncClientManager;
 
   public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
-    this.thisNode = config.getThisNode();
+    this.thisNode = config.getThisNodeEndPoint();
+    this.thisNodeId = config.getThisNodeId();
     this.storageDir = new File(config.getStorageDir());
     this.config = config.getMultiLeaderConfig();
     this.registry = registry;
@@ -126,7 +128,7 @@ public class MultiLeaderConsensus implements IConsensus {
           MultiLeaderServerImpl consensus =
               new MultiLeaderServerImpl(
                   path.toString(),
-                  new Peer(consensusGroupId, thisNode),
+                  new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
                   clientManager,
@@ -188,7 +190,7 @@ public class MultiLeaderConsensus implements IConsensus {
           .setException(new IllegalPeerNumException(consensusGroupSize))
           .build();
     }
-    if (!peers.contains(new Peer(groupId, thisNode))) {
+    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
       return ConsensusGenericResponse.newBuilder()
           .setException(new IllegalPeerEndpointException(thisNode, peers))
           .build();
@@ -206,7 +208,7 @@ public class MultiLeaderConsensus implements IConsensus {
           MultiLeaderServerImpl impl =
               new MultiLeaderServerImpl(
                   path,
-                  new Peer(groupId, thisNode),
+                  new Peer(groupId, thisNodeId, thisNode),
                   peers,
                   registry.apply(groupId),
                   clientManager,
@@ -345,7 +347,7 @@ public class MultiLeaderConsensus implements IConsensus {
     if (!stateMachineMap.containsKey(groupId)) {
       return null;
     }
-    return new Peer(groupId, thisNode);
+    return new Peer(groupId, thisNodeId, thisNode);
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index c9c71aabca..aaeb63d788 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -355,7 +355,8 @@ public class MultiLeaderServerImpl {
               client.buildSyncLogChannel(
                   new TBuildSyncLogChannelReq(
                       targetPeer.getGroupId().convertToTConsensusGroupId(),
-                      targetPeer.getEndpoint()));
+                      targetPeer.getEndpoint(),
+                      targetPeer.getNodeId()));
           if (!isSuccess(res.status)) {
             throw new ConsensusGroupAddPeerException(
                 String.format("build sync log channel failed from %s to %s", 
peer, targetPeer));
@@ -397,7 +398,8 @@ public class MultiLeaderServerImpl {
               client.removeSyncLogChannel(
                   new TRemoveSyncLogChannelReq(
                       targetPeer.getGroupId().convertToTConsensusGroupId(),
-                      targetPeer.getEndpoint()));
+                      targetPeer.getEndpoint(),
+                      targetPeer.getNodeId()));
           if (!isSuccess(res.status)) {
             throw new ConsensusGroupAddPeerException(
                 String.format("remove sync log channel failed from %s to %s", 
peer, targetPeer));
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 2544451785..3f045e5e66 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -193,7 +193,7 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
     }
     TSStatus responseStatus;
     try {
-      impl.buildSyncLogChannel(new Peer(groupId, req.endPoint));
+      impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
       responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (ConsensusGroupAddPeerException e) {
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -220,7 +220,7 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
     }
     TSStatus responseStatus;
     try {
-      impl.removeSyncLogChannel(new Peer(groupId, req.endPoint));
+      impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
       responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (ConsensusGroupAddPeerException e) {
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 45fa3f5e7d..938a49b4c4 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -257,7 +257,7 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
         .event()
         .notifyLeaderChanged(
             
Utils.fromRaftGroupIdToConsensusGroupId(groupMemberId.getGroupId()),
-            Utils.formRaftPeerIdToTEndPoint(newLeaderId));
+            Utils.fromRaftPeerIdToNodeId(newLeaderId));
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 61e61dd9e4..1e9eeda0dd 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -119,13 +119,15 @@ class RatisConsensus implements IConsensus {
 
   public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry)
       throws IOException {
-    myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(), 
DEFAULT_PRIORITY);
+    myself =
+        Utils.fromNodeInfoAndPriorityToRaftPeer(
+            config.getThisNodeId(), config.getThisNodeEndPoint(), 
DEFAULT_PRIORITY);
 
     System.setProperty(
         
"org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads", 
"false");
     RaftServerConfigKeys.setStorageDir(
         properties, Collections.singletonList(new 
File(config.getStorageDir())));
-    GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
+    GrpcConfigKeys.Server.setPort(properties, 
config.getThisNodeEndPoint().getPort());
 
     Utils.initRatisConfig(properties, config.getRatisConfig());
     this.config = config.getRatisConfig();
@@ -267,7 +269,7 @@ class RatisConsensus implements IConsensus {
     }
 
     if (suggestedLeader != null) {
-      TEndPoint leaderEndPoint = 
Utils.formRaftPeerIdToTEndPoint(suggestedLeader.getId());
+      TEndPoint leaderEndPoint = 
Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
       writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), 
leaderEndPoint.getPort()));
     }
 
@@ -382,7 +384,7 @@ class RatisConsensus implements IConsensus {
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) 
{
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup group = getGroupInfo(raftGroupId);
-    RaftPeer peerToAdd = Utils.fromTEndPointAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
+    RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
@@ -417,7 +419,7 @@ class RatisConsensus implements IConsensus {
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer 
peer) {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup group = getGroupInfo(raftGroupId);
-    RaftPeer peerToRemove = Utils.fromTEndPointAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
+    RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
@@ -487,7 +489,7 @@ class RatisConsensus implements IConsensus {
       return failed(new ConsensusGroupNotExistException(groupId));
     }
 
-    RaftPeer newRaftLeader = 
Utils.fromTEndPointAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
+    RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, 
LEADER_PRIORITY);
 
     ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
     for (RaftPeer raftPeer : raftGroup.getPeers()) {
@@ -496,8 +498,10 @@ class RatisConsensus implements IConsensus {
       } else {
         // degrade every other peer to default priority
         newConfiguration.add(
-            Utils.fromTEndPointAndPriorityToRaftPeer(
-                Utils.formRaftPeerIdToTEndPoint(raftPeer.getId()), 
DEFAULT_PRIORITY));
+            Utils.fromNodeInfoAndPriorityToRaftPeer(
+                Utils.fromRaftPeerIdToNodeId(raftPeer.getId()),
+                Utils.fromRaftPeerAddressToTEndPoint(raftPeer.getAddress()),
+                DEFAULT_PRIORITY));
       }
     }
 
@@ -603,8 +607,8 @@ class RatisConsensus implements IConsensus {
     if (leaderId == null) {
       return null;
     }
-    TEndPoint leaderEndpoint = Utils.formRaftPeerIdToTEndPoint(leaderId);
-    return new Peer(groupId, leaderEndpoint);
+    int nodeId = Utils.fromRaftPeerIdToNodeId(leaderId);
+    return new Peer(groupId, nodeId, null);
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 582efd9f35..3754031fa0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -66,44 +66,58 @@ public class Utils {
     return groupCode;
   }
 
-  public static RaftPeerId fromTEndPointToRaftPeerId(TEndPoint endpoint) {
-    return RaftPeerId.valueOf(fromTEndPointToString(endpoint));
+  public static RaftPeerId fromNodeIdToRaftPeerId(int nodeId) {
+    return RaftPeerId.valueOf(String.valueOf(nodeId));
   }
 
-  public static TEndPoint formRaftPeerIdToTEndPoint(RaftPeerId id) {
-    String[] items = id.toString().split("_");
+  public static TEndPoint fromRaftPeerAddressToTEndPoint(String address) {
+    String[] items = address.split(":");
     return new TEndPoint(items[0], Integer.parseInt(items[1]));
   }
 
-  public static TEndPoint formRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
+  public static int fromRaftPeerIdToNodeId(RaftPeerId id) {
+    return Integer.parseInt(id.toString());
+  }
+
+  public static TEndPoint fromRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
     String[] items = proto.getAddress().split(":");
     return new TEndPoint(items[0], Integer.parseInt(items[1]));
   }
 
   // priority is used as ordinal of leader election
-  public static RaftPeer fromTEndPointAndPriorityToRaftPeer(TEndPoint 
endpoint, int priority) {
+  public static RaftPeer fromNodeInfoAndPriorityToRaftPeer(
+      int nodeId, TEndPoint endpoint, int priority) {
     return RaftPeer.newBuilder()
-        .setId(fromTEndPointToRaftPeerId(endpoint))
+        .setId(fromNodeIdToRaftPeerId(nodeId))
         .setAddress(HostAddress(endpoint))
         .setPriority(priority)
         .build();
   }
 
-  public static RaftPeer fromTEndPointAndPriorityToRaftPeer(Peer peer, int 
priority) {
-    return fromTEndPointAndPriorityToRaftPeer(peer.getEndpoint(), priority);
+  public static RaftPeer fromPeerAndPriorityToRaftPeer(Peer peer, int 
priority) {
+    return fromNodeInfoAndPriorityToRaftPeer(peer.getNodeId(), 
peer.getEndpoint(), priority);
   }
 
   public static List<RaftPeer> fromPeersAndPriorityToRaftPeers(List<Peer> 
peers, int priority) {
     return peers.stream()
-        .map(peer -> Utils.fromTEndPointAndPriorityToRaftPeer(peer, priority))
+        .map(peer -> Utils.fromPeerAndPriorityToRaftPeer(peer, priority))
         .collect(Collectors.toList());
   }
 
+  public static int fromRaftPeerProtoToNodeId(RaftPeerProto proto) {
+    return Integer.parseInt(proto.getId().toStringUtf8());
+  }
+
   public static List<Peer> fromRaftProtoListAndRaftGroupIdToPeers(
       List<RaftPeerProto> raftProtoList, RaftGroupId id) {
     ConsensusGroupId consensusGroupId = 
Utils.fromRaftGroupIdToConsensusGroupId(id);
     return raftProtoList.stream()
-        .map(peer -> new Peer(consensusGroupId, 
Utils.formRaftPeerProtoToTEndPoint(peer)))
+        .map(
+            peer ->
+                new Peer(
+                    consensusGroupId,
+                    Utils.fromRaftPeerProtoToNodeId(peer),
+                    Utils.fromRaftPeerProtoToTEndPoint(peer)))
         .collect(Collectors.toList());
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 27d6679cdc..76bb103bc6 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -62,13 +62,15 @@ class StandAloneConsensus implements IConsensus {
   private final Logger logger = 
LoggerFactory.getLogger(StandAloneConsensus.class);
 
   private final TEndPoint thisNode;
+  private final int thisNodeId;
   private final File storageDir;
   private final IStateMachine.Registry registry;
   private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
       new ConcurrentHashMap<>();
 
   public StandAloneConsensus(ConsensusConfig config, Registry registry) {
-    this.thisNode = config.getThisNode();
+    this.thisNode = config.getThisNodeEndPoint();
+    this.thisNodeId = config.getThisNodeId();
     this.storageDir = new File(config.getStorageDir());
     this.registry = registry;
   }
@@ -92,7 +94,8 @@ class StandAloneConsensus implements IConsensus {
                   Integer.parseInt(items[0]), Integer.parseInt(items[1]));
           StandAloneServerImpl consensus =
               new StandAloneServerImpl(
-                  new Peer(consensusGroupId, thisNode), 
registry.apply(consensusGroupId));
+                  new Peer(consensusGroupId, thisNodeId, thisNode),
+                  registry.apply(consensusGroupId));
           stateMachineMap.put(consensusGroupId, consensus);
           consensus.start();
         }
@@ -143,7 +146,7 @@ class StandAloneConsensus implements IConsensus {
           .setException(new IllegalPeerNumException(consensusGroupSize))
           .build();
     }
-    if (!peers.contains(new Peer(groupId, thisNode))) {
+    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
       return ConsensusGenericResponse.newBuilder()
           .setException(new IllegalPeerEndpointException(thisNode, peers))
           .build();
@@ -226,7 +229,7 @@ class StandAloneConsensus implements IConsensus {
     if (!stateMachineMap.containsKey(groupId)) {
       return null;
     }
-    return new Peer(groupId, thisNode);
+    return new Peer(groupId, thisNodeId, thisNode);
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index 494a12ee1e..b59f4e1f53 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -49,7 +49,7 @@ public class StandAloneServerImpl implements IStateMachine {
   public void start() {
     stateMachine.start();
     // Notify itself as the leader
-    stateMachine.event().notifyLeaderChanged(peer.getGroupId(), 
peer.getEndpoint());
+    stateMachine.event().notifyLeaderChanged(peer.getGroupId(), 
peer.getNodeId());
   }
 
   @Override
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index a170b5a269..ca17b52cd5 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -51,9 +51,9 @@ public class MultiLeaderConsensusTest {
 
   private final List<Peer> peers =
       Arrays.asList(
-          new Peer(gid, new TEndPoint("127.0.0.1", 6000)),
-          new Peer(gid, new TEndPoint("127.0.0.1", 6001)),
-          new Peer(gid, new TEndPoint("127.0.0.1", 6002)));
+          new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)),
+          new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)),
+          new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
 
   private final List<File> peersStorage =
       Arrays.asList(
@@ -90,6 +90,7 @@ public class MultiLeaderConsensusTest {
               ConsensusFactory.getConsensusImpl(
                       ConsensusFactory.MultiLeaderConsensus,
                       ConsensusConfig.newBuilder()
+                          .setThisNodeId(peers.get(i).getNodeId())
                           .setThisNode(peers.get(i).getEndpoint())
                           .setStorageDir(peersStorage.get(i).getAbsolutePath())
                           .build(),
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
index 9b5943a534..1c38644fbc 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
@@ -49,6 +49,7 @@ public class RecoveryTest {
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.MultiLeaderConsensus,
                 ConsensusConfig.newBuilder()
+                    .setThisNodeId(1)
                     .setThisNode(new TEndPoint("0.0.0.0", 9000))
                     .setStorageDir("target" + java.io.File.separator + 
"recovery")
                     .build(),
@@ -77,7 +78,7 @@ public class RecoveryTest {
   public void recoveryTest() throws Exception {
     consensusImpl.createPeer(
         schemaRegionId,
-        Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 9000))));
+        Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
 
     consensusImpl.deletePeer(schemaRegionId);
 
@@ -89,7 +90,7 @@ public class RecoveryTest {
     ConsensusGenericResponse response =
         consensusImpl.createPeer(
             schemaRegionId,
-            Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 9000))));
+            Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
 
     Assert.assertTrue(response.isSuccess());
   }
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index e950fd753a..8d9d0615b5 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -75,6 +75,7 @@ public class RatisConsensusTest {
           ConsensusFactory.getConsensusImpl(
                   ConsensusFactory.RatisConsensus,
                   ConsensusConfig.newBuilder()
+                      .setThisNodeId(peers.get(i).getNodeId())
                       .setThisNode(peers.get(i).getEndpoint())
                       .setRatisConfig(config)
                       .setStorageDir(peersStorage.get(i).getAbsolutePath())
@@ -94,9 +95,9 @@ public class RatisConsensusTest {
   public void setUp() throws IOException {
     gid = new DataRegionId(1);
     peers = new ArrayList<>();
-    peers.add(new Peer(gid, new TEndPoint("127.0.0.1", 6000)));
-    peers.add(new Peer(gid, new TEndPoint("127.0.0.1", 6001)));
-    peers.add(new Peer(gid, new TEndPoint("127.0.0.1", 6002)));
+    peers.add(new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)));
+    peers.add(new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)));
+    peers.add(new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
     peersStorage = new ArrayList<>();
     peersStorage.add(new File("target" + java.io.File.separator + "1"));
     peersStorage.add(new File("target" + java.io.File.separator + "2"));
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java 
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index d383fe902b..0c66c38330 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -76,6 +76,7 @@ public class TestUtils {
     private AtomicInteger integer;
     private final Logger logger = 
LoggerFactory.getLogger(IntegerCounter.class);
     private TEndPoint leaderEndpoint;
+    private int leaderId;
     private List<Peer> configuration;
 
     @Override
@@ -131,11 +132,11 @@ public class TestUtils {
     }
 
     @Override
-    public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint 
newLeader) {
-      this.leaderEndpoint = newLeader;
+    public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) 
{
+      this.leaderId = newLeaderId;
       System.out.println("---------newLeader-----------");
       System.out.println(groupId);
-      System.out.println(newLeader);
+      System.out.println(newLeaderId);
       System.out.println("----------------------");
     }
 
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index df88b7ffd2..6288439d12 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -48,6 +48,7 @@ public class RecoveryTest {
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.StandAloneConsensus,
                 ConsensusConfig.newBuilder()
+                    .setThisNodeId(1)
                     .setThisNode(new TEndPoint("0.0.0.0", 9000))
                     .setStorageDir("target" + java.io.File.separator + 
"recovery")
                     .build(),
@@ -76,7 +77,7 @@ public class RecoveryTest {
   public void recoveryTest() throws Exception {
     consensusImpl.createPeer(
         schemaRegionId,
-        Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 9000))));
+        Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
 
     consensusImpl.stop();
     consensusImpl = null;
@@ -86,7 +87,7 @@ public class RecoveryTest {
     ConsensusGenericResponse response =
         consensusImpl.createPeer(
             schemaRegionId,
-            Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 9000))));
+            Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
 
     Assert.assertEquals(
         response.getException().getMessage(),
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 47d58a8a7c..6a386c5e1a 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -128,6 +128,7 @@ public class StandAloneConsensusTest {
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.StandAloneConsensus,
                 ConsensusConfig.newBuilder()
+                    .setThisNodeId(1)
                     .setThisNode(new TEndPoint("0.0.0.0", 6667))
                     .setStorageDir("target" + java.io.File.separator + 
"standalone")
                     .build(),
@@ -160,14 +161,14 @@ public class StandAloneConsensusTest {
     ConsensusGenericResponse response1 =
         consensusImpl.createPeer(
             dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response1.isSuccess());
     assertNull(response1.getException());
 
     ConsensusGenericResponse response2 =
         consensusImpl.createPeer(
             dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertFalse(response2.isSuccess());
     assertTrue(response2.getException() instanceof 
ConsensusGroupAlreadyExistException);
 
@@ -175,22 +176,22 @@ public class StandAloneConsensusTest {
         consensusImpl.createPeer(
             dataRegionId,
             Arrays.asList(
-                new Peer(dataRegionId, new TEndPoint("0.0.0.0", 6667)),
-                new Peer(dataRegionId, new TEndPoint("0.0.0.1", 6667))));
+                new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)),
+                new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667))));
     assertFalse(response3.isSuccess());
     assertTrue(response3.getException() instanceof IllegalPeerNumException);
 
     ConsensusGenericResponse response4 =
         consensusImpl.createPeer(
             dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, new 
TEndPoint("0.0.0.1", 6667))));
+            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.1", 6667))));
     assertFalse(response4.isSuccess());
     assertTrue(response4.getException() instanceof 
IllegalPeerEndpointException);
 
     ConsensusGenericResponse response5 =
         consensusImpl.createPeer(
             schemaRegionId,
-            Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response5.isSuccess());
     assertNull(response5.getException());
   }
@@ -204,7 +205,7 @@ public class StandAloneConsensusTest {
     ConsensusGenericResponse response2 =
         consensusImpl.createPeer(
             dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response2.isSuccess());
     assertNull(response2.getException());
 
@@ -216,7 +217,8 @@ public class StandAloneConsensusTest {
   @Test
   public void addPeer() {
     ConsensusGenericResponse response =
-        consensusImpl.addPeer(dataRegionId, new Peer(dataRegionId, new 
TEndPoint("0.0.0.0", 6667)));
+        consensusImpl.addPeer(
+            dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
6667)));
     assertFalse(response.isSuccess());
   }
 
@@ -224,7 +226,7 @@ public class StandAloneConsensusTest {
   public void removePeer() {
     ConsensusGenericResponse response =
         consensusImpl.removePeer(
-            dataRegionId, new Peer(dataRegionId, new TEndPoint("0.0.0.0", 
6667)));
+            dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
6667)));
     assertFalse(response.isSuccess());
   }
 
@@ -233,7 +235,7 @@ public class StandAloneConsensusTest {
     ConsensusGenericResponse response =
         consensusImpl.changePeer(
             dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertFalse(response.isSuccess());
   }
 
@@ -241,7 +243,7 @@ public class StandAloneConsensusTest {
   public void transferLeader() {
     ConsensusGenericResponse response =
         consensusImpl.transferLeader(
-            dataRegionId, new Peer(dataRegionId, new TEndPoint("0.0.0.0", 
6667)));
+            dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
6667)));
     assertFalse(response.isSuccess());
   }
 
@@ -256,21 +258,21 @@ public class StandAloneConsensusTest {
     ConsensusGenericResponse response1 =
         consensusImpl.createPeer(
             dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response1.isSuccess());
     assertNull(response1.getException());
 
     ConsensusGenericResponse response2 =
         consensusImpl.createPeer(
             schemaRegionId,
-            Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response2.isSuccess());
     assertNull(response2.getException());
 
     ConsensusGenericResponse response3 =
         consensusImpl.createPeer(
             configId,
-            Collections.singletonList(new Peer(configId, new 
TEndPoint("0.0.0.0", 6667))));
+            Collections.singletonList(new Peer(configId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response3.isSuccess());
     assertNull(response3.getException());
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 292eebf642..5e6616e678 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -48,6 +48,8 @@ public class IoTDBConstant {
   public static final String IOTDB_CONF = "IOTDB_CONF";
   public static final String GLOBAL_DB_NAME = "IoTDB";
 
+  public static final String CONFIG_NODE_ID = "config_node_id";
+  public static final String DATA_NODE_ID = "data_node_id";
   public static final String RPC_ADDRESS = "rpc_address";
   public static final String RPC_PORT = "rpc_port";
   public static final String INTERNAL_ADDRESS = "internal_address";
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 2ebff31347..9aeef0be07 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
@@ -657,7 +658,7 @@ public class ConfigNodeClient
   }
 
   @Override
-  public TSStatus registerConfigNode(TConfigNodeRegisterReq req) throws 
TException {
+  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq 
req) throws TException {
     throw new TException("DataNode to ConfigNode client doesn't support 
registerConfigNode.");
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index b611f442f8..05d5cb8831 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -60,6 +60,7 @@ public class DataRegionConsensusImpl {
           ConsensusFactory.getConsensusImpl(
                   conf.getDataRegionConsensusProtocolClass(),
                   ConsensusConfig.newBuilder()
+                      .setThisNodeId(conf.getDataNodeId())
                       .setThisNode(
                           new TEndPoint(
                               conf.getInternalAddress(), 
conf.getDataRegionConsensusPort()))
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 4ae4b63c42..cd41d45325 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -58,6 +58,7 @@ public class SchemaRegionConsensusImpl {
           ConsensusFactory.getConsensusImpl(
                   conf.getSchemaRegionConsensusProtocolClass(),
                   ConsensusConfig.newBuilder()
+                      .setThisNodeId(conf.getDataNodeId())
                       .setThisNode(
                           new TEndPoint(
                               conf.getInternalAddress(), 
conf.getSchemaRegionConsensusPort()))
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java 
b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 09707a4e92..a1f73fdb14 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -271,7 +271,9 @@ public class RegionMigrateService implements IService {
           if (!addPeerSucceed) {
             Thread.sleep(SLEEP_MILLIS);
           }
-          resp = addRegionPeer(regionId, new Peer(regionId, newPeerNode));
+          resp =
+              addRegionPeer(
+                  regionId, new Peer(regionId, 
selectedDataNode.getDataNodeId(), newPeerNode));
           addPeerSucceed = true;
         } catch (Throwable e) {
           addPeerSucceed = false;
@@ -378,7 +380,9 @@ public class RegionMigrateService implements IService {
           if (!removePeerSucceed) {
             Thread.sleep(SLEEP_MILLIS);
           }
-          resp = removeRegionPeer(regionId, new Peer(regionId, oldPeerNode));
+          resp =
+              removeRegionPeer(
+                  regionId, new Peer(regionId, 
selectedDataNode.getDataNodeId(), oldPeerNode));
           removePeerSucceed = true;
         } catch (Throwable e) {
           removePeerSucceed = false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 63df1528e1..46f7b7fa0f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -811,7 +811,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     TConsensusGroupId tgId = req.getRegionId();
     ConsensusGroupId regionId = 
ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
     TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
-    Peer newLeaderPeer = new Peer(regionId, newNode);
+    Peer newLeaderPeer = new Peer(regionId, 
req.getNewLeaderNode().getDataNodeId(), newNode);
     if (!isLeader(regionId)) {
       LOGGER.info("region {} is not leader, no need to change leader", 
regionId);
       return status;
@@ -859,7 +859,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
     List<Peer> peers =
         req.getRegionLocations().stream()
-            .map(location -> new Peer(regionId, getConsensusEndPoint(location, 
regionId)))
+            .map(
+                location ->
+                    new Peer(
+                        regionId,
+                        location.getDataNodeId(),
+                        getConsensusEndPoint(location, regionId)))
             .collect(Collectors.toList());
     TSStatus status = createNewRegion(regionId, req.getStorageGroup(), 
req.getTtl());
     if (!isSucceed(status)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
index 3a9ecdbc0a..18b56fceb5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRegionManager.java
@@ -116,7 +116,7 @@ public class DataNodeRegionManager {
             new TEndPoint(
                 dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(),
                 dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
-        peers.add(new Peer(schemaRegionId, endpoint));
+        peers.add(new Peer(schemaRegionId, dataNodeLocation.getDataNodeId(), 
endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
           SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, 
peers);
@@ -152,7 +152,7 @@ public class DataNodeRegionManager {
             new TEndPoint(
                 dataNodeLocation.getDataRegionConsensusEndPoint().getIp(),
                 dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
-        peers.add(new Peer(dataRegionId, endpoint));
+        peers.add(new Peer(dataRegionId, dataNodeLocation.getDataNodeId(), 
endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
           DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, 
peers);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
 
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index e3008808d0..7d11039ef6 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -69,9 +69,13 @@ public class DataNodeInternalRPCServiceImplTest {
   private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
   DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
   static LocalConfigNode configNode;
+  private static final int dataNodeId = 0;
 
   @BeforeClass
   public static void setUpBeforeClass() throws IOException, MetadataException {
+    // In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in 
RatisConsensus
+    conf.setDataNodeId(dataNodeId);
+
     IoTDB.configManager.init();
     configNode = LocalConfigNode.getInstance();
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath("root.ln"));
@@ -332,7 +336,8 @@ public class DataNodeInternalRPCServiceImplTest {
 
     // construct fragmentInstance
     return new TRegionReplicaSet(
-        new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0), 
dataNodeList);
+        new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 
conf.getDataNodeId()),
+        dataNodeList);
   }
 
   private List<Peer> genSchemaRegionPeerList(TRegionReplicaSet 
regionReplicaSet) {
@@ -341,6 +346,7 @@ public class DataNodeInternalRPCServiceImplTest {
       peerList.add(
           new Peer(
               new SchemaRegionId(regionReplicaSet.getRegionId().getId()),
+              node.getDataNodeId(),
               node.getSchemaRegionConsensusEndPoint()));
     }
     return peerList;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 4f4fc495d5..1b9581a710 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -274,6 +274,11 @@ struct TConfigNodeRegisterReq {
   13: required double diskSpaceWarningThreshold
 }
 
+struct TConfigNodeRegisterResp {
+  1: required common.TSStatus status
+  2: required i32 configNodeId
+}
+
 struct TAddConsensusGroupReq {
   1: required list<common.TConfigNodeLocation> configNodeList
 }
@@ -675,7 +680,7 @@ service IConfigNodeRPCService {
    *         ERROR_GLOBAL_CONFIG if some global configurations in the 
Non-Seed-ConfigNode
    *                             are inconsistent with the ConfigNode-leader
    */
-  common.TSStatus registerConfigNode(TConfigNodeRegisterReq req)
+  TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req)
 
   /** The ConfigNode-leader will guide the Non-Seed-ConfigNode to join the 
ConsensusGroup when first startup */
   common.TSStatus addConsensusGroup(TAddConsensusGroupReq req)
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift 
b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 380f30afd1..345fc70900 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -56,6 +56,7 @@ struct TSyncLogRes {
 struct TBuildSyncLogChannelReq {
   1: required common.TConsensusGroupId consensusGroupId
   2: required common.TEndPoint endPoint
+  3: required i32 nodeId
 }
 
 struct TBuildSyncLogChannelRes {
@@ -65,6 +66,7 @@ struct TBuildSyncLogChannelRes {
 struct TRemoveSyncLogChannelReq {
   1: required common.TConsensusGroupId consensusGroupId
   2: required common.TEndPoint endPoint
+  3: required i32 nodeId
 }
 
 struct TRemoveSyncLogChannelRes {

Reply via email to