This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 37ecc6e166 [IOTDB-3195] Added a configuration interface for the
consensus layer (#6081)
37ecc6e166 is described below
commit 37ecc6e166ab5296c444260b9dd974fb78a52d5e
Author: Potato <[email protected]>
AuthorDate: Wed Jun 1 22:48:16 2022 +0800
[IOTDB-3195] Added a configuration interface for the consensus layer (#6081)
---
.../iotdb/confignode/manager/ConsensusManager.java | 8 +-
.../apache/iotdb/consensus/ConsensusFactory.java | 11 +-
.../iotdb/consensus/config/ConsensusConfig.java | 97 +++
.../iotdb/consensus/config/MultiLeaderConfig.java | 257 ++++++++
.../apache/iotdb/consensus/config/RatisConfig.java | 695 +++++++++++++++++++++
.../multileader/MultiLeaderConsensus.java | 17 +-
.../multileader/MultiLeaderServerImpl.java | 15 +-
.../multileader/client/DispatchLogHandler.java | 7 +-
.../client/MultiLeaderConsensusClientPool.java | 16 +-
.../conf/MultiLeaderConsensusConfig.java | 40 --
.../multileader/logdispatcher/LogDispatcher.java | 39 +-
.../multileader/logdispatcher/SyncStatus.java | 8 +-
.../multileader/service/MultiLeaderRPCService.java | 12 +-
.../iotdb/consensus/ratis/RatisConsensus.java | 30 +-
.../org/apache/iotdb/consensus/ratis/Utils.java | 65 ++
.../consensus/standalone/StandAloneConsensus.java | 7 +-
.../multileader/MultiLeaderConsensusTest.java | 7 +-
.../multileader/logdispatcher/SyncStatusTest.java | 79 +--
.../iotdb/consensus/ratis/RatisConsensusTest.java | 7 +-
.../iotdb/consensus/standalone/RecoveryTest.java | 7 +-
.../standalone/StandAloneConsensusTest.java | 7 +-
.../db/consensus/DataRegionConsensusImpl.java | 27 +-
.../db/consensus/SchemaRegionConsensusImpl.java | 10 +-
23 files changed, 1299 insertions(+), 169 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 702f5cfa05..918c4b693f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -35,12 +35,12 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -88,8 +88,10 @@ public class ConsensusManager {
consensusImpl =
ConsensusFactory.getConsensusImpl(
conf.getConfigNodeConsensusProtocolClass(),
- new TEndPoint(conf.getRpcAddress(), conf.getConsensusPort()),
- new File(conf.getConsensusDir()),
+ ConsensusConfig.newBuilder()
+ .setThisNode(new TEndPoint(conf.getRpcAddress(),
conf.getConsensusPort()))
+ .setStorageDir(conf.getConsensusDir())
+ .build(),
gid -> stateMachine)
.orElseThrow(
() ->
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 15e2492b2e..8146fcf3a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -19,12 +19,11 @@
package org.apache.iotdb.consensus;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
@@ -42,15 +41,13 @@ public class ConsensusFactory {
private static final Logger logger =
LoggerFactory.getLogger(ConsensusFactory.class);
public static Optional<IConsensus> getConsensusImpl(
- String className, TEndPoint endpoint, File storageDir,
IStateMachine.Registry registry) {
+ String className, ConsensusConfig config, IStateMachine.Registry
registry) {
try {
Class<?> executor = Class.forName(className);
Constructor<?> executorConstructor =
- executor.getDeclaredConstructor(
- TEndPoint.class, File.class, IStateMachine.Registry.class);
+ executor.getDeclaredConstructor(ConsensusConfig.class,
IStateMachine.Registry.class);
executorConstructor.setAccessible(true);
- return Optional.of(
- (IConsensus) executorConstructor.newInstance(endpoint, storageDir,
registry));
+ return Optional.of((IConsensus) executorConstructor.newInstance(config,
registry));
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
new file mode 100644
index 0000000000..be3b4bf342
--- /dev/null
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+public class ConsensusConfig {
+
+ private final TEndPoint thisNode;
+ private final String storageDir;
+ private final RatisConfig ratisConfig;
+ private final MultiLeaderConfig multiLeaderConfig;
+
+ private ConsensusConfig(
+ TEndPoint thisNode,
+ String storageDir,
+ RatisConfig ratisConfig,
+ MultiLeaderConfig multiLeaderConfig) {
+ this.thisNode = thisNode;
+ this.storageDir = storageDir;
+ this.ratisConfig = ratisConfig;
+ this.multiLeaderConfig = multiLeaderConfig;
+ }
+
+ public TEndPoint getThisNode() {
+ return thisNode;
+ }
+
+ public String getStorageDir() {
+ return storageDir;
+ }
+
+ public RatisConfig getRatisConfig() {
+ return ratisConfig;
+ }
+
+ public MultiLeaderConfig getMultiLeaderConfig() {
+ return multiLeaderConfig;
+ }
+
+ public static ConsensusConfig.Builder newBuilder() {
+ return new ConsensusConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private TEndPoint thisNode;
+ private String storageDir;
+ private RatisConfig ratisConfig;
+ private MultiLeaderConfig multiLeaderConfig;
+
+ public ConsensusConfig build() {
+ return new ConsensusConfig(
+ thisNode,
+ storageDir,
+ ratisConfig != null ? ratisConfig : RatisConfig.newBuilder().build(),
+ multiLeaderConfig != null ? multiLeaderConfig :
MultiLeaderConfig.newBuilder().build());
+ }
+
+ public Builder setThisNode(TEndPoint thisNode) {
+ this.thisNode = thisNode;
+ return this;
+ }
+
+ public Builder setStorageDir(String storageDir) {
+ this.storageDir = storageDir;
+ return this;
+ }
+
+ public Builder setRatisConfig(RatisConfig ratisConfig) {
+ this.ratisConfig = ratisConfig;
+ return this;
+ }
+
+ public Builder setMultiLeaderConfig(MultiLeaderConfig multiLeaderConfig) {
+ this.multiLeaderConfig = multiLeaderConfig;
+ return this;
+ }
+ }
+}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
new file mode 100644
index 0000000000..b98df9f822
--- /dev/null
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import java.util.concurrent.TimeUnit;
+
+public class MultiLeaderConfig {
+
+ private final RPC rpc;
+ private final Replication replication;
+
+ private MultiLeaderConfig(RPC rpc, Replication replication) {
+ this.rpc = rpc;
+ this.replication = replication;
+ }
+
+ public RPC getRpc() {
+ return rpc;
+ }
+
+ public Replication getReplication() {
+ return replication;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private RPC rpc;
+ private Replication replication;
+
+ public MultiLeaderConfig build() {
+ return new MultiLeaderConfig(
+ rpc != null ? rpc : new RPC.Builder().build(),
+ replication != null ? replication : new
Replication.Builder().build());
+ }
+
+ public Builder setRpc(RPC rpc) {
+ this.rpc = rpc;
+ return this;
+ }
+
+ public Builder setReplication(Replication replication) {
+ this.replication = replication;
+ return this;
+ }
+ }
+
+ public static class RPC {
+ private final int rpcMaxConcurrentClientNum;
+ private final int thriftServerAwaitTimeForStopService;
+ private final boolean isRpcThriftCompressionEnabled;
+ private final int selectorNumOfClientManager;
+ private final int connectionTimeoutInMs;
+
+ private RPC(
+ int rpcMaxConcurrentClientNum,
+ int thriftServerAwaitTimeForStopService,
+ boolean isRpcThriftCompressionEnabled,
+ int selectorNumOfClientManager,
+ int connectionTimeoutInMs) {
+ this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
+ this.thriftServerAwaitTimeForStopService =
thriftServerAwaitTimeForStopService;
+ this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
+ this.selectorNumOfClientManager = selectorNumOfClientManager;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ }
+
+ public int getRpcMaxConcurrentClientNum() {
+ return rpcMaxConcurrentClientNum;
+ }
+
+ public int getThriftServerAwaitTimeForStopService() {
+ return thriftServerAwaitTimeForStopService;
+ }
+
+ public boolean isRpcThriftCompressionEnabled() {
+ return isRpcThriftCompressionEnabled;
+ }
+
+ public int getSelectorNumOfClientManager() {
+ return selectorNumOfClientManager;
+ }
+
+ public int getConnectionTimeoutInMs() {
+ return connectionTimeoutInMs;
+ }
+
+ public static RPC.Builder newBuilder() {
+ return new RPC.Builder();
+ }
+
+ public static class Builder {
+ private int rpcMaxConcurrentClientNum = 65535;
+ private int thriftServerAwaitTimeForStopService = 60;
+ private boolean isRpcThriftCompressionEnabled = false;
+ private int selectorNumOfClientManager = 1;
+ private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
+
+ public RPC.Builder setRpcMaxConcurrentClientNum(int
rpcMaxConcurrentClientNum) {
+ this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
+ return this;
+ }
+
+ public RPC.Builder setThriftServerAwaitTimeForStopService(
+ int thriftServerAwaitTimeForStopService) {
+ this.thriftServerAwaitTimeForStopService =
thriftServerAwaitTimeForStopService;
+ return this;
+ }
+
+ public RPC.Builder setRpcThriftCompressionEnabled(boolean
rpcThriftCompressionEnabled) {
+ isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+ return this;
+ }
+
+ public RPC.Builder setSelectorNumOfClientManager(int
selectorNumOfClientManager) {
+ this.selectorNumOfClientManager = selectorNumOfClientManager;
+ return this;
+ }
+
+ public RPC.Builder setConnectionTimeoutInMs(int connectionTimeoutInMs) {
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ return this;
+ }
+
+ public RPC build() {
+ return new RPC(
+ rpcMaxConcurrentClientNum,
+ thriftServerAwaitTimeForStopService,
+ isRpcThriftCompressionEnabled,
+ selectorNumOfClientManager,
+ connectionTimeoutInMs);
+ }
+ }
+ }
+
+ public static class Replication {
+ private final int maxPendingRequestNumPerNode;
+ private final int maxRequestPerBatch;
+ private final int maxPendingBatch;
+ private final int maxWaitingTimeForAccumulatingBatchInMs;
+ private final long basicRetryWaitTimeMs;
+ private final long maxRetryWaitTimeMs;
+
+ private Replication(
+ int maxPendingRequestNumPerNode,
+ int maxRequestPerBatch,
+ int maxPendingBatch,
+ int maxWaitingTimeForAccumulatingBatchInMs,
+ long basicRetryWaitTimeMs,
+ long maxRetryWaitTimeMs) {
+ this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+ this.maxRequestPerBatch = maxRequestPerBatch;
+ this.maxPendingBatch = maxPendingBatch;
+ this.maxWaitingTimeForAccumulatingBatchInMs =
maxWaitingTimeForAccumulatingBatchInMs;
+ this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
+ this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
+ }
+
+ public int getMaxPendingRequestNumPerNode() {
+ return maxPendingRequestNumPerNode;
+ }
+
+ public int getMaxRequestPerBatch() {
+ return maxRequestPerBatch;
+ }
+
+ public int getMaxPendingBatch() {
+ return maxPendingBatch;
+ }
+
+ public int getMaxWaitingTimeForAccumulatingBatchInMs() {
+ return maxWaitingTimeForAccumulatingBatchInMs;
+ }
+
+ public long getBasicRetryWaitTimeMs() {
+ return basicRetryWaitTimeMs;
+ }
+
+ public long getMaxRetryWaitTimeMs() {
+ return maxRetryWaitTimeMs;
+ }
+
+ public static Replication.Builder newBuilder() {
+ return new Replication.Builder();
+ }
+
+ public static class Builder {
+ private int maxPendingRequestNumPerNode = 1000;
+ private int maxRequestPerBatch = 100;
+ private int maxPendingBatch = 50;
+ private int maxWaitingTimeForAccumulatingBatchInMs = 10;
+ private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
+ private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
+
+ public Replication.Builder setMaxPendingRequestNumPerNode(int
maxPendingRequestNumPerNode) {
+ this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+ return this;
+ }
+
+ public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch)
{
+ this.maxRequestPerBatch = maxRequestPerBatch;
+ return this;
+ }
+
+ public Replication.Builder setMaxPendingBatch(int maxPendingBatch) {
+ this.maxPendingBatch = maxPendingBatch;
+ return this;
+ }
+
+ public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
+ int maxWaitingTimeForAccumulatingBatchInMs) {
+ this.maxWaitingTimeForAccumulatingBatchInMs =
maxWaitingTimeForAccumulatingBatchInMs;
+ return this;
+ }
+
+ public Replication.Builder setBasicRetryWaitTimeMs(long
basicRetryWaitTimeMs) {
+ this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
+ return this;
+ }
+
+ public Replication.Builder setMaxRetryWaitTimeMs(long
maxRetryWaitTimeMs) {
+ this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
+ return this;
+ }
+
+ public Replication build() {
+ return new Replication(
+ maxPendingRequestNumPerNode,
+ maxRequestPerBatch,
+ maxPendingBatch,
+ maxWaitingTimeForAccumulatingBatchInMs,
+ basicRetryWaitTimeMs,
+ maxRetryWaitTimeMs);
+ }
+ }
+ }
+}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
new file mode 100644
index 0000000000..048fd21d76
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.config;
+
+import org.apache.ratis.grpc.GrpcConfigKeys.Server;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class RatisConfig {
+
+ private final Rpc rpc;
+ private final LeaderElection leaderElection;
+ private final Snapshot snapshot;
+ private final ThreadPool threadPool;
+ private final Log log;
+ private final Grpc grpc;
+
+ private RatisConfig(
+ Rpc rpc,
+ LeaderElection leaderElection,
+ Snapshot snapshot,
+ ThreadPool threadPool,
+ Log log,
+ Grpc grpc) {
+ this.rpc = rpc;
+ this.leaderElection = leaderElection;
+ this.snapshot = snapshot;
+ this.threadPool = threadPool;
+ this.log = log;
+ this.grpc = grpc;
+ }
+
+ public Rpc getRpc() {
+ return rpc;
+ }
+
+ public LeaderElection getLeaderElection() {
+ return leaderElection;
+ }
+
+ public Snapshot getSnapshot() {
+ return snapshot;
+ }
+
+ public ThreadPool getThreadPool() {
+ return threadPool;
+ }
+
+ public Log getLog() {
+ return log;
+ }
+
+ public Grpc getGrpc() {
+ return grpc;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Rpc rpc;
+ private LeaderElection leaderElection;
+ private Snapshot snapshot;
+ private ThreadPool threadPool;
+ private Log log;
+ private Grpc grpc;
+
+ public RatisConfig build() {
+ return new RatisConfig(
+ rpc != null ? rpc : Rpc.newBuilder().build(),
+ leaderElection != null ? leaderElection :
LeaderElection.newBuilder().build(),
+ snapshot != null ? snapshot : Snapshot.newBuilder().build(),
+ threadPool != null ? threadPool : ThreadPool.newBuilder().build(),
+ log != null ? log : Log.newBuilder().build(),
+ grpc != null ? grpc : Grpc.newBuilder().build());
+ }
+
+ public Builder setRpc(Rpc rpc) {
+ this.rpc = rpc;
+ return this;
+ }
+
+ public Builder setLeaderElection(LeaderElection leaderElection) {
+ this.leaderElection = leaderElection;
+ return this;
+ }
+
+ public Builder setSnapshot(Snapshot snapshot) {
+ this.snapshot = snapshot;
+ return this;
+ }
+
+ public Builder setThreadPool(ThreadPool threadPool) {
+ this.threadPool = threadPool;
+ return this;
+ }
+
+ public Builder setLog(Log log) {
+ this.log = log;
+ return this;
+ }
+
+ public Builder setGrpc(Grpc grpc) {
+ this.grpc = grpc;
+ return this;
+ }
+ }
+
+ /** server rpc timeout related */
+ public static class Rpc {
+ private final TimeDuration timeoutMin;
+ private final TimeDuration timeoutMax;
+ private final TimeDuration requestTimeout;
+ private final TimeDuration sleepTime;
+ private final TimeDuration slownessTimeout;
+
+ private Rpc(
+ TimeDuration timeoutMin,
+ TimeDuration timeoutMax,
+ TimeDuration requestTimeout,
+ TimeDuration sleepTime,
+ TimeDuration slownessTimeout) {
+ this.timeoutMin = timeoutMin;
+ this.timeoutMax = timeoutMax;
+ this.requestTimeout = requestTimeout;
+ this.sleepTime = sleepTime;
+ this.slownessTimeout = slownessTimeout;
+ }
+
+ public TimeDuration getTimeoutMin() {
+ return timeoutMin;
+ }
+
+ public TimeDuration getTimeoutMax() {
+ return timeoutMax;
+ }
+
+ public TimeDuration getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public TimeDuration getSleepTime() {
+ return sleepTime;
+ }
+
+ public TimeDuration getSlownessTimeout() {
+ return slownessTimeout;
+ }
+
+ public static Rpc.Builder newBuilder() {
+ return new Rpc.Builder();
+ }
+
+ public static class Builder {
+ private TimeDuration timeoutMin = TimeDuration.valueOf(2,
TimeUnit.SECONDS);
+ private TimeDuration timeoutMax = TimeDuration.valueOf(8,
TimeUnit.SECONDS);
+ private TimeDuration requestTimeout = TimeDuration.valueOf(20,
TimeUnit.SECONDS);
+ private TimeDuration sleepTime = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
+ private TimeDuration slownessTimeout = TimeDuration.valueOf(10,
TimeUnit.MINUTES);
+
+ public Rpc build() {
+ return new Rpc(timeoutMin, timeoutMax, requestTimeout, sleepTime,
slownessTimeout);
+ }
+
+ public Rpc.Builder setTimeoutMin(TimeDuration timeoutMin) {
+ this.timeoutMin = timeoutMin;
+ return this;
+ }
+
+ public Rpc.Builder setTimeoutMax(TimeDuration timeoutMax) {
+ this.timeoutMax = timeoutMax;
+ return this;
+ }
+
+ public Rpc.Builder setRequestTimeout(TimeDuration requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ return this;
+ }
+
+ public Rpc.Builder setSleepTime(TimeDuration sleepTime) {
+ this.sleepTime = sleepTime;
+ return this;
+ }
+
+ public Rpc.Builder setSlownessTimeout(TimeDuration slownessTimeout) {
+ this.slownessTimeout = slownessTimeout;
+ return this;
+ }
+ }
+ }
+
+ public static class LeaderElection {
+ private final TimeDuration leaderStepDownWaitTimeKey;
+ private final boolean preVote;
+
+ private LeaderElection(TimeDuration leaderStepDownWaitTimeKey, boolean
preVote) {
+ this.leaderStepDownWaitTimeKey = leaderStepDownWaitTimeKey;
+ this.preVote = preVote;
+ }
+
+ public TimeDuration getLeaderStepDownWaitTimeKey() {
+ return leaderStepDownWaitTimeKey;
+ }
+
+ public boolean isPreVote() {
+ return preVote;
+ }
+
+ public static LeaderElection.Builder newBuilder() {
+ return new LeaderElection.Builder();
+ }
+
+ public static class Builder {
+ private TimeDuration leaderStepDownWaitTimeKey =
TimeDuration.valueOf(30, TimeUnit.SECONDS);
+ private boolean preVote =
RaftServerConfigKeys.LeaderElection.PRE_VOTE_DEFAULT;
+
+ public LeaderElection build() {
+ return new LeaderElection(leaderStepDownWaitTimeKey, preVote);
+ }
+
+ public LeaderElection.Builder setLeaderStepDownWaitTimeKey(
+ TimeDuration leaderStepDownWaitTimeKey) {
+ this.leaderStepDownWaitTimeKey = leaderStepDownWaitTimeKey;
+ return this;
+ }
+
+ public LeaderElection.Builder setPreVote(boolean preVote) {
+ this.preVote = preVote;
+ return this;
+ }
+ }
+ }
+
+ public static class Snapshot {
+ private final boolean autoTriggerEnabled;
+ private final long creationGap;
+ private final long autoTriggerThreshold;
+ private final int retentionFileNum;
+
+ private Snapshot(
+ boolean autoTriggerEnabled,
+ long creationGap,
+ long autoTriggerThreshold,
+ int retentionFileNum) {
+ this.autoTriggerEnabled = autoTriggerEnabled;
+ this.creationGap = creationGap;
+ this.autoTriggerThreshold = autoTriggerThreshold;
+ this.retentionFileNum = retentionFileNum;
+ }
+
+ public boolean isAutoTriggerEnabled() {
+ return autoTriggerEnabled;
+ }
+
+ public long getCreationGap() {
+ return creationGap;
+ }
+
+ public long getAutoTriggerThreshold() {
+ return autoTriggerThreshold;
+ }
+
+ public int getRetentionFileNum() {
+ return retentionFileNum;
+ }
+
+ public static Snapshot.Builder newBuilder() {
+ return new Snapshot.Builder();
+ }
+
+ public static class Builder {
+ private boolean autoTriggerEnabled = true;
+ private long creationGap =
RaftServerConfigKeys.Snapshot.CREATION_GAP_DEFAULT;
+ private long autoTriggerThreshold =
+ RaftServerConfigKeys.Snapshot.AUTO_TRIGGER_THRESHOLD_DEFAULT;
+ private int retentionFileNum =
RaftServerConfigKeys.Snapshot.RETENTION_FILE_NUM_DEFAULT;
+
+ public Snapshot build() {
+ return new Snapshot(
+ autoTriggerEnabled, creationGap, autoTriggerThreshold,
retentionFileNum);
+ }
+
+ public Snapshot.Builder setAutoTriggerEnabled(boolean
autoTriggerEnabled) {
+ this.autoTriggerEnabled = autoTriggerEnabled;
+ return this;
+ }
+
+ public Snapshot.Builder setCreationGap(long creationGap) {
+ this.creationGap = creationGap;
+ return this;
+ }
+
+ public Snapshot.Builder setAutoTriggerThreshold(long
autoTriggerThreshold) {
+ this.autoTriggerThreshold = autoTriggerThreshold;
+ return this;
+ }
+
+ public Snapshot.Builder setRetentionFileNum(int retentionFileNum) {
+ this.retentionFileNum = retentionFileNum;
+ return this;
+ }
+ }
+ }
+
+ public static class ThreadPool {
+ private final boolean proxyCached;
+ private final int proxySize;
+ private final boolean serverCached;
+ private final int serverSize;
+ private final boolean clientCached;
+ private final int clientSize;
+
+ private ThreadPool(
+ boolean proxyCached,
+ int proxySize,
+ boolean serverCached,
+ int serverSize,
+ boolean clientCached,
+ int clientSize) {
+ this.proxyCached = proxyCached;
+ this.proxySize = proxySize;
+ this.serverCached = serverCached;
+ this.serverSize = serverSize;
+ this.clientCached = clientCached;
+ this.clientSize = clientSize;
+ }
+
+ public boolean isProxyCached() {
+ return proxyCached;
+ }
+
+ public int getProxySize() {
+ return proxySize;
+ }
+
+ public boolean isServerCached() {
+ return serverCached;
+ }
+
+ public int getServerSize() {
+ return serverSize;
+ }
+
+ public boolean isClientCached() {
+ return clientCached;
+ }
+
+ public int getClientSize() {
+ return clientSize;
+ }
+
+ public static ThreadPool.Builder newBuilder() {
+ return new ThreadPool.Builder();
+ }
+
+ public static class Builder {
+ private boolean proxyCached =
RaftServerConfigKeys.ThreadPool.PROXY_CACHED_DEFAULT;
+ private int proxySize =
RaftServerConfigKeys.ThreadPool.PROXY_SIZE_DEFAULT;
+ private boolean serverCached =
RaftServerConfigKeys.ThreadPool.SERVER_CACHED_DEFAULT;
+ private int serverSize =
RaftServerConfigKeys.ThreadPool.SERVER_SIZE_DEFAULT;
+ private boolean clientCached =
RaftServerConfigKeys.ThreadPool.CLIENT_CACHED_DEFAULT;
+ private int clientSize =
RaftServerConfigKeys.ThreadPool.CLIENT_SIZE_DEFAULT;
+
+ public ThreadPool build() {
+ return new ThreadPool(
+ proxyCached, proxySize, serverCached, serverSize, clientCached,
clientSize);
+ }
+
+ public ThreadPool.Builder setProxyCached(boolean proxyCached) {
+ this.proxyCached = proxyCached;
+ return this;
+ }
+
+ public ThreadPool.Builder setProxySize(int proxySize) {
+ this.proxySize = proxySize;
+ return this;
+ }
+
+ public ThreadPool.Builder setServerCached(boolean serverCached) {
+ this.serverCached = serverCached;
+ return this;
+ }
+
+ public ThreadPool.Builder setServerSize(int serverSize) {
+ this.serverSize = serverSize;
+ return this;
+ }
+
+ public ThreadPool.Builder setClientCached(boolean clientCached) {
+ this.clientCached = clientCached;
+ return this;
+ }
+
+ public ThreadPool.Builder setClientSize(int clientSize) {
+ this.clientSize = clientSize;
+ return this;
+ }
+ }
+ }
+
+ public static class Log {
+
+ private final boolean useMemory;
+ private final int queueElementLimit;
+ private final SizeInBytes queueByteLimit;
+ private final int purgeGap;
+ private final boolean purgeUptoSnapshotIndex;
+ private final SizeInBytes segmentSizeMax;
+ private final int segmentCacheNumMax;
+ private final SizeInBytes segmentCacheSizeMax;
+ private final SizeInBytes preallocatedSize;
+ private final SizeInBytes writeBufferSize;
+ private final int forceSyncNum;
+ private final boolean unsafeFlushEnabled;
+
+ private Log(
+ boolean useMemory,
+ int queueElementLimit,
+ SizeInBytes queueByteLimit,
+ int purgeGap,
+ boolean purgeUptoSnapshotIndex,
+ SizeInBytes segmentSizeMax,
+ int segmentCacheNumMax,
+ SizeInBytes segmentCacheSizeMax,
+ SizeInBytes preallocatedSize,
+ SizeInBytes writeBufferSize,
+ int forceSyncNum,
+ boolean unsafeFlushEnabled) {
+ this.useMemory = useMemory;
+ this.queueElementLimit = queueElementLimit;
+ this.queueByteLimit = queueByteLimit;
+ this.purgeGap = purgeGap;
+ this.purgeUptoSnapshotIndex = purgeUptoSnapshotIndex;
+ this.segmentSizeMax = segmentSizeMax;
+ this.segmentCacheNumMax = segmentCacheNumMax;
+ this.segmentCacheSizeMax = segmentCacheSizeMax;
+ this.preallocatedSize = preallocatedSize;
+ this.writeBufferSize = writeBufferSize;
+ this.forceSyncNum = forceSyncNum;
+ this.unsafeFlushEnabled = unsafeFlushEnabled;
+ }
+
+ public boolean isUseMemory() {
+ return useMemory;
+ }
+
+ public int getQueueElementLimit() {
+ return queueElementLimit;
+ }
+
+ public SizeInBytes getQueueByteLimit() {
+ return queueByteLimit;
+ }
+
+ public int getPurgeGap() {
+ return purgeGap;
+ }
+
+ public boolean isPurgeUptoSnapshotIndex() {
+ return purgeUptoSnapshotIndex;
+ }
+
+ public SizeInBytes getSegmentSizeMax() {
+ return segmentSizeMax;
+ }
+
+ public int getSegmentCacheNumMax() {
+ return segmentCacheNumMax;
+ }
+
+ public SizeInBytes getSegmentCacheSizeMax() {
+ return segmentCacheSizeMax;
+ }
+
+ public SizeInBytes getPreallocatedSize() {
+ return preallocatedSize;
+ }
+
+ public SizeInBytes getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ public int getForceSyncNum() {
+ return forceSyncNum;
+ }
+
+ public boolean isUnsafeFlushEnabled() {
+ return unsafeFlushEnabled;
+ }
+
+ public static Log.Builder newBuilder() {
+ return new Log.Builder();
+ }
+
+ public static class Builder {
+ private boolean useMemory = false;
+ private int queueElementLimit = 4096;
+ private SizeInBytes queueByteLimit = SizeInBytes.valueOf("64MB");
+ private int purgeGap = 1024;
+ private boolean purgeUptoSnapshotIndex = false;
+ private SizeInBytes segmentSizeMax = SizeInBytes.valueOf("8MB");
+ private int segmentCacheNumMax = 6;
+ private SizeInBytes segmentCacheSizeMax = SizeInBytes.valueOf("200MB");
+ private SizeInBytes preallocatedSize = SizeInBytes.valueOf("4MB");
+ private SizeInBytes writeBufferSize = SizeInBytes.valueOf("64KB");
+ private int forceSyncNum = 128;
+ private boolean unsafeFlushEnabled = false;
+
+ public Log build() {
+ return new Log(
+ useMemory,
+ queueElementLimit,
+ queueByteLimit,
+ purgeGap,
+ purgeUptoSnapshotIndex,
+ segmentSizeMax,
+ segmentCacheNumMax,
+ segmentCacheSizeMax,
+ preallocatedSize,
+ writeBufferSize,
+ forceSyncNum,
+ unsafeFlushEnabled);
+ }
+
+ public Log.Builder setUseMemory(boolean useMemory) {
+ this.useMemory = useMemory;
+ return this;
+ }
+
+ public Log.Builder setQueueElementLimit(int queueElementLimit) {
+ this.queueElementLimit = queueElementLimit;
+ return this;
+ }
+
+ public Log.Builder setQueueByteLimit(SizeInBytes queueByteLimit) {
+ this.queueByteLimit = queueByteLimit;
+ return this;
+ }
+
+ public Log.Builder setPurgeGap(int purgeGap) {
+ this.purgeGap = purgeGap;
+ return this;
+ }
+
+ public Log.Builder setPurgeUptoSnapshotIndex(boolean
purgeUptoSnapshotIndex) {
+ this.purgeUptoSnapshotIndex = purgeUptoSnapshotIndex;
+ return this;
+ }
+
+ public Log.Builder setSegmentSizeMax(SizeInBytes segmentSizeMax) {
+ this.segmentSizeMax = segmentSizeMax;
+ return this;
+ }
+
+ public Log.Builder setSegmentCacheNumMax(int segmentCacheNumMax) {
+ this.segmentCacheNumMax = segmentCacheNumMax;
+ return this;
+ }
+
+ public Log.Builder setSegmentCacheSizeMax(SizeInBytes
segmentCacheSizeMax) {
+ this.segmentCacheSizeMax = segmentCacheSizeMax;
+ return this;
+ }
+
+ public Log.Builder setPreallocatedSize(SizeInBytes preallocatedSize) {
+ this.preallocatedSize = preallocatedSize;
+ return this;
+ }
+
+ public Log.Builder setWriteBufferSize(SizeInBytes writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ return this;
+ }
+
+ public Log.Builder setForceSyncNum(int forceSyncNum) {
+ this.forceSyncNum = forceSyncNum;
+ return this;
+ }
+
+ public Log.Builder setUnsafeFlushEnabled(boolean unsafeFlushEnabled) {
+ this.unsafeFlushEnabled = unsafeFlushEnabled;
+ return this;
+ }
+ }
+ }
+
+ public static class Grpc {
+ private final SizeInBytes messageSizeMax;
+ private final SizeInBytes flowControlWindow;
+ private final boolean asyncRequestThreadPoolCached;
+ private final int asyncRequestThreadPoolSize;
+ private final int leaderOutstandingAppendsMax;
+
+ private Grpc(
+ SizeInBytes messageSizeMax,
+ SizeInBytes flowControlWindow,
+ boolean asyncRequestThreadPoolCached,
+ int asyncRequestThreadPoolSize,
+ int leaderOutstandingAppendsMax) {
+ this.messageSizeMax = messageSizeMax;
+ this.flowControlWindow = flowControlWindow;
+ this.asyncRequestThreadPoolCached = asyncRequestThreadPoolCached;
+ this.asyncRequestThreadPoolSize = asyncRequestThreadPoolSize;
+ this.leaderOutstandingAppendsMax = leaderOutstandingAppendsMax;
+ }
+
+ public SizeInBytes getMessageSizeMax() {
+ return messageSizeMax;
+ }
+
+ public SizeInBytes getFlowControlWindow() {
+ return flowControlWindow;
+ }
+
+ public boolean isAsyncRequestThreadPoolCached() {
+ return asyncRequestThreadPoolCached;
+ }
+
+ public int getAsyncRequestThreadPoolSize() {
+ return asyncRequestThreadPoolSize;
+ }
+
+ public int getLeaderOutstandingAppendsMax() {
+ return leaderOutstandingAppendsMax;
+ }
+
+ public static Grpc.Builder newBuilder() {
+ return new Grpc.Builder();
+ }
+
+ public static class Builder {
+ private SizeInBytes messageSizeMax = SizeInBytes.valueOf("512MB");
+ private SizeInBytes flowControlWindow = SizeInBytes.valueOf("4MB");
+ private boolean asyncRequestThreadPoolCached =
+ Server.ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT;
+ private int asyncRequestThreadPoolSize =
Server.ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT;
+ private int leaderOutstandingAppendsMax =
Server.LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT;
+
+ public Grpc build() {
+ return new Grpc(
+ messageSizeMax,
+ flowControlWindow,
+ asyncRequestThreadPoolCached,
+ asyncRequestThreadPoolSize,
+ leaderOutstandingAppendsMax);
+ }
+
+ public Grpc.Builder setMessageSizeMax(SizeInBytes messageSizeMax) {
+ this.messageSizeMax = messageSizeMax;
+ return this;
+ }
+
+ public Grpc.Builder setFlowControlWindow(SizeInBytes flowControlWindow) {
+ this.flowControlWindow = flowControlWindow;
+ return this;
+ }
+
+ public Grpc.Builder setAsyncRequestThreadPoolCached(boolean
asyncRequestThreadPoolCached) {
+ this.asyncRequestThreadPoolCached = asyncRequestThreadPoolCached;
+ return this;
+ }
+
+ public Grpc.Builder setAsyncRequestThreadPoolSize(int
asyncRequestThreadPoolSize) {
+ this.asyncRequestThreadPoolSize = asyncRequestThreadPoolSize;
+ return this;
+ }
+
+ public Grpc.Builder setLeaderOutstandingAppendsMax(int
leaderOutstandingAppendsMax) {
+ this.leaderOutstandingAppendsMax = leaderOutstandingAppendsMax;
+ return this;
+ }
+ }
+ }
+}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index c9393ad642..30484bfa09 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -31,6 +31,8 @@ import
org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -64,12 +66,14 @@ public class MultiLeaderConsensus implements IConsensus {
new ConcurrentHashMap<>();
private final MultiLeaderRPCService service;
private final RegisterManager registerManager = new RegisterManager();
+ private final MultiLeaderConfig config;
- public MultiLeaderConsensus(TEndPoint thisNode, File storageDir, Registry
registry) {
- this.thisNode = thisNode;
- this.storageDir = storageDir;
+ public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
+ this.thisNode = config.getThisNode();
+ this.storageDir = new File(config.getStorageDir());
+ this.config = config.getMultiLeaderConfig();
this.registry = registry;
- this.service = new MultiLeaderRPCService(thisNode);
+ this.service = new MultiLeaderRPCService(thisNode,
config.getMultiLeaderConfig());
}
@Override
@@ -100,7 +104,8 @@ public class MultiLeaderConsensus implements IConsensus {
path.toString(),
new Peer(consensusGroupId, thisNode),
new ArrayList<>(),
- registry.apply(consensusGroupId));
+ registry.apply(consensusGroupId),
+ config);
stateMachineMap.put(consensusGroupId, consensus);
consensus.start();
}
@@ -161,7 +166,7 @@ public class MultiLeaderConsensus implements IConsensus {
}
MultiLeaderServerImpl impl =
new MultiLeaderServerImpl(
- path, new Peer(groupId, thisNode), peers,
registry.apply(groupId));
+ path, new Peer(groupId, thisNode), peers,
registry.apply(groupId), config);
impl.start();
return impl;
});
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 017fe05a15..81b74e55c5 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.multileader.thrift.TLogType;
@@ -55,9 +56,14 @@ public class MultiLeaderServerImpl {
private final List<Peer> configuration;
private final IndexController controller;
private final LogDispatcher logDispatcher;
+ private final MultiLeaderConfig config;
public MultiLeaderServerImpl(
- String storageDir, Peer thisNode, List<Peer> configuration,
IStateMachine stateMachine) {
+ String storageDir,
+ Peer thisNode,
+ List<Peer> configuration,
+ IStateMachine stateMachine,
+ MultiLeaderConfig config) {
this.storageDir = storageDir;
this.thisNode = thisNode;
this.stateMachine = stateMachine;
@@ -69,7 +75,8 @@ public class MultiLeaderServerImpl {
} else {
persistConfiguration();
}
- logDispatcher = new LogDispatcher(this);
+ this.config = config;
+ this.logDispatcher = new LogDispatcher(this);
}
public IStateMachine getStateMachine() {
@@ -183,4 +190,8 @@ public class MultiLeaderServerImpl {
public IndexController getController() {
return controller;
}
+
+ public MultiLeaderConfig getConfig() {
+ return config;
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index 8cc7d10bef..14a2b4ad58 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.multileader.client;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
import
org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -79,9 +78,11 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogRes> {
try {
long defaultSleepTime =
(long)
- (MultiLeaderConsensusConfig.BASIC_RETRY_WAIT_TIME_MS *
Math.pow(2, retryCount));
+
(thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+ * Math.pow(2, retryCount));
Thread.sleep(
- Math.min(defaultSleepTime,
MultiLeaderConsensusConfig.MAX_RETRY_WAIT_TIME_MS));
+ Math.min(
+ defaultSleepTime,
thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption during retry pending batch");
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index a43a979694..e3687edf8f 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -35,6 +35,13 @@ public class MultiLeaderConsensusClientPool {
public static class AsyncMultiLeaderServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
+
+ private final MultiLeaderConfig config;
+
+ public AsyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
+ this.config = config;
+ }
+
@Override
public KeyedObjectPool<TEndPoint, AsyncMultiLeaderServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> manager) {
@@ -42,11 +49,10 @@ public class MultiLeaderConsensusClientPool {
new AsyncMultiLeaderServiceClient.Factory(
manager,
new ClientFactoryProperty.Builder()
-
.setConnectionTimeoutMs(MultiLeaderConsensusConfig.CONNECTION_TIMEOUT_IN_MS)
- .setRpcThriftCompressionEnabled(
-
MultiLeaderConsensusConfig.IS_RPC_THRIFT_COMPRESSION_ENABLED)
+
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
+
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
-
MultiLeaderConsensusConfig.SELECTOR_NUM_OF_CLIENT_MANAGER)
+ config.getRpc().getSelectorNumOfClientManager())
.build()),
new
ClientPoolProperty.Builder<AsyncMultiLeaderServiceClient>().build().getConfig());
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
deleted file mode 100644
index 10a547728f..0000000000
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/conf/MultiLeaderConsensusConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.multileader.conf;
-
-import java.util.concurrent.TimeUnit;
-
-// TODO make it configurable
-public class MultiLeaderConsensusConfig {
-
- private MultiLeaderConsensusConfig() {}
-
- public static final int RPC_MAX_CONCURRENT_CLIENT_NUM = 65535;
- public static final int THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE = 60;
- public static final boolean IS_RPC_THRIFT_COMPRESSION_ENABLED = false;
- public static final int SELECTOR_NUM_OF_CLIENT_MANAGER = 1;
- public static final int CONNECTION_TIMEOUT_IN_MS = (int)
TimeUnit.SECONDS.toMillis(20);
- public static final int MAX_PENDING_REQUEST_NUM_PER_NODE = 1000;
- public static final int MAX_REQUEST_PER_BATCH = 100;
- public static final int MAX_PENDING_BATCH = 50;
- public static final int MAX_WAITING_TIME_FOR_ACCUMULATE_BATCH_IN_MS = 10;
- public static final long BASIC_RETRY_WAIT_TIME_MS =
TimeUnit.MILLISECONDS.toMillis(100);
- public static final long MAX_RETRY_WAIT_TIME_MS =
TimeUnit.SECONDS.toMillis(20);
-}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 3e664ce4e0..398d50fc93 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -25,11 +25,11 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
import
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TLogType;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
@@ -72,14 +72,14 @@ public class LogDispatcher {
this.threads =
impl.getConfiguration().stream()
.filter(x -> !Objects.equals(x, impl.getThisNode()))
- .map(LogDispatcherThread::new)
+ .map(x -> new LogDispatcherThread(x, impl.getConfig()))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
this.executorService =
IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(),
"LogDispatcher");
this.clientManager =
new IClientManager.Factory<TEndPoint,
AsyncMultiLeaderServiceClient>()
- .createClientManager(new
AsyncMultiLeaderServiceClientPoolFactory());
+ .createClientManager(new
AsyncMultiLeaderServiceClientPoolFactory(impl.getConfig()));
}
}
@@ -121,26 +121,29 @@ public class LogDispatcher {
public class LogDispatcherThread implements Runnable {
- private volatile boolean stopped = false;
+ private final MultiLeaderConfig config;
private final Peer peer;
private final IndexController controller;
// A sliding window class that manages asynchronously pendingBatches
private final SyncStatus syncStatus;
// A queue used to receive asynchronous replication requests
- private final BlockingQueue<IndexedConsensusRequest> pendingRequest =
- new
ArrayBlockingQueue<>(MultiLeaderConsensusConfig.MAX_PENDING_REQUEST_NUM_PER_NODE);
+ private final BlockingQueue<IndexedConsensusRequest> pendingRequest;
// A container used to cache requests, whose size changes dynamically
private final List<IndexedConsensusRequest> bufferedRequest = new
LinkedList<>();
// A reader management class that gets requests from the DataRegion
private final ConsensusReqReader reader =
(ConsensusReqReader) impl.getStateMachine().read(new
GetConsensusReqReaderPlan());
+ private volatile boolean stopped = false;
- public LogDispatcherThread(Peer peer) {
+ public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
this.peer = peer;
+ this.config = config;
+ this.pendingRequest =
+ new
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
this.controller =
new IndexController(
impl.getStorageDir(),
Utils.fromTEndPointToString(peer.getEndpoint()), false);
- this.syncStatus = new SyncStatus(controller);
+ this.syncStatus = new SyncStatus(controller, config);
}
public IndexController getController() {
@@ -155,6 +158,10 @@ public class LogDispatcher {
return peer;
}
+ public MultiLeaderConfig getConfig() {
+ return config;
+ }
+
public boolean offer(IndexedConsensusRequest request) {
return pendingRequest.offer(request);
}
@@ -177,8 +184,8 @@ public class LogDispatcher {
// we may block here if there is no requests in the queue
bufferedRequest.add(pendingRequest.take());
// If write pressure is low, we simply sleep a little to reduce
the number of RPC
- if (pendingRequest.size() <=
MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
-
Thread.sleep(MultiLeaderConsensusConfig.MAX_WAITING_TIME_FOR_ACCUMULATE_BATCH_IN_MS);
+ if (pendingRequest.size() <=
config.getReplication().getMaxRequestPerBatch()) {
+
Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
}
}
// we may block here if the synchronization pipeline is full
@@ -200,11 +207,11 @@ public class LogDispatcher {
long startIndex = syncStatus.getNextSendingIndex();
long maxIndex = impl.getController().getCurrentIndex();
long endIndex;
- if (bufferedRequest.size() <=
MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+ if (bufferedRequest.size() <=
config.getReplication().getMaxRequestPerBatch()) {
// Use drainTo instead of poll to reduce lock overhead
pendingRequest.drainTo(
bufferedRequest,
- MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH -
bufferedRequest.size());
+ config.getReplication().getMaxRequestPerBatch() -
bufferedRequest.size());
}
if (bufferedRequest.isEmpty()) {
// only execute this after a restart
@@ -217,7 +224,7 @@ public class LogDispatcher {
// Prevents gap between logs. For example, some requests are not
written into the queue when
// the queue is full. In this case, requests need to be loaded from
the WAL
endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(),
logBatches);
- if (logBatches.size() ==
MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+ if (logBatches.size() ==
config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug("accumulated a {} from wal", batch);
return batch;
@@ -226,14 +233,14 @@ public class LogDispatcher {
endIndex = prev.getSearchIndex();
iterator.remove();
while (iterator.hasNext()
- && logBatches.size() <=
MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+ && logBatches.size() <=
config.getReplication().getMaxRequestPerBatch()) {
IndexedConsensusRequest current = iterator.next();
// Prevents gap between logs. For example, some logs are not written
into the queue when
// the queue is full. In this case, requests need to be loaded from
the WAL
if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
endIndex =
constructBatchFromWAL(prev.getSearchIndex(),
current.getSearchIndex(), logBatches);
- if (logBatches.size() ==
MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+ if (logBatches.size() ==
config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug("accumulated a {} from queue and wal", batch);
return batch;
@@ -271,7 +278,7 @@ public class LogDispatcher {
private long constructBatchFromWAL(
long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
while (currentIndex < maxIndex
- && logBatches.size() <
MultiLeaderConsensusConfig.MAX_REQUEST_PER_BATCH) {
+ && logBatches.size() <
config.getReplication().getMaxRequestPerBatch()) {
// TODO iterator
IConsensusRequest data = reader.getReq(currentIndex++);
if (data != null) {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index fb521b9b07..e9901d931a 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.consensus.multileader.logdispatcher;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import java.util.Iterator;
import java.util.LinkedList;
@@ -28,17 +28,19 @@ import java.util.List;
public class SyncStatus {
+ private final MultiLeaderConfig config;
private final IndexController controller;
private final List<PendingBatch> pendingBatches = new LinkedList<>();
- public SyncStatus(IndexController controller) {
+ public SyncStatus(IndexController controller, MultiLeaderConfig config) {
this.controller = controller;
+ this.config = config;
}
/** we may block here if the synchronization pipeline is full */
public void addNextBatch(PendingBatch batch) throws InterruptedException {
synchronized (this) {
- while (pendingBatches.size() >=
MultiLeaderConsensusConfig.MAX_PENDING_BATCH) {
+ while (pendingBatches.size() >=
config.getReplication().getMaxPendingBatch()) {
wait();
}
pendingBatches.add(batch);
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
index 694a7e038b..64dadf2aea 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
import java.lang.reflect.InvocationTargetException;
@@ -34,10 +34,12 @@ import java.lang.reflect.InvocationTargetException;
public class MultiLeaderRPCService extends ThriftService implements
MultiLeaderRPCServiceMBean {
private final TEndPoint thisNode;
+ private final MultiLeaderConfig config;
private MultiLeaderRPCServiceProcessor multiLeaderRPCServiceProcessor;
- public MultiLeaderRPCService(TEndPoint thisNode) {
+ public MultiLeaderRPCService(TEndPoint thisNode, MultiLeaderConfig config) {
this.thisNode = thisNode;
+ this.config = config;
}
@Override
@@ -73,10 +75,10 @@ public class MultiLeaderRPCService extends ThriftService
implements MultiLeaderR
ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
getBindIP(),
getBindPort(),
- MultiLeaderConsensusConfig.RPC_MAX_CONCURRENT_CLIENT_NUM,
-
MultiLeaderConsensusConfig.THRIFT_SERVER_AWAIT_TIME_FOR_STOP_SERVICE,
+ config.getRpc().getRpcMaxConcurrentClientNum(),
+ config.getRpc().getThriftServerAwaitTimeForStopService(),
new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
- MultiLeaderConsensusConfig.IS_RPC_THRIFT_COMPRESSION_ENABLED);
+ config.getRpc().isRpcThriftCompressionEnabled());
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 0c96034f7c..ff1a207403 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
@@ -44,7 +45,6 @@ import
org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
@@ -62,7 +62,6 @@ import
org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.util.TimeDuration;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,29 +108,18 @@ class RatisConsensus implements IConsensus {
// TODO make it configurable
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int)
TimeUnit.SECONDS.toMillis(20);
- /**
- * @param ratisStorageDir different groups of RatisConsensus Peer all share
ratisStorageDir as
- * root dir
- */
- public RatisConsensus(TEndPoint endpoint, File ratisStorageDir,
IStateMachine.Registry registry)
+ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry
registry)
throws IOException {
- myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint,
DEFAULT_PRIORITY);
+ myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(),
DEFAULT_PRIORITY);
System.setProperty(
"org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads",
"false");
- RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(ratisStorageDir));
- RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
- // TODO make this configurable so that RatisConsensusTest can trigger
multiple snapshot process
- // RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 20);
- RaftServerConfigKeys.Rpc.setSlownessTimeout(
- properties, TimeDuration.valueOf(10, TimeUnit.MINUTES));
- RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(2,
TimeUnit.SECONDS));
- RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(8,
TimeUnit.SECONDS));
- RaftServerConfigKeys.Rpc.setSleepTime(properties, TimeDuration.valueOf(1,
TimeUnit.SECONDS));
- RaftClientConfigKeys.Rpc.setRequestTimeout(
- properties, TimeDuration.valueOf(20, TimeUnit.SECONDS));
-
- GrpcConfigKeys.Server.setPort(properties, endpoint.getPort());
+ RaftServerConfigKeys.setStorageDir(
+ properties, Collections.singletonList(new
File(config.getStorageDir())));
+ GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
+
+ Utils.initRatisConfig(properties, config.getRatisConfig());
+
clientRpc = new GrpcFactory(new
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
server =
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 93a351082c..7d3071ba98 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -22,11 +22,16 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.thrift.TException;
@@ -154,4 +159,64 @@ public class Utils {
String[] items = metadata.split("_");
return TermIndex.valueOf(Long.parseLong(items[0]),
Long.parseLong(items[1]));
}
+
+ public static void initRatisConfig(RaftProperties properties, RatisConfig
config) {
+ GrpcConfigKeys.setMessageSizeMax(properties,
config.getGrpc().getMessageSizeMax());
+ GrpcConfigKeys.setFlowControlWindow(properties,
config.getGrpc().getFlowControlWindow());
+ GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(
+ properties, config.getGrpc().isAsyncRequestThreadPoolCached());
+ GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(
+ properties, config.getGrpc().getAsyncRequestThreadPoolSize());
+ GrpcConfigKeys.Server.setLeaderOutstandingAppendsMax(
+ properties, config.getGrpc().getLeaderOutstandingAppendsMax());
+
+ RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
config.getRpc().getSlownessTimeout());
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
config.getRpc().getTimeoutMin());
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
config.getRpc().getTimeoutMax());
+ RaftServerConfigKeys.Rpc.setSleepTime(properties,
config.getRpc().getSleepTime());
+ RaftClientConfigKeys.Rpc.setRequestTimeout(properties,
config.getRpc().getRequestTimeout());
+
+ RaftServerConfigKeys.LeaderElection.setLeaderStepDownWaitTime(
+ properties, config.getLeaderElection().getLeaderStepDownWaitTimeKey());
+ RaftServerConfigKeys.LeaderElection.setPreVote(
+ properties, config.getLeaderElection().isPreVote());
+
+ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
+ properties, config.getSnapshot().isAutoTriggerEnabled());
+ RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+ properties, config.getSnapshot().getAutoTriggerThreshold());
+ RaftServerConfigKeys.Snapshot.setCreationGap(properties,
config.getSnapshot().getCreationGap());
+ RaftServerConfigKeys.Snapshot.setRetentionFileNum(
+ properties, config.getSnapshot().getRetentionFileNum());
+
+ RaftServerConfigKeys.ThreadPool.setClientCached(
+ properties, config.getThreadPool().isClientCached());
+ RaftServerConfigKeys.ThreadPool.setClientSize(
+ properties, config.getThreadPool().getClientSize());
+ RaftServerConfigKeys.ThreadPool.setProxyCached(
+ properties, config.getThreadPool().isProxyCached());
+ RaftServerConfigKeys.ThreadPool.setProxySize(properties,
config.getThreadPool().getProxySize());
+ RaftServerConfigKeys.ThreadPool.setServerCached(
+ properties, config.getThreadPool().isServerCached());
+ RaftServerConfigKeys.ThreadPool.setServerSize(
+ properties, config.getThreadPool().getServerSize());
+
+ RaftServerConfigKeys.Log.setUseMemory(properties,
config.getLog().isUseMemory());
+ RaftServerConfigKeys.Log.setQueueElementLimit(
+ properties, config.getLog().getQueueElementLimit());
+ RaftServerConfigKeys.Log.setQueueByteLimit(properties,
config.getLog().getQueueByteLimit());
+ RaftServerConfigKeys.Log.setPurgeGap(properties,
config.getLog().getPurgeGap());
+ RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(
+ properties, config.getLog().isPurgeUptoSnapshotIndex());
+ RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
config.getLog().getSegmentSizeMax());
+ RaftServerConfigKeys.Log.setSegmentCacheNumMax(
+ properties, config.getLog().getSegmentCacheNumMax());
+ RaftServerConfigKeys.Log.setSegmentCacheSizeMax(
+ properties, config.getLog().getSegmentCacheSizeMax());
+ RaftServerConfigKeys.Log.setPreallocatedSize(properties,
config.getLog().getPreallocatedSize());
+ RaftServerConfigKeys.Log.setWriteBufferSize(properties,
config.getLog().getWriteBufferSize());
+ RaftServerConfigKeys.Log.setForceSyncNum(properties,
config.getLog().getForceSyncNum());
+ RaftServerConfigKeys.Log.setUnsafeFlushEnabled(
+ properties, config.getLog().isUnsafeFlushEnabled());
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 3604d29059..69b23d3691 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -63,9 +64,9 @@ class StandAloneConsensus implements IConsensus {
private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap =
new ConcurrentHashMap<>();
- public StandAloneConsensus(TEndPoint thisNode, File storageDir, Registry
registry) {
- this.thisNode = thisNode;
- this.storageDir = storageDir;
+ public StandAloneConsensus(ConsensusConfig config, Registry registry) {
+ this.thisNode = config.getThisNode();
+ this.storageDir = new File(config.getStorageDir());
this.registry = registry;
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index 770031bee1..045c3c44d8 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
import org.apache.iotdb.consensus.multileader.thrift.TLogType;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -100,8 +101,10 @@ public class MultiLeaderConsensusTest {
(MultiLeaderConsensus)
ConsensusFactory.getConsensusImpl(
ConsensusFactory.MultiLeaderConsensus,
- peers.get(i).getEndpoint(),
- peersStorage.get(i),
+ ConsensusConfig.newBuilder()
+ .setThisNode(peers.get(i).getEndpoint())
+ .setStorageDir(peersStorage.get(i).getAbsolutePath())
+ .build(),
groupId -> stateMachines.get(finalI))
.orElseThrow(
() ->
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index cba5147872..cbf015257e 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.consensus.multileader.logdispatcher;
-import org.apache.iotdb.consensus.multileader.conf.MultiLeaderConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -39,6 +39,7 @@ public class SyncStatusTest {
private static final File storageDir = new File("target" +
java.io.File.separator + "test");
private static final String prefix = "version";
+ private static final MultiLeaderConfig config = new
MultiLeaderConfig.Builder().build();
@Before
public void setUp() throws IOException {
@@ -56,22 +57,22 @@ public class SyncStatusTest {
IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, true);
Assert.assertEquals(0, controller.getCurrentIndex());
- SyncStatus status = new SyncStatus(controller);
+ SyncStatus status = new SyncStatus(controller, config);
List<PendingBatch> batchList = new ArrayList<>();
- for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+ for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
batchList.add(batch);
status.addNextBatch(batch);
}
- for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+ for (int i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
status.removeBatch(batchList.get(i));
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i,
status.getPendingBatches().size());
+ config.getReplication().getMaxPendingBatch() - 1 - i,
status.getPendingBatches().size());
Assert.assertEquals(i, controller.getCurrentIndex());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
}
@@ -82,29 +83,29 @@ public class SyncStatusTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- SyncStatus status = new SyncStatus(controller);
+ SyncStatus status = new SyncStatus(controller, config);
List<PendingBatch> batchList = new ArrayList<>();
- for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+ for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
batchList.add(batch);
status.addNextBatch(batch);
}
- for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1; i++)
{
-
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH -
1 - i));
+ for (int i = 0; i < config.getReplication().getMaxPendingBatch() - 1; i++)
{
+
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() -
1 - i));
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getPendingBatches().size());
+ config.getReplication().getMaxPendingBatch(),
status.getPendingBatches().size());
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
status.removeBatch(batchList.get(0));
Assert.assertEquals(0, status.getPendingBatches().size());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1,
controller.getCurrentIndex());
- Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch() - 1,
controller.getCurrentIndex());
+ Assert.assertEquals(config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
/** Confirm success first from front to back, then back to front */
@@ -114,39 +115,39 @@ public class SyncStatusTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- SyncStatus status = new SyncStatus(controller);
+ SyncStatus status = new SyncStatus(controller, config);
List<PendingBatch> batchList = new ArrayList<>();
- for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+ for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
batchList.add(batch);
status.addNextBatch(batch);
}
- for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2; i++)
{
+ for (int i = 0; i < config.getReplication().getMaxPendingBatch() / 2; i++)
{
status.removeBatch(batchList.get(i));
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1 - i,
status.getPendingBatches().size());
+ config.getReplication().getMaxPendingBatch() - 1 - i,
status.getPendingBatches().size());
Assert.assertEquals(i, controller.getCurrentIndex());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
- for (int i = MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2 + 1;
- i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH;
+ for (int i = config.getReplication().getMaxPendingBatch() / 2 + 1;
+ i < config.getReplication().getMaxPendingBatch();
i++) {
status.removeBatch(batchList.get(i));
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH / 2,
status.getPendingBatches().size());
+ config.getReplication().getMaxPendingBatch() / 2,
status.getPendingBatches().size());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
-
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH /
2));
+
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() /
2));
Assert.assertEquals(0, status.getPendingBatches().size());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1,
controller.getCurrentIndex());
- Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch() - 1,
controller.getCurrentIndex());
+ Assert.assertEquals(config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
/** Test Blocking while addNextBatch */
@@ -155,22 +156,22 @@ public class SyncStatusTest {
IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, true);
Assert.assertEquals(0, controller.getCurrentIndex());
- SyncStatus status = new SyncStatus(controller);
+ SyncStatus status = new SyncStatus(controller, config);
List<PendingBatch> batchList = new ArrayList<>();
- for (long i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH; i++) {
+ for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
PendingBatch batch = new PendingBatch(i, i, Collections.emptyList());
batchList.add(batch);
status.addNextBatch(batch);
}
- for (int i = 0; i < MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1; i++)
{
-
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH -
1 - i));
+ for (int i = 0; i < config.getReplication().getMaxPendingBatch() - 1; i++)
{
+
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch() -
1 - i));
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getPendingBatches().size());
+ config.getReplication().getMaxPendingBatch(),
status.getPendingBatches().size());
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch(),
status.getNextSendingIndex());
}
CompletableFuture<Boolean> future =
@@ -178,8 +179,8 @@ public class SyncStatusTest {
() -> {
PendingBatch batch =
new PendingBatch(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
+ config.getReplication().getMaxPendingBatch(),
+ config.getReplication().getMaxPendingBatch(),
Collections.emptyList());
batchList.add(batch);
try {
@@ -198,14 +199,14 @@ public class SyncStatusTest {
Assert.assertTrue(future.get());
Assert.assertEquals(1, status.getPendingBatches().size());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH - 1,
controller.getCurrentIndex());
+ config.getReplication().getMaxPendingBatch() - 1,
controller.getCurrentIndex());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH + 1,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch() + 1,
status.getNextSendingIndex());
-
status.removeBatch(batchList.get(MultiLeaderConsensusConfig.MAX_PENDING_BATCH));
+
status.removeBatch(batchList.get(config.getReplication().getMaxPendingBatch()));
Assert.assertEquals(0, status.getPendingBatches().size());
- Assert.assertEquals(MultiLeaderConsensusConfig.MAX_PENDING_BATCH,
controller.getCurrentIndex());
+ Assert.assertEquals(config.getReplication().getMaxPendingBatch(),
controller.getCurrentIndex());
Assert.assertEquals(
- MultiLeaderConsensusConfig.MAX_PENDING_BATCH + 1,
status.getNextSendingIndex());
+ config.getReplication().getMaxPendingBatch() + 1,
status.getNextSendingIndex());
}
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index b114c2305a..063c798722 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -63,8 +64,10 @@ public class RatisConsensusTest {
servers.add(
ConsensusFactory.getConsensusImpl(
ConsensusFactory.RatisConsensus,
- peers.get(i).getEndpoint(),
- peersStorage.get(i),
+ ConsensusConfig.newBuilder()
+ .setThisNode(peers.get(i).getEndpoint())
+ .setStorageDir(peersStorage.get(i).getAbsolutePath())
+ .build(),
groupId -> stateMachines.get(finalI))
.orElseThrow(
() ->
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index de4442ed79..2dd7cd9fbc 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.ratis.util.FileUtils;
@@ -45,8 +46,10 @@ public class RecoveryTest {
consensusImpl =
ConsensusFactory.getConsensusImpl(
ConsensusFactory.StandAloneConsensus,
- new TEndPoint("0.0.0.0", 9000),
- new File("target" + java.io.File.separator + "recovery"),
+ ConsensusConfig.newBuilder()
+ .setThisNode(new TEndPoint("0.0.0.0", 9000))
+ .setStorageDir("target" + java.io.File.separator +
"recovery")
+ .build(),
gid -> new EmptyStateMachine())
.orElseThrow(
() ->
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5c3ae96453..2973fe7f05 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -123,8 +124,10 @@ public class StandAloneConsensusTest {
consensusImpl =
ConsensusFactory.getConsensusImpl(
ConsensusFactory.StandAloneConsensus,
- new TEndPoint("0.0.0.0", 6667),
- new File("target" + java.io.File.separator + "standalone"),
+ ConsensusConfig.newBuilder()
+ .setThisNode(new TEndPoint("0.0.0.0", 6667))
+ .setStorageDir("target" + java.io.File.separator +
"standalone")
+ .build(),
gid -> {
switch (gid.getType()) {
case SchemaRegion:
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 2007f1af59..4832d99f26 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -23,13 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig.RPC;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import java.io.File;
-
/**
* We can use DataRegionConsensusImpl.getInstance() to obtain a consensus
layer reference for
* dataRegion's reading and writing
@@ -48,8 +49,26 @@ public class DataRegionConsensusImpl {
private static final IConsensus INSTANCE =
ConsensusFactory.getConsensusImpl(
conf.getDataRegionConsensusProtocolClass(),
- new TEndPoint(conf.getInternalIp(),
conf.getDataRegionConsensusPort()),
- new File(conf.getDataRegionConsensusDir()),
+ ConsensusConfig.newBuilder()
+ .setThisNode(
+ new TEndPoint(conf.getInternalIp(),
conf.getDataRegionConsensusPort()))
+ .setStorageDir(conf.getDataRegionConsensusDir())
+ .setMultiLeaderConfig(
+ MultiLeaderConfig.newBuilder()
+ .setRpc(
+ RPC.newBuilder()
+
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
+ .setRpcMaxConcurrentClientNum(
+ conf.getRpcMaxConcurrentClientNum())
+ .setRpcThriftCompressionEnabled(
+ conf.isRpcThriftCompressionEnable())
+ .setSelectorNumOfClientManager(
+ conf.getSelectorNumOfClientManager())
+ .setThriftServerAwaitTimeForStopService(
+
conf.getThriftServerAwaitTimeForStopService())
+ .build())
+ .build())
+ .build(),
gid ->
new DataRegionStateMachine(
StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 1bde912992..177d909c21 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -23,13 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import java.io.File;
-
/**
* We can use SchemaRegionConsensusImpl.getInstance() to obtain a consensus
layer reference for
* schemaRegion's reading and writing
@@ -48,8 +47,11 @@ public class SchemaRegionConsensusImpl {
private static final IConsensus INSTANCE =
ConsensusFactory.getConsensusImpl(
conf.getSchemaRegionConsensusProtocolClass(),
- new TEndPoint(conf.getInternalIp(),
conf.getSchemaRegionConsensusPort()),
- new File(conf.getSchemaRegionConsensusDir()),
+ ConsensusConfig.newBuilder()
+ .setThisNode(
+ new TEndPoint(conf.getInternalIp(),
conf.getSchemaRegionConsensusPort()))
+ .setStorageDir(conf.getSchemaRegionConsensusDir())
+ .build(),
gid ->
new SchemaRegionStateMachine(
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))