This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ecc6e166 [IOTDB-3195] Added a configuration interface for the consensus layer (#6081)
37ecc6e166 is described below

commit 37ecc6e166ab5296c444260b9dd974fb78a52d5e
Author: Potato <[email protected]>
AuthorDate: Wed Jun 1 22:48:16 2022 +0800

    [IOTDB-3195] Added a configuration interface for the consensus layer (#6081)
---
 .../iotdb/confignode/manager/ConsensusManager.java |   8 +-
 .../apache/iotdb/consensus/ConsensusFactory.java   |  11 +-
 .../iotdb/consensus/config/ConsensusConfig.java    |  97 +++
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 257 ++++++++
 .../apache/iotdb/consensus/config/RatisConfig.java | 695 +++++++++++++++++++++
 .../multileader/MultiLeaderConsensus.java          |  17 +-
 .../multileader/MultiLeaderServerImpl.java         |  15 +-
 .../multileader/client/DispatchLogHandler.java     |   7 +-
 .../client/MultiLeaderConsensusClientPool.java     |  16 +-
 .../conf/MultiLeaderConsensusConfig.java           |  40 --
 .../multileader/logdispatcher/LogDispatcher.java   |  39 +-
 .../multileader/logdispatcher/SyncStatus.java      |   8 +-
 .../multileader/service/MultiLeaderRPCService.java |  12 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |  30 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |  65 ++
 .../consensus/standalone/StandAloneConsensus.java  |   7 +-
 .../multileader/MultiLeaderConsensusTest.java      |   7 +-
 .../multileader/logdispatcher/SyncStatusTest.java  |  79 +--
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   7 +-
 .../iotdb/consensus/standalone/RecoveryTest.java   |   7 +-
 .../standalone/StandAloneConsensusTest.java        |   7 +-
 .../db/consensus/DataRegionConsensusImpl.java      |  27 +-
 .../db/consensus/SchemaRegionConsensusImpl.java    |  10 +-
 23 files changed, 1299 insertions(+), 169 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 702f5cfa05..918c4b693f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -35,12 +35,12 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -88,8 +88,10 @@ public class ConsensusManager {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 conf.getConfigNodeConsensusProtocolClass(),
-                new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort()),
-                new File(conf.getConsensusDir()),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort()))
+                    .setStorageDir(conf.getConsensusDir())
+                    .build(),
                 gid -> stateMachine)
             .orElseThrow(
                 () ->
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 15e2492b2e..8146fcf3a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.consensus;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Optional;
@@ -42,15 +41,13 @@ public class ConsensusFactory {
   private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class);
 
   public static Optional<IConsensus> getConsensusImpl(
-      String className, TEndPoint endpoint, File storageDir, IStateMachine.Registry registry) {
+      String className, ConsensusConfig config, IStateMachine.Registry registry) {
     try {
       Class<?> executor = Class.forName(className);
       Constructor<?> executorConstructor =
-          executor.getDeclaredConstructor(
-              TEndPoint.class, File.class, IStateMachine.Registry.class);
+          executor.getDeclaredConstructor(ConsensusConfig.class, IStateMachine.Registry.class);
       executorConstructor.setAccessible(true);
-      return Optional.of(
-          (IConsensus) executorConstructor.newInstance(endpoint, storageDir, registry));
+      return Optional.of((IConsensus) executorConstructor.newInstance(config, registry));
     } catch (ClassNotFoundException
         | NoSuchMethodException
         | InstantiationException
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
new file mode 100644
index 0000000000..be3b4bf342
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+public class ConsensusConfig {
+
+  private final TEndPoint thisNode;
+  private final String storageDir;
+  private final RatisConfig ratisConfig;
+  private final MultiLeaderConfig multiLeaderConfig;
+
+  private ConsensusConfig(
+      TEndPoint thisNode,
+      String storageDir,
+      RatisConfig ratisConfig,
+      MultiLeaderConfig multiLeaderConfig) {
+    this.thisNode = thisNode;
+    this.storageDir = storageDir;
+    this.ratisConfig = ratisConfig;
+    this.multiLeaderConfig = multiLeaderConfig;
+  }
+
+  public TEndPoint getThisNode() {
+    return thisNode;
+  }
+
+  public String getStorageDir() {
+    return storageDir;
+  }
+
+  public RatisConfig getRatisConfig() {
+    return ratisConfig;
+  }
+
+  public MultiLeaderConfig getMultiLeaderConfig() {
+    return multiLeaderConfig;
+  }
+
+  public static ConsensusConfig.Builder newBuilder() {
+    return new ConsensusConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private TEndPoint thisNode;
+    private String storageDir;
+    private RatisConfig ratisConfig;
+    private MultiLeaderConfig multiLeaderConfig;
+
+    public ConsensusConfig build() {
+      return new ConsensusConfig(
+          thisNode,
+          storageDir,
+          ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
+          multiLeaderConfig != null ? multiLeaderConfig : MultiLeaderConfig.newBuilder().build());
+    }
+
+    public Builder setThisNode(TEndPoint thisNode) {
+      this.thisNode = thisNode;
+      return this;
+    }
+
+    public Builder setStorageDir(String storageDir) {
+      this.storageDir = storageDir;
+      return this;
+    }
+
+    public Builder setRatisConfig(RatisConfig ratisConfig) {
+      this.ratisConfig = ratisConfig;
+      return this;
+    }
+
+    public Builder setMultiLeaderConfig(MultiLeaderConfig multiLeaderConfig) {
+      this.multiLeaderConfig = multiLeaderConfig;
+      return this;
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
new file mode 100644
index 0000000000..b98df9f822
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import java.util.concurrent.TimeUnit;
+
+public class MultiLeaderConfig {
+
+  private final RPC rpc;
+  private final Replication replication;
+
+  private MultiLeaderConfig(RPC rpc, Replication replication) {
+    this.rpc = rpc;
+    this.replication = replication;
+  }
+
+  public RPC getRpc() {
+    return rpc;
+  }
+
+  public Replication getReplication() {
+    return replication;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private RPC rpc;
+    private Replication replication;
+
+    public MultiLeaderConfig build() {
+      return new MultiLeaderConfig(
+          rpc != null ? rpc : new RPC.Builder().build(),
+          replication != null ? replication : new Replication.Builder().build());
+    }
+
+    public Builder setRpc(RPC rpc) {
+      this.rpc = rpc;
+      return this;
+    }
+
+    public Builder setReplication(Replication replication) {
+      this.replication = replication;
+      return this;
+    }
+  }
+
+  public static class RPC {
+    private final int rpcMaxConcurrentClientNum;
+    private final int thriftServerAwaitTimeForStopService;
+    private final boolean isRpcThriftCompressionEnabled;
+    private final int selectorNumOfClientManager;
+    private final int connectionTimeoutInMs;
+
+    private RPC(
+        int rpcMaxConcurrentClientNum,
+        int thriftServerAwaitTimeForStopService,
+        boolean isRpcThriftCompressionEnabled,
+        int selectorNumOfClientManager,
+        int connectionTimeoutInMs) {
+      this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
+      this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService;
+      this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
+      this.selectorNumOfClientManager = selectorNumOfClientManager;
+      this.connectionTimeoutInMs = connectionTimeoutInMs;
+    }
+
+    public int getRpcMaxConcurrentClientNum() {
+      return rpcMaxConcurrentClientNum;
+    }
+
+    public int getThriftServerAwaitTimeForStopService() {
+      return thriftServerAwaitTimeForStopService;
+    }
+
+    public boolean isRpcThriftCompressionEnabled() {
+      return isRpcThriftCompressionEnabled;
+    }
+
+    public int getSelectorNumOfClientManager() {
+      return selectorNumOfClientManager;
+    }
+
+    public int getConnectionTimeoutInMs() {
+      return connectionTimeoutInMs;
+    }
+
+    public static RPC.Builder newBuilder() {
+      return new RPC.Builder();
+    }
+
+    public static class Builder {
+      private int rpcMaxConcurrentClientNum = 65535;
+      private int thriftServerAwaitTimeForStopService = 60;
+      private boolean isRpcThriftCompressionEnabled = false;
+      private int selectorNumOfClientManager = 1;
+      private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
+
+      public RPC.Builder setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) {
+        this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
+        return this;
+      }
+
+      public RPC.Builder setThriftServerAwaitTimeForStopService(
+          int thriftServerAwaitTimeForStopService) {
+        this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService;
+        return this;
+      }
+
+      public RPC.Builder setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+        isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+        return this;
+      }
+
+      public RPC.Builder setSelectorNumOfClientManager(int selectorNumOfClientManager) {
+        this.selectorNumOfClientManager = selectorNumOfClientManager;
+        return this;
+      }
+
+      public RPC.Builder setConnectionTimeoutInMs(int connectionTimeoutInMs) {
+        this.connectionTimeoutInMs = connectionTimeoutInMs;
+        return this;
+      }
+
+      public RPC build() {
+        return new RPC(
+            rpcMaxConcurrentClientNum,
+            thriftServerAwaitTimeForStopService,
+            isRpcThriftCompressionEnabled,
+            selectorNumOfClientManager,
+            connectionTimeoutInMs);
+      }
+    }
+  }
+
+  public static class Replication {
+    private final int maxPendingRequestNumPerNode;
+    private final int maxRequestPerBatch;
+    private final int maxPendingBatch;
+    private final int maxWaitingTimeForAccumulatingBatchInMs;
+    private final long basicRetryWaitTimeMs;
+    private final long maxRetryWaitTimeMs;
+
+    private Replication(
+        int maxPendingRequestNumPerNode,
+        int maxRequestPerBatch,
+        int maxPendingBatch,
+        int maxWaitingTimeForAccumulatingBatchInMs,
+        long basicRetryWaitTimeMs,
+        long maxRetryWaitTimeMs) {
+      this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+      this.maxRequestPerBatch = maxRequestPerBatch;
+      this.maxPendingBatch = maxPendingBatch;
+      this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+      this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
+      this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
+    }
+
+    public int getMaxPendingRequestNumPerNode() {
+      return maxPendingRequestNumPerNode;
+    }
+
+    public int getMaxRequestPerBatch() {
+      return maxRequestPerBatch;
+    }
+
+    public int getMaxPendingBatch() {
+      return maxPendingBatch;
+    }
+
+    public int getMaxWaitingTimeForAccumulatingBatchInMs() {
+      return maxWaitingTimeForAccumulatingBatchInMs;
+    }
+
+    public long getBasicRetryWaitTimeMs() {
+      return basicRetryWaitTimeMs;
+    }
+
+    public long getMaxRetryWaitTimeMs() {
+      return maxRetryWaitTimeMs;
+    }
+
+    public static Replication.Builder newBuilder() {
+      return new Replication.Builder();
+    }
+
+    public static class Builder {
+      private int maxPendingRequestNumPerNode = 1000;
+      private int maxRequestPerBatch = 100;
+      private int maxPendingBatch = 50;
+      private int maxWaitingTimeForAccumulatingBatchInMs = 10;
+      private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
+      private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
+
+      public Replication.Builder setMaxPendingRequestNumPerNode(int maxPendingRequestNumPerNode) {
+        this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+        return this;
+      }
+
+      public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) {
+        this.maxRequestPerBatch = maxRequestPerBatch;
+        return this;
+      }
+
+      public Replication.Builder setMaxPendingBatch(int maxPendingBatch) {
+        this.maxPendingBatch = maxPendingBatch;
+        return this;
+      }
+
+      public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
+          int maxWaitingTimeForAccumulatingBatchInMs) {
+        this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+        return this;
+      }
+
+      public Replication.Builder setBasicRetryWaitTimeMs(long basicRetryWaitTimeMs) {
+        this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
+        return this;
+      }
+
+      public Replication.Builder setMaxRetryWaitTimeMs(long maxRetryWaitTimeMs) {
+        this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
+        return this;
+      }
+
+      public Replication build() {
+        return new Replication(
+            maxPendingRequestNumPerNode,
+            maxRequestPerBatch,
+            maxPendingBatch,
+            maxWaitingTimeForAccumulatingBatchInMs,
+            basicRetryWaitTimeMs,
+            maxRetryWaitTimeMs);
+      }
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
new file mode 100644
index 0000000000..048fd21d76
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import org.apache.ratis.grpc.GrpcConfigKeys.Server;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class RatisConfig {
+
+  private final Rpc rpc;
+  private final LeaderElection leaderElection;
+  private final Snapshot snapshot;
+  private final ThreadPool threadPool;
+  private final Log log;
+  private final Grpc grpc;
+
+  private RatisConfig(
+      Rpc rpc,
+      LeaderElection leaderElection,
+      Snapshot snapshot,
+      ThreadPool threadPool,
+      Log log,
+      Grpc grpc) {
+    this.rpc = rpc;
+    this.leaderElection = leaderElection;
+    this.snapshot = snapshot;
+    this.threadPool = threadPool;
+    this.log = log;
+    this.grpc = grpc;
+  }
+
+  public Rpc getRpc() {
+    return rpc;
+  }
+
+  public LeaderElection getLeaderElection() {
+    return leaderElection;
+  }
+
+  public Snapshot getSnapshot() {
+    return snapshot;
+  }
+
+  public ThreadPool getThreadPool() {
+    return threadPool;
+  }
+
+  public Log getLog() {
+    return log;
+  }
+
+  public Grpc getGrpc() {
+    return grpc;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private Rpc rpc;
+    private LeaderElection leaderElection;
+    private Snapshot snapshot;
+    private ThreadPool threadPool;
+    private Log log;
+    private Grpc grpc;
+
+    public RatisConfig build() {
+      return new RatisConfig(
+          rpc != null ? rpc : Rpc.newBuilder().build(),
+          leaderElection != null ? leaderElection : LeaderElection.newBuilder().build(),
+          snapshot != null ? snapshot : Snapshot.newBuilder().build(),
+          threadPool != null ? threadPool : ThreadPool.newBuilder().build(),
+          log != null ? log : Log.newBuilder().build(),
+          grpc != null ? grpc : Grpc.newBuilder().build());
+    }
+
+    public Builder setRpc(Rpc rpc) {
+      this.rpc = rpc;
+      return this;
+    }
+
+    public Builder setLeaderElection(LeaderElection leaderElection) {
+      this.leaderElection = leaderElection;
+      return this;
+    }
+
+    public Builder setSnapshot(Snapshot snapshot) {
+      this.snapshot = snapshot;
+      return this;
+    }
+
+    public Builder setThreadPool(ThreadPool threadPool) {
+      this.threadPool = threadPool;
+      return this;
+    }
+
+    public Builder setLog(Log log) {
+      this.log = log;
+      return this;
+    }
+
+    public Builder setGrpc(Grpc grpc) {
+      this.grpc = grpc;
+      return this;
+    }
+  }
+
+  /** server rpc timeout related */
+  public static class Rpc {
+    private final TimeDuration timeoutMin;
+    private final TimeDuration timeoutMax;
+    private final TimeDuration requestTimeout;
+    private final TimeDuration sleepTime;
+    private final TimeDuration slownessTimeout;
+
+    private Rpc(
+        TimeDuration timeoutMin,
+        TimeDuration timeoutMax,
+        TimeDuration requestTimeout,
+        TimeDuration sleepTime,
+        TimeDuration slownessTimeout) {
+      this.timeoutMin = timeoutMin;
+      this.timeoutMax = timeoutMax;
+      this.requestTimeout = requestTimeout;
+      this.sleepTime = sleepTime;
+      this.slownessTimeout = slownessTimeout;
+    }
+
+    public TimeDuration getTimeoutMin() {
+      return timeoutMin;
+    }
+
+    public TimeDuration getTimeoutMax() {
+      return timeoutMax;
+    }
+
+    public TimeDuration getRequestTimeout() {
+      return requestTimeout;
+    }
+
+    public TimeDuration getSleepTime() {
+      return sleepTime;
+    }
+
+    public TimeDuration getSlownessTimeout() {
+      return slownessTimeout;
+    }
+
+    public static Rpc.Builder newBuilder() {
+      return new Rpc.Builder();
+    }
+
+    public static class Builder {
+      private TimeDuration timeoutMin = TimeDuration.valueOf(2, TimeUnit.SECONDS);
+      private TimeDuration timeoutMax = TimeDuration.valueOf(8, TimeUnit.SECONDS);
+      private TimeDuration requestTimeout = TimeDuration.valueOf(20, TimeUnit.SECONDS);
+      private TimeDuration sleepTime = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+      private TimeDuration slownessTimeout = TimeDuration.valueOf(10, TimeUnit.MINUTES);
+
+      public Rpc build() {
+        return new Rpc(timeoutMin, timeoutMax, requestTimeout, sleepTime, slownessTimeout);
+      }
+
+      public Rpc.Builder setTimeoutMin(TimeDuration timeoutMin) {
+        this.timeoutMin = timeoutMin;
+        return this;
+      }
+
+      public Rpc.Builder setTimeoutMax(TimeDuration timeoutMax) {
+        this.timeoutMax = timeoutMax;
+        return this;
+      }
+
+      public Rpc.Builder setRequestTimeout(TimeDuration requestTimeout) {
+        this.requestTimeout = requestTimeout;
+        return this;
+      }
+
+      public Rpc.Builder setSleepTime(TimeDuration sleepTime) {
+        this.sleepTime = sleepTime;
+        return this;
+      }
+
+      public Rpc.Builder setSlownessTimeout(TimeDuration slownessTimeout) {
+        this.slownessTimeout = slownessTimeout;
+        return this;
+      }
+    }
+  }
+
+  public static class LeaderElection {
+    private final TimeDuration leaderStepDownWaitTimeKey;
+    private final boolean preVote;
+
+    private LeaderElection(TimeDuration leaderStepDownWaitTimeKey, boolean preVote) {
+      this.leaderStepDownWaitTimeKey = leaderStepDownWaitTimeKey;
+      this.preVote = preVote;
+    }
+
+    public TimeDuration getLeaderStepDownWaitTimeKey() {
+      return leaderStepDownWaitTimeKey;
+    }
+
+    public boolean isPreVote() {
+      return preVote;
+    }
+
+    public static LeaderElection.Builder newBuilder() {
+      return new LeaderElection.Builder();
+    }
+
+    public static class Builder {
+      private TimeDuration leaderStepDownWaitTimeKey = TimeDuration.valueOf(30, TimeUnit.SECONDS);
+      private boolean preVote = RaftServerConfigKeys.LeaderElection.PRE_VOTE_DEFAULT;
+
+      public LeaderElection build() {
+        return new LeaderElection(leaderStepDownWaitTimeKey, preVote);
+      }
+
+      public LeaderElection.Builder setLeaderStepDownWaitTimeKey(
+          TimeDuration leaderStepDownWaitTimeKey) {
+        this.leaderStepDownWaitTimeKey = leaderStepDownWaitTimeKey;
+        return this;
+      }
+
+      public LeaderElection.Builder setPreVote(boolean preVote) {
+        this.preVote = preVote;
+        return this;
+      }
+    }
+  }
+
+  public static class Snapshot {
+    private final boolean autoTriggerEnabled;
+    private final long creationGap;
+    private final long autoTriggerThreshold;
+    private final int retentionFileNum;
+
+    private Snapshot(
+        boolean autoTriggerEnabled,
+        long creationGap,
+        long autoTriggerThreshold,
+        int retentionFileNum) {
+      this.autoTriggerEnabled = autoTriggerEnabled;
+      this.creationGap = creationGap;
+      this.autoTriggerThreshold = autoTriggerThreshold;
+      this.retentionFileNum = retentionFileNum;
+    }
+
+    public boolean isAutoTriggerEnabled() {
+      return autoTriggerEnabled;
+    }
+
+    public long getCreationGap() {
+      return creationGap;
+    }
+
+    public long getAutoTriggerThreshold() {
+      return autoTriggerThreshold;
+    }
+
+    public int getRetentionFileNum() {
+      return retentionFileNum;
+    }
+
+    public static Snapshot.Builder newBuilder() {
+      return new Snapshot.Builder();
+    }
+
+    public static class Builder {
+      private boolean autoTriggerEnabled = true;
+      private long creationGap = RaftServerConfigKeys.Snapshot.CREATION_GAP_DEFAULT;
+      private long autoTriggerThreshold =
+          RaftServerConfigKeys.Snapshot.AUTO_TRIGGER_THRESHOLD_DEFAULT;
+      private int retentionFileNum = RaftServerConfigKeys.Snapshot.RETENTION_FILE_NUM_DEFAULT;
+
+      public Snapshot build() {
+        return new Snapshot(
+            autoTriggerEnabled, creationGap, autoTriggerThreshold, retentionFileNum);
+      }
+
+      public Snapshot.Builder setAutoTriggerEnabled(boolean autoTriggerEnabled) {
+        this.autoTriggerEnabled = autoTriggerEnabled;
+        return this;
+      }
+
+      public Snapshot.Builder setCreationGap(long creationGap) {
+        this.creationGap = creationGap;
+        return this;
+      }
+
+      public Snapshot.Builder setAutoTriggerThreshold(long autoTriggerThreshold) {
+        this.autoTriggerThreshold = autoTriggerThreshold;
+        return this;
+      }
+
+      public Snapshot.Builder setRetentionFileNum(int retentionFileNum) {
+        this.retentionFileNum = retentionFileNum;
+        return this;
+      }
+    }
+  }
+
+  public static class ThreadPool {
+    private final boolean proxyCached;
+    private final int proxySize;
+    private final boolean serverCached;
+    private final int serverSize;
+    private final boolean clientCached;
+    private final int clientSize;
+
+    private ThreadPool(
+        boolean proxyCached,
+        int proxySize,
+        boolean serverCached,
+        int serverSize,
+        boolean clientCached,
+        int clientSize) {
+      this.proxyCached = proxyCached;
+      this.proxySize = proxySize;
+      this.serverCached = serverCached;
+      this.serverSize = serverSize;
+      this.clientCached = clientCached;
+      this.clientSize = clientSize;
+    }
+
+    public boolean isProxyCached() {
+      return proxyCached;
+    }
+
+    public int getProxySize() {
+      return proxySize;
+    }
+
+    public boolean isServerCached() {
+      return serverCached;
+    }
+
+    public int getServerSize() {
+      return serverSize;
+    }
+
+    public boolean isClientCached() {
+      return clientCached;
+    }
+
+    public int getClientSize() {
+      return clientSize;
+    }
+
+    public static ThreadPool.Builder newBuilder() {
+      return new ThreadPool.Builder();
+    }
+
+    public static class Builder {
+      private boolean proxyCached = RaftServerConfigKeys.ThreadPool.PROXY_CACHED_DEFAULT;
+      private int proxySize = RaftServerConfigKeys.ThreadPool.PROXY_SIZE_DEFAULT;
+      private boolean serverCached = RaftServerConfigKeys.ThreadPool.SERVER_CACHED_DEFAULT;
+      private int serverSize = RaftServerConfigKeys.ThreadPool.SERVER_SIZE_DEFAULT;
+      private boolean clientCached = RaftServerConfigKeys.ThreadPool.CLIENT_CACHED_DEFAULT;
+      private int clientSize = RaftServerConfigKeys.ThreadPool.CLIENT_SIZE_DEFAULT;
+
+      public ThreadPool build() {
+        return new ThreadPool(
+            proxyCached, proxySize, serverCached, serverSize, clientCached, clientSize);
+      }
+
+      public ThreadPool.Builder setProxyCached(boolean proxyCached) {
+        this.proxyCached = proxyCached;
+        return this;
+      }
+
+      public ThreadPool.Builder setProxySize(int proxySize) {
+        this.proxySize = proxySize;
+        return this;
+      }
+
+      public ThreadPool.Builder setServerCached(boolean serverCached) {
+        this.serverCached = serverCached;
+        return this;
+      }
+
+      public ThreadPool.Builder setServerSize(int serverSize) {
+        this.serverSize = serverSize;
+        return this;
+      }
+
+      public ThreadPool.Builder setClientCached(boolean clientCached) {
+        this.clientCached = clientCached;
+        return this;
+      }
+
+      public ThreadPool.Builder setClientSize(int clientSize) {
+        this.clientSize = clientSize;
+        return this;
+      }
+    }
+  }
+
+  public static class Log {
+
+    private final boolean useMemory;
+    private final int queueElementLimit;
+    private final SizeInBytes queueByteLimit;
+    private final int purgeGap;
+    private final boolean purgeUptoSnapshotIndex;
+    private final SizeInBytes segmentSizeMax;
+    private final int segmentCacheNumMax;
+    private final SizeInBytes segmentCacheSizeMax;
+    private final SizeInBytes preallocatedSize;
+    private final SizeInBytes writeBufferSize;
+    private final int forceSyncNum;
+    private final boolean unsafeFlushEnabled;
+
+    private Log(
+        boolean useMemory,
+        int queueElementLimit,
+        SizeInBytes queueByteLimit,
+        int purgeGap,
+        boolean purgeUptoSnapshotIndex,
+        SizeInBytes segmentSizeMax,
+        int segmentCacheNumMax,
+        SizeInBytes segmentCacheSizeMax,
+        SizeInBytes preallocatedSize,
+        SizeInBytes writeBufferSize,
+        int forceSyncNum,
+        boolean unsafeFlushEnabled) {
+      this.useMemory = useMemory;
+      this.queueElementLimit = queueElementLimit;
+      this.queueByteLimit = queueByteLimit;
+      this.purgeGap = purgeGap;
+      this.purgeUptoSnapshotIndex = purgeUptoSnapshotIndex;
+      this.segmentSizeMax = segmentSizeMax;
+      this.segmentCacheNumMax = segmentCacheNumMax;
+      this.segmentCacheSizeMax = segmentCacheSizeMax;
+      this.preallocatedSize = preallocatedSize;
+      this.writeBufferSize = writeBufferSize;
+      this.forceSyncNum = forceSyncNum;
+      this.unsafeFlushEnabled = unsafeFlushEnabled;
+    }
+
+    public boolean isUseMemory() {
+      return useMemory;
+    }
+
+    public int getQueueElementLimit() {
+      return queueElementLimit;
+    }
+
+    public SizeInBytes getQueueByteLimit() {
+      return queueByteLimit;
+    }
+
+    public int getPurgeGap() {
+      return purgeGap;
+    }
+
+    public boolean isPurgeUptoSnapshotIndex() {
+      return purgeUptoSnapshotIndex;
+    }
+
+    public SizeInBytes getSegmentSizeMax() {
+      return segmentSizeMax;
+    }
+
+    public int getSegmentCacheNumMax() {
+      return segmentCacheNumMax;
+    }
+
+    public SizeInBytes getSegmentCacheSizeMax() {
+      return segmentCacheSizeMax;
+    }
+
+    public SizeInBytes getPreallocatedSize() {
+      return preallocatedSize;
+    }
+
+    public SizeInBytes getWriteBufferSize() {
+      return writeBufferSize;
+    }
+
+    public int getForceSyncNum() {
+      return forceSyncNum;
+    }
+
+    public boolean isUnsafeFlushEnabled() {
+      return unsafeFlushEnabled;
+    }
+
+    public static Log.Builder newBuilder() {
+      return new Log.Builder();
+    }
+
+    public static class Builder {
+      private boolean useMemory = false;
+      private int queueElementLimit = 4096;
+      private SizeInBytes queueByteLimit = SizeInBytes.valueOf("64MB");
+      private int purgeGap = 1024;
+      private boolean purgeUptoSnapshotIndex = false;
+      private SizeInBytes segmentSizeMax = SizeInBytes.valueOf("8MB");
+      private int segmentCacheNumMax = 6;
+      private SizeInBytes segmentCacheSizeMax = SizeInBytes.valueOf("200MB");
+      private SizeInBytes preallocatedSize = SizeInBytes.valueOf("4MB");
+      private SizeInBytes writeBufferSize = SizeInBytes.valueOf("64KB");
+      private int forceSyncNum = 128;
+      private boolean unsafeFlushEnabled = false;
+
+      public Log build() {
+        return new Log(
+            useMemory,
+            queueElementLimit,
+            queueByteLimit,
+            purgeGap,
+            purgeUptoSnapshotIndex,
+            segmentSizeMax,
+            segmentCacheNumMax,
+            segmentCacheSizeMax,
+            preallocatedSize,
+            writeBufferSize,
+            forceSyncNum,
+            unsafeFlushEnabled);
+      }
+
+      public Log.Builder setUseMemory(boolean useMemory) {
+        this.useMemory = useMemory;
+        return this;
+      }
+
+      public Log.Builder setQueueElementLimit(int queueElementLimit) {
+        this.queueElementLimit = queueElementLimit;
+        return this;
+      }
+
+      public Log.Builder setQueueByteLimit(SizeInBytes queueByteLimit) {
+        this.queueByteLimit = queueByteLimit;
+        return this;
+      }
+
+      public Log.Builder setPurgeGap(int purgeGap) {
+        this.purgeGap = purgeGap;
+        return this;
+      }
+
+      public Log.Builder setPurgeUptoSnapshotIndex(boolean purgeUptoSnapshotIndex) {
+        this.purgeUptoSnapshotIndex = purgeUptoSnapshotIndex;
+        return this;
+      }
+
+      public Log.Builder setSegmentSizeMax(SizeInBytes segmentSizeMax) {
+        this.segmentSizeMax = segmentSizeMax;
+        return this;
+      }
+
+      public Log.Builder setSegmentCacheNumMax(int segmentCacheNumMax) {
+        this.segmentCacheNumMax = segmentCacheNumMax;
+        return this;
+      }
+
+      public Log.Builder setSegmentCacheSizeMax(SizeInBytes segmentCacheSizeMax) {
+        this.segmentCacheSizeMax = segmentCacheSizeMax;
+        return this;
+      }
+
+      public Log.Builder setPreallocatedSize(SizeInBytes preallocatedSize) {
+        this.preallocatedSize = preallocatedSize;
+        return this;
+      }
+
+      public Log.Builder setWriteBufferSize(SizeInBytes writeBufferSize) {
+        this.writeBufferSize = writeBufferSize;
+        return this;
+      }
+
+      public Log.Builder setForceSyncNum(int forceSyncNum) {
+        this.forceSyncNum = forceSyncNum;
+        return this;
+      }
+
+      public Log.Builder setUnsafeFlushEnabled(boolean unsafeFlushEnabled) {
+        this.unsafeFlushEnabled = unsafeFlushEnabled;
+        return this;
+      }
+    }
+  }
+
+  public static class Grpc {
+    private final SizeInBytes messageSizeMax;
+    private final SizeInBytes flowControlWindow;
+    private final boolean asyncRequestThreadPoolCached;
+    private final int asyncRequestThreadPoolSize;
+    private final int leaderOutstandingAppendsMax;
+
+    private Grpc(
+        SizeInBytes messageSizeMax,
+        SizeInBytes flowControlWindow,
+        boolean asyncRequestThreadPoolCached,
+        int asyncRequestThreadPoolSize,
+        int leaderOutstandingAppendsMax) {
+      this.messageSizeMax = messageSizeMax;
+      this.flowControlWindow = flowControlWindow;
+      this.asyncRequestThreadPoolCached = asyncRequestThreadPoolCached;
+      this.asyncRequestThreadPoolSize = asyncRequestThreadPoolSize;
+      this.leaderOutstandingAppendsMax = leaderOutstandingAppendsMax;
+    }
+
+    public SizeInBytes getMessageSizeMax() {
+      return messageSizeMax;
+    }
+
+    public SizeInBytes getFlowControlWindow() {
+      return flowControlWindow;
+    }
+
+    public boolean isAsyncRequestThreadPoolCached() {
+      return asyncRequestThreadPoolCached;
+    }
+
+    public int getAsyncRequestThreadPoolSize() {
+      return asyncRequestThreadPoolSize;
+    }
+
+    public int getLeaderOutstandingAppendsMax() {
+      return leaderOutstandingAppendsMax;
+    }
+
+    public static Grpc.Builder newBuilder() {
+      return new Grpc.Builder();
+    }
+
+    public static class Builder {
+      private SizeInBytes messageSizeMax = SizeInBytes.valueOf("512MB");
+      private SizeInBytes flowControlWindow = SizeInBytes.valueOf("4MB");
+      private boolean asyncRequestThreadPoolCached =
+          Server.ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT;
+      private int asyncRequestThreadPoolSize = Server.ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT;
+      private int leaderOutstandingAppendsMax = Server.LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT;
+
+      public Grpc build() {
+        return new Grpc(
+            messageSizeMax,
+            flowControlWindow,
+            asyncRequestThreadPoolCached,
+            asyncRequestThreadPoolSize,
+            leaderOutstandingAppendsMax);
+      }
+
+      public Grpc.Builder setMessageSizeMax(SizeInBytes messageSizeMax) {
+        this.messageSizeMax = messageSizeMax;
+        return this;
+      }
+
+      public Grpc.Builder setFlowControlWindow(SizeInBytes flowControlWindow) {
+        this.flowControlWindow = flowControlWindow;
+        return this;
+      }
+
+      public Grpc.Builder setAsyncRequestThreadPoolCached(boolean asyncRequestThreadPoolCached) {
+        this.asyncRequestThreadPoolCached = asyncRequestThreadPoolCached;
+        return this;
+      }
+
+      public Grpc.Builder setAsyncRequestThreadPoolSize(int asyncRequestThreadPoolSize) {
+        this.asyncRequestThreadPoolSize = asyncRequestThreadPoolSize;
+        return this;
+      }
+
+      public Grpc.Builder setLeaderOutstandingAppendsMax(int leaderOutstandingAppendsMax) {
+        this.leaderOutstandingAppendsMax = leaderOutstandingAppendsMax;
+        return this;
+      }
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index c9393ad642..30484bfa09 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -64,12 +66,14 @@ public class MultiLeaderConsensus implements IConsensus {
       new ConcurrentHashMap<>();
   private final MultiLeaderRPCService service;
   private final RegisterManager registerManager = new RegisterManager();
+  private final MultiLeaderConfig config;
 
-  public MultiLeaderConsensus(TEndPoint thisNode, File storageDir, Registry registry) {
-    this.thisNode = thisNode;
-    this.storageDir = storageDir;
+  public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
+    this.thisNode = config.getThisNode();
+    this.storageDir = new File(config.getStorageDir());
+    this.config = config.getMultiLeaderConfig();
     this.registry = registry;
-    this.service = new MultiLeaderRPCService(thisNode);
+    this.service = new MultiLeaderRPCService(thisNode, config.getMultiLeaderConfig());
   }
 
   @Override
@@ -100,7 +104,8 @@ public class MultiLeaderConsensus implements IConsensus {
                   path.toString(),
                   new Peer(consensusGroupId, thisNode),
                   new ArrayList<>(),
-                  registry.apply(consensusGroupId));
+                  registry.apply(consensusGroupId),
+                  config);
           stateMachineMap.put(consensusGroupId, consensus);
           consensus.start();
         }
@@ -161,7 +166,7 @@ public class MultiLeaderConsensus implements IConsensus {
           }
           MultiLeaderServerImpl impl =
               new MultiLeaderServerImpl(
-                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId));
+                  path, new Peer(groupId, thisNode), peers, registry.apply(groupId), config);
           impl.start();
           return impl;
         });
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 017fe05a15..81b74e55c5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.multileader.thrift.TLogType;
@@ -55,9 +56,14 @@ public class MultiLeaderServerImpl {
   private final List<Peer> configuration;
   private final IndexController controller;
   private final LogDispatcher logDispatcher;
+  private final MultiLeaderConfig config;
 
   public MultiLeaderServerImpl(
-      String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine) {
+      String storageDir,
+      Peer thisNode,
+      List<Peer> configuration,
+      IStateMachine stateMachine,
+      MultiLeaderConfig config) {
     this.storageDir = storageDir;
     this.thisNode = thisNode;
     this.stateMachine = stateMachine;
@@ -69,7 +75,8 @@ public class MultiLeaderServerImpl {
     } else {
       persistConfiguration();
     }
-    logDispatcher = new LogDispatcher(this);
+    this.config = config;
+    this.logDispatcher = new LogDispatcher(this);
   }
 
   public IStateMachine getStateMachine() {
@@ -183,4 +190,8 @@ public class MultiLeaderServerImpl {
   public IndexController getController() {
     return controller;
   }
+
+  public MultiLeaderConfig getConfig() {
+    return config;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index 8cc7d10bef..14a2b4ad58 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.multileader.client;
 
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -79,9 +78,11 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
           try {
             long defaultSleepTime =
                 (long)
-                    (MultiLeaderConsensusConfig.BASIC_RETRY_WAIT_TIME_MS * Math.pow(2, retryCount));
+                    (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+                        * Math.pow(2, retryCount));
             Thread.sleep(
-                Math.min(defaultSleepTime, MultiLeaderConsensusConfig.MAX_RETRY_WAIT_TIME_MS));
+                Math.min(
+                    defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             logger.warn("Unexpected interruption during retry pending batch");
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index a43a979694..e3687edf8f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -35,6 +35,13 @@ public class MultiLeaderConsensusClientPool {
 
   public static class AsyncMultiLeaderServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
+
+    private final MultiLeaderConfig config;
+
+    public AsyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+      this.config = config;
+    }
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncMultiLeaderServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> manager) {
@@ -42,11 +49,10 @@ public class MultiLeaderConsensusClientPool {
           new AsyncMultiLeaderServiceClient.Factory(
               manager,
               new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(MultiLeaderConsensusConfig.CONNECTION_TIMEOUT_IN_MS)
-                  .setRpcThriftCompressionEnabled(
-                      MultiLeaderConsensusConfig.IS_RPC_THRIFT_COMPRESSION_ENABLED)
+                  .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
+                  .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
                   .setSelectorNumOfAsyncClientManager(
-                      MultiLeaderConsensusConfig.SELECTOR_NUM_OF_CLIENT_MANAGER)
+                      config.getRpc().getSelectorNumOfClientManager())
                   .build()),
           new ClientPoolProperty.Builder<AsyncMultiLeaderServiceClient>().build().getConfig());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
deleted file mode 100644
index 10a547728f..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.multileader.conf;
-
-import java.util.concurrent.TimeUnit;
-
-// TODO make it configurable
-public class MultiLeaderConsensusConfig {
-
-  private MultiLeaderConsensusConfig() {}
-
-  public static final int RPC_MAX_CONCURRENT_CLIENT_NUM = 65535;
-  public static final int THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE = 60;
-  public static final boolean IS_RPC_THRIFT_COMPRESSION_ENABLED = false;
-  public static final int SELECTOR_NUM_OF_CLIENT_MANAGER = 1;
-  public static final int CONNECTION_TIMEOUT_IN_MS = (int) TimeUnit.SECONDS.toMillis(20);
-  public static final int MAX_PENDING_REQUEST_NUM_PER_NODE = 1000;
-  public static final int MAX_REQUEST_PER_BATCH = 100;
-  public static final int MAX_PENDING_BATCH = 50;
-  public static final int MAX_WAITING_TIME_FOR_ACCUMULATE_BATCH_IN_MS = 10;
-  public static final long BASIC_RETRY_WAIT_TIME_MS = TimeUnit.MILLISECONDS.toMillis(100);
-  public static final long MAX_RETRY_WAIT_TIME_MS = TimeUnit.SECONDS.toMillis(20);
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 3e664ce4e0..398d50fc93 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
 import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
@@ -72,14 +72,14 @@ public class LogDispatcher {
     this.threads =
         impl.getConfiguration().stream()
             .filter(x -> !Objects.equals(x, impl.getThisNode()))
-            .map(LogDispatcherThread::new)
+            .map(x -> new LogDispatcherThread(x, impl.getConfig()))
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
       this.executorService =
           IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(), "LogDispatcher");
       this.clientManager =
           new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
-              .createClientManager(new AsyncMultiLeaderServiceClientPoolFactory());
+              .createClientManager(new AsyncMultiLeaderServiceClientPoolFactory(impl.getConfig()));
     }
   }
 
@@ -121,26 +121,29 @@ public class LogDispatcher {
 
   public class LogDispatcherThread implements Runnable {
 
-    private volatile boolean stopped = false;
+    private final MultiLeaderConfig config;
     private final Peer peer;
     private final IndexController controller;
     // A sliding window class that manages asynchronously pendingBatches
     private final SyncStatus syncStatus;
     // A queue used to receive asynchronous replication requests
-    private final BlockingQueue<IndexedConsensusRequest> pendingRequest =
-        new ArrayBlockingQueue<>(MultiLeaderConsensusConfig.MAX_PENDING_REQUEST_NUM_PER_NODE);
+    private final BlockingQueue<IndexedConsensusRequest> pendingRequest;
     // A container used to cache requests, whose size changes dynamically
     private final List<IndexedConsensusRequest> bufferedRequest = new LinkedList<>();
     // A reader management class that gets requests from the DataRegion
     private final ConsensusReqReader reader =
         (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
+    private volatile boolean stopped = false;
 
-    public LogDispatcherThread(Peer peer) {
+    public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
+      this.config = config;
+      this.pendingRequest =
+          new ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
       this.controller =
           new IndexController(
               impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()), false);
-      this.syncStatus = new SyncStatus(controller);
+      this.syncStatus = new SyncStatus(controller, config);
     }
 
     public IndexController getController() {
@@ -155,6 +158,10 @@ public class LogDispatcher {
       return peer;
     }
 
+    public MultiLeaderConfig getConfig() {
+      return config;
+    }
+
     public boolean offer(IndexedConsensusRequest request) {
       return pendingRequest.offer(request);
     }
@@ -177,8 +184,8 @@ public class LogDispatcher {
             // we may block here if there is no requests in the queue
             bufferedRequest.add(pendingRequest.take());
             // If write pressure is low, we simply sleep a little to reduce the number of RPC
-            if (pendingRequest.size() <= MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
-              Thread.sleep(MultiLeaderConsensusConfig.MAX_WAITING_TIME_FOR_ACCUMULATE_BATCH_IN_MS);
+            if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+              Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
             }
           }
           // we may block here if the synchronization pipeline is full
@@ -200,11 +207,11 @@ public class LogDispatcher {
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex = impl.getController().getCurrentIndex();
       long endIndex;
-      if (bufferedRequest.size() <= MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+      if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
         pendingRequest.drainTo(
             bufferedRequest,
-            MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH - bufferedRequest.size());
+            config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
       }
       if (bufferedRequest.isEmpty()) {
         // only execute this after a restart
@@ -217,7 +224,7 @@ public class LogDispatcher {
         // Prevents gap between logs. For example, some requests are not written into the queue when
         // the queue is full. In this case, requests need to be loaded from the WAL
         endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
-        if (logBatches.size() == MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+        if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
           batch = new PendingBatch(startIndex, endIndex, logBatches);
           logger.debug("accumulated a {} from wal", batch);
           return batch;
@@ -226,14 +233,14 @@ public class LogDispatcher {
         endIndex = prev.getSearchIndex();
         iterator.remove();
         while (iterator.hasNext()
-            && logBatches.size() <= MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+            && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
           IndexedConsensusRequest current = iterator.next();
           // Prevents gap between logs. For example, some logs are not written into the queue when
           // the queue is full. In this case, requests need to be loaded from the WAL
           if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
             endIndex =
                 constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
-            if (logBatches.size() == MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+            if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
               batch = new PendingBatch(startIndex, endIndex, logBatches);
               logger.debug("accumulated a {} from queue and wal", batch);
               return batch;
@@ -271,7 +278,7 @@ public class LogDispatcher {
     private long constructBatchFromWAL(
         long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
       while (currentIndex < maxIndex
-          && logBatches.size() < MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+          && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
         // TODO iterator
         IConsensusRequest data = reader.getReq(currentIndex++);
         if (data != null) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index fb521b9b07..e9901d931a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -28,17 +28,19 @@ import java.util.List;
 
 public class SyncStatus {
 
+  private final MultiLeaderConfig config;
   private final IndexController controller;
   private final List<PendingBatch> pendingBatches = new LinkedList<>();
 
-  public SyncStatus(IndexController controller) {
+  public SyncStatus(IndexController controller, MultiLeaderConfig config) {
     this.controller = controller;
+    this.config = config;
   }
 
   /** we may block here if the synchronization pipeline is full */
   public void addNextBatch(PendingBatch batch) throws InterruptedException {
     synchronized (this) {
-      while (pendingBatches.size() >= MultiLeaderConsensusConfig.MAX_PENDING_BATCH) {
+      while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()) {
         wait();
       }
       pendingBatches.add(batch);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
index 694a7e038b..64dadf2aea 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
 
 import java.lang.reflect.InvocationTargetException;
@@ -34,10 +34,12 @@ import java.lang.reflect.InvocationTargetException;
 public class MultiLeaderRPCService extends ThriftService implements MultiLeaderRPCServiceMBean {
 
   private final TEndPoint thisNode;
+  private final MultiLeaderConfig config;
   private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
 
-  public MultiLeaderRPCService(TEndPoint thisNode) {
+  public MultiLeaderRPCService(TEndPoint thisNode, MultiLeaderConfig config) {
     this.thisNode = thisNode;
+    this.config = config;
   }
 
   @Override
@@ -73,10 +75,10 @@ public class MultiLeaderRPCService extends ThriftService implements MultiLeaderR
               ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
-              MultiLeaderConsensusConfig.RPC_MAX_CONCURRENT_CLIENT_NUM,
-              MultiLeaderConsensusConfig.THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE,
+              config.getRpc().getRpcMaxConcurrentClientNum(),
+              config.getRpc().getThriftServerAwaitTimeForStopService(),
               new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
-              MultiLeaderConsensusConfig.IS_RPC_THRIFT_COMPRESSION_ENABLED);
+              config.getRpc().isRpcThriftCompressionEnabled());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 0c96034f7c..ff1a207403 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
@@ -44,7 +45,6 @@ import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
@@ -62,7 +62,6 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.util.TimeDuration;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,29 +108,18 @@ class RatisConsensus implements IConsensus {
   // TODO make it configurable
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
-  /**
-   * @param ratisStorageDir different groups of RatisConsensus Peer all share 
ratisStorageDir as
-   *     root dir
-   */
-  public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, 
IStateMachine.Registry registry)
+  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry)
       throws IOException {
-    myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint, 
DEFAULT_PRIORITY);
+    myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(), 
DEFAULT_PRIORITY);
 
     System.setProperty(
         
"org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads", 
"false");
-    RaftServerConfigKeys.setStorageDir(properties, 
Collections.singletonList(ratisStorageDir));
-    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
-    // TODO make this configurable so that RatisConsensusTest can trigger 
multiple snapshot process
-    // RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 20);
-    RaftServerConfigKeys.Rpc.setSlownessTimeout(
-        properties, TimeDuration.valueOf(10, TimeUnit.MINUTES));
-    RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(2, 
TimeUnit.SECONDS));
-    RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(8, 
TimeUnit.SECONDS));
-    RaftServerConfigKeys.Rpc.setSleepTime(properties, TimeDuration.valueOf(1, 
TimeUnit.SECONDS));
-    RaftClientConfigKeys.Rpc.setRequestTimeout(
-        properties, TimeDuration.valueOf(20, TimeUnit.SECONDS));
-
-    GrpcConfigKeys.Server.setPort(properties, endpoint.getPort());
+    RaftServerConfigKeys.setStorageDir(
+        properties, Collections.singletonList(new 
File(config.getStorageDir())));
+    GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
+
+    Utils.initRatisConfig(properties, config.getRatisConfig());
+
     clientRpc = new GrpcFactory(new 
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
     server =
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 93a351082c..7d3071ba98 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -22,11 +22,16 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.RatisConfig;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.thrift.TException;
@@ -154,4 +159,64 @@ public class Utils {
     String[] items = metadata.split("_");
     return TermIndex.valueOf(Long.parseLong(items[0]), 
Long.parseLong(items[1]));
   }
+
+  public static void initRatisConfig(RaftProperties properties, RatisConfig 
config) {
+    GrpcConfigKeys.setMessageSizeMax(properties, 
config.getGrpc().getMessageSizeMax());
+    GrpcConfigKeys.setFlowControlWindow(properties, 
config.getGrpc().getFlowControlWindow());
+    GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(
+        properties, config.getGrpc().isAsyncRequestThreadPoolCached());
+    GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(
+        properties, config.getGrpc().getAsyncRequestThreadPoolSize());
+    GrpcConfigKeys.Server.setLeaderOutstandingAppendsMax(
+        properties, config.getGrpc().getLeaderOutstandingAppendsMax());
+
+    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, 
config.getRpc().getSlownessTimeout());
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties, 
config.getRpc().getTimeoutMin());
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties, 
config.getRpc().getTimeoutMax());
+    RaftServerConfigKeys.Rpc.setSleepTime(properties, 
config.getRpc().getSleepTime());
+    RaftClientConfigKeys.Rpc.setRequestTimeout(properties, 
config.getRpc().getRequestTimeout());
+
+    RaftServerConfigKeys.LeaderElection.setLeaderStepDownWaitTime(
+        properties, config.getLeaderElection().getLeaderStepDownWaitTimeKey());
+    RaftServerConfigKeys.LeaderElection.setPreVote(
+        properties, config.getLeaderElection().isPreVote());
+
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
+        properties, config.getSnapshot().isAutoTriggerEnabled());
+    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+        properties, config.getSnapshot().getAutoTriggerThreshold());
+    RaftServerConfigKeys.Snapshot.setCreationGap(properties, 
config.getSnapshot().getCreationGap());
+    RaftServerConfigKeys.Snapshot.setRetentionFileNum(
+        properties, config.getSnapshot().getRetentionFileNum());
+
+    RaftServerConfigKeys.ThreadPool.setClientCached(
+        properties, config.getThreadPool().isClientCached());
+    RaftServerConfigKeys.ThreadPool.setClientSize(
+        properties, config.getThreadPool().getClientSize());
+    RaftServerConfigKeys.ThreadPool.setProxyCached(
+        properties, config.getThreadPool().isProxyCached());
+    RaftServerConfigKeys.ThreadPool.setProxySize(properties, 
config.getThreadPool().getProxySize());
+    RaftServerConfigKeys.ThreadPool.setServerCached(
+        properties, config.getThreadPool().isServerCached());
+    RaftServerConfigKeys.ThreadPool.setServerSize(
+        properties, config.getThreadPool().getServerSize());
+
+    RaftServerConfigKeys.Log.setUseMemory(properties, 
config.getLog().isUseMemory());
+    RaftServerConfigKeys.Log.setQueueElementLimit(
+        properties, config.getLog().getQueueElementLimit());
+    RaftServerConfigKeys.Log.setQueueByteLimit(properties, 
config.getLog().getQueueByteLimit());
+    RaftServerConfigKeys.Log.setPurgeGap(properties, 
config.getLog().getPurgeGap());
+    RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(
+        properties, config.getLog().isPurgeUptoSnapshotIndex());
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties, 
config.getLog().getSegmentSizeMax());
+    RaftServerConfigKeys.Log.setSegmentCacheNumMax(
+        properties, config.getLog().getSegmentCacheNumMax());
+    RaftServerConfigKeys.Log.setSegmentCacheSizeMax(
+        properties, config.getLog().getSegmentCacheSizeMax());
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties, 
config.getLog().getPreallocatedSize());
+    RaftServerConfigKeys.Log.setWriteBufferSize(properties, 
config.getLog().getWriteBufferSize());
+    RaftServerConfigKeys.Log.setForceSyncNum(properties, 
config.getLog().getForceSyncNum());
+    RaftServerConfigKeys.Log.setUnsafeFlushEnabled(
+        properties, config.getLog().isUnsafeFlushEnabled());
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 3604d29059..69b23d3691 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -63,9 +64,9 @@ class StandAloneConsensus implements IConsensus {
   private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
       new ConcurrentHashMap<>();
 
-  public StandAloneConsensus(TEndPoint thisNode, File storageDir, Registry 
registry) {
-    this.thisNode = thisNode;
-    this.storageDir = storageDir;
+  public StandAloneConsensus(ConsensusConfig config, Registry registry) {
+    this.thisNode = config.getThisNode();
+    this.storageDir = new File(config.getStorageDir());
     this.registry = registry;
   }
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index 770031bee1..045c3c44d8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -100,8 +101,10 @@ public class MultiLeaderConsensusTest {
           (MultiLeaderConsensus)
               ConsensusFactory.getConsensusImpl(
                       ConsensusFactory.MultiLeaderConsensus,
-                      peers.get(i).getEndpoint(),
-                      peersStorage.get(i),
+                      ConsensusConfig.newBuilder()
+                          .setThisNode(peers.get(i).getEndpoint())
+                          .setStorageDir(peersStorage.get(i).getAbsolutePath())
+                          .build(),
                       groupId -> stateMachines.get(finalI))
                   .orElseThrow(
                       () ->
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index cba5147872..cbf015257e 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -39,6 +39,7 @@ public class SyncStatusTest {
 
   private static final File storageDir = new File("target" + 
java.io.File.separator + "test");
   private static final String prefix = "version";
+  private static final MultiLeaderConfig config = new 
MultiLeaderConfig.Builder().build();
 
   @Before
   public void setUp() throws IOException {
@@ -56,22 +57,22 @@ public class SyncStatusTest {
     IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, true);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i, 
status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch() - 1 - i, 
status.getPendingBatches().size());
       Assert.assertEquals(i, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
     }
   }
 
@@ -82,29 +83,29 @@ public class SyncStatusTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1; i++) 
{
-      
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 
1 - i));
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch() - 1; i++) 
{
+      
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() - 
1 - i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch(), 
status.getPendingBatches().size());
       Assert.assertEquals(0, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
     }
 
     status.removeBatch(batchList.get(0));
     Assert.assertEquals(0, status.getPendingBatches().size());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1, 
controller.getCurrentIndex());
-    Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() - 1, 
controller.getCurrentIndex());
+    Assert.assertEquals(config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
   }
 
   /** Confirm success first from front to back, then back to front */
@@ -114,39 +115,39 @@ public class SyncStatusTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2; i++) 
{
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch() / 2; i++) 
{
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i, 
status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch() - 1 - i, 
status.getPendingBatches().size());
       Assert.assertEquals(i, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
     }
 
-    for (int i = MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2 + 1;
-        i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH;
+    for (int i = config.getReplication().getMaxPendingBatch() / 2 + 1;
+        i < config.getReplication().getMaxPendingBatch();
         i++) {
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2, 
status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch() / 2, 
status.getPendingBatches().size());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
     }
 
-    
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 
2));
+    
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() / 
2));
     Assert.assertEquals(0, status.getPendingBatches().size());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1, 
controller.getCurrentIndex());
-    Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() - 1, 
controller.getCurrentIndex());
+    Assert.assertEquals(config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
   }
 
   /** Test Blocking while addNextBatch */
@@ -155,22 +156,22 @@ public class SyncStatusTest {
     IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, true);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
-    SyncStatus status = new SyncStatus(controller);
+    SyncStatus status = new SyncStatus(controller, config);
     List<PendingBatch> batchList = new ArrayList<>();
 
-    for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+    for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
       batchList.add(batch);
       status.addNextBatch(batch);
     }
 
-    for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1; i++) 
{
-      
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 
1 - i));
+    for (int i = 0; i < config.getReplication().getMaxPendingBatch() - 1; i++) 
{
+      
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() - 
1 - i));
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch(), 
status.getPendingBatches().size());
       Assert.assertEquals(0, controller.getCurrentIndex());
       Assert.assertEquals(
-          MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
status.getNextSendingIndex());
+          config.getReplication().getMaxPendingBatch(), 
status.getNextSendingIndex());
     }
 
     CompletableFuture<Boolean> future =
@@ -178,8 +179,8 @@ public class SyncStatusTest {
             () -> {
               PendingBatch batch =
                   new PendingBatch(
-                      MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
-                      MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
+                      config.getReplication().getMaxPendingBatch(),
+                      config.getReplication().getMaxPendingBatch(),
                       Collections.emptyList());
               batchList.add(batch);
               try {
@@ -198,14 +199,14 @@ public class SyncStatusTest {
     Assert.assertTrue(future.get());
     Assert.assertEquals(1, status.getPendingBatches().size());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1, 
controller.getCurrentIndex());
+        config.getReplication().getMaxPendingBatch() - 1, 
controller.getCurrentIndex());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH + 1, 
status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() + 1, 
status.getNextSendingIndex());
 
-    
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH));
+    
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch()));
     Assert.assertEquals(0, status.getPendingBatches().size());
-    Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH, 
controller.getCurrentIndex());
+    Assert.assertEquals(config.getReplication().getMaxPendingBatch(), 
controller.getCurrentIndex());
     Assert.assertEquals(
-        MultiLeaderConsensusConfig.MAX_PENDING_BATCH + 1, 
status.getNextSendingIndex());
+        config.getReplication().getMaxPendingBatch() + 1, 
status.getNextSendingIndex());
   }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index b114c2305a..063c798722 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -63,8 +64,10 @@ public class RatisConsensusTest {
       servers.add(
           ConsensusFactory.getConsensusImpl(
                   ConsensusFactory.RatisConsensus,
-                  peers.get(i).getEndpoint(),
-                  peersStorage.get(i),
+                  ConsensusConfig.newBuilder()
+                      .setThisNode(peers.get(i).getEndpoint())
+                      .setStorageDir(peersStorage.get(i).getAbsolutePath())
+                      .build(),
                   groupId -> stateMachines.get(finalI))
               .orElseThrow(
                   () ->
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index de4442ed79..2dd7cd9fbc 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 
 import org.apache.ratis.util.FileUtils;
@@ -45,8 +46,10 @@ public class RecoveryTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.StandAloneConsensus,
-                new TEndPoint("0.0.0.0", 9000),
-                new File("target" + java.io.File.separator + "recovery"),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(new TEndPoint("0.0.0.0", 9000))
+                    .setStorageDir("target" + java.io.File.separator + 
"recovery")
+                    .build(),
                 gid -> new EmptyStateMachine())
             .orElseThrow(
                 () ->
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5c3ae96453..2973fe7f05 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -123,8 +124,10 @@ public class StandAloneConsensusTest {
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
                 ConsensusFactory.StandAloneConsensus,
-                new TEndPoint("0.0.0.0", 6667),
-                new File("target" + java.io.File.separator + "standalone"),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(new TEndPoint("0.0.0.0", 6667))
+                    .setStorageDir("target" + java.io.File.separator + 
"standalone")
+                    .build(),
                 gid -> {
                   switch (gid.getType()) {
                     case SchemaRegion:
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 2007f1af59..4832d99f26 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -23,13 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig.RPC;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 
-import java.io.File;
-
 /**
  * We can use DataRegionConsensusImpl.getInstance() to obtain a consensus 
layer reference for
  * dataRegion's reading and writing
@@ -48,8 +49,26 @@ public class DataRegionConsensusImpl {
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
                 conf.getDataRegionConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), 
conf.getDataRegionConsensusPort()),
-                new File(conf.getDataRegionConsensusDir()),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(
+                        new TEndPoint(conf.getInternalIp(), 
conf.getDataRegionConsensusPort()))
+                    .setStorageDir(conf.getDataRegionConsensusDir())
+                    .setMultiLeaderConfig(
+                        MultiLeaderConfig.newBuilder()
+                            .setRpc(
+                                RPC.newBuilder()
+                                    
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
+                                    .setRpcMaxConcurrentClientNum(
+                                        conf.getRpcMaxConcurrentClientNum())
+                                    .setRpcThriftCompressionEnabled(
+                                        conf.isRpcThriftCompressionEnable())
+                                    .setSelectorNumOfClientManager(
+                                        conf.getSelectorNumOfClientManager())
+                                    .setThriftServerAwaitTimeForStopService(
+                                        
conf.getThriftServerAwaitTimeForStopService())
+                                    .build())
+                            .build())
+                    .build(),
                 gid ->
                     new DataRegionStateMachine(
                         
StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 1bde912992..177d909c21 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -23,13 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 
-import java.io.File;
-
 /**
  * We can use SchemaRegionConsensusImpl.getInstance() to obtain a consensus 
layer reference for
  * schemaRegion's reading and writing
@@ -48,8 +47,11 @@ public class SchemaRegionConsensusImpl {
     private static final IConsensus INSTANCE =
         ConsensusFactory.getConsensusImpl(
                 conf.getSchemaRegionConsensusProtocolClass(),
-                new TEndPoint(conf.getInternalIp(), 
conf.getSchemaRegionConsensusPort()),
-                new File(conf.getSchemaRegionConsensusDir()),
+                ConsensusConfig.newBuilder()
+                    .setThisNode(
+                        new TEndPoint(conf.getInternalIp(), 
conf.getSchemaRegionConsensusPort()))
+                    .setStorageDir(conf.getSchemaRegionConsensusDir())
+                    .build(),
                 gid ->
                     new SchemaRegionStateMachine(
                         
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))

Reply via email to