This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b1ba13eed1 [IOTDB-3688] Prevent initializing region consensus caused by heartbeat (#6496)
b1ba13eed1 is described below
commit b1ba13eed1036df21a6ff0ce30c786bc7d6c1eff
Author: Mrquan <[email protected]>
AuthorDate: Wed Jun 29 08:33:51 2022 +0800
[IOTDB-3688] Prevent initializing region consensus caused by heartbeat (#6496)
---
.../db/consensus/DataRegionConsensusImpl.java | 93 +++++++++++-----------
.../db/consensus/SchemaRegionConsensusImpl.java | 50 ++++++------
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +-
.../thrift/impl/DataNodeRPCServiceImpl.java | 37 +++++----
.../iotdb/db/service/ClientRPCServiceImplTest.java | 4 +-
5 files changed, 100 insertions(+), 88 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 2369498f13..54d70ab164 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -38,56 +38,59 @@ import org.apache.iotdb.db.engine.StorageEngineV2;
* dataRegion's reading and writing
*/
public class DataRegionConsensusImpl {
+ private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+
+ private static IConsensus INSTANCE = null;
private DataRegionConsensusImpl() {}
+ // need to create instance before calling this method
public static IConsensus getInstance() {
- return DataRegionConsensusImplHolder.INSTANCE;
+ return INSTANCE;
}
- private static class DataRegionConsensusImplHolder {
-
- private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- private static final IConsensus INSTANCE =
- ConsensusFactory.getConsensusImpl(
- conf.getDataRegionConsensusProtocolClass(),
- ConsensusConfig.newBuilder()
- .setThisNode(
- new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()))
- .setStorageDir(conf.getDataRegionConsensusDir())
- .setMultiLeaderConfig(
- MultiLeaderConfig.newBuilder()
- .setRpc(
- RPC.newBuilder()
- .setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
- .setRpcMaxConcurrentClientNum(
- conf.getRpcMaxConcurrentClientNum())
- .setRpcThriftCompressionEnabled(
- conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfClientManager(
- conf.getSelectorNumOfClientManager())
- .setThriftServerAwaitTimeForStopService(
- conf.getThriftServerAwaitTimeForStopService())
- .build())
- .build())
- .setRatisConfig(
- RatisConfig.newBuilder()
- // An empty log is committed after each restart, even if no data is
- // written. This setting ensures that compaction work is not discarded
- // even if there are frequent restarts
- .setSnapshot(Snapshot.newBuilder().setCreationGap(1).build())
- .build())
- .build(),
- gid ->
- new DataRegionStateMachine(
- StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- ConsensusFactory.CONSTRUCT_FAILED_MSG,
- conf.getDataRegionConsensusProtocolClass())));
-
- private DataRegionConsensusImplHolder() {}
+ public static synchronized IConsensus setupAndGetInstance() {
+ if (INSTANCE == null) {
+ INSTANCE =
+ ConsensusFactory.getConsensusImpl(
+ conf.getDataRegionConsensusProtocolClass(),
+ ConsensusConfig.newBuilder()
+ .setThisNode(
+ new TEndPoint(conf.getInternalIp(), conf.getDataRegionConsensusPort()))
+ .setStorageDir(conf.getDataRegionConsensusDir())
+ .setMultiLeaderConfig(
+ MultiLeaderConfig.newBuilder()
+ .setRpc(
+ RPC.newBuilder()
+ .setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
+ .setRpcMaxConcurrentClientNum(
+ conf.getRpcMaxConcurrentClientNum())
+ .setRpcThriftCompressionEnabled(
+ conf.isRpcThriftCompressionEnable())
+ .setSelectorNumOfClientManager(
+ conf.getSelectorNumOfClientManager())
+ .setThriftServerAwaitTimeForStopService(
+ conf.getThriftServerAwaitTimeForStopService())
+ .build())
+ .build())
+ .setRatisConfig(
+ RatisConfig.newBuilder()
+ // An empty log is committed after each restart, even if no data is
+ // written. This setting ensures that compaction work is not discarded
+ // even if there are frequent restarts
+ .setSnapshot(Snapshot.newBuilder().setCreationGap(1).build())
+ .build())
+ .build(),
+ gid ->
+ new DataRegionStateMachine(
+ StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ conf.getDataRegionConsensusProtocolClass())));
+ }
+ return INSTANCE;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 177d909c21..bcc8f584c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -35,33 +35,37 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
*/
public class SchemaRegionConsensusImpl {
+ private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+
+ private static IConsensus INSTANCE = null;
+
private SchemaRegionConsensusImpl() {}
+ // need to create instance before calling this method
public static IConsensus getInstance() {
- return SchemaRegionConsensusImplHolder.INSTANCE;
+ return INSTANCE;
}
- private static class SchemaRegionConsensusImplHolder {
-
- private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- private static final IConsensus INSTANCE =
- ConsensusFactory.getConsensusImpl(
- conf.getSchemaRegionConsensusProtocolClass(),
- ConsensusConfig.newBuilder()
- .setThisNode(
- new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()))
- .setStorageDir(conf.getSchemaRegionConsensusDir())
- .build(),
- gid ->
- new SchemaRegionStateMachine(
- SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- ConsensusFactory.CONSTRUCT_FAILED_MSG,
- conf.getSchemaRegionConsensusProtocolClass())));
-
- private SchemaRegionConsensusImplHolder() {}
+ public static synchronized IConsensus setupAndGetInstance() {
+ if (INSTANCE == null) {
+ INSTANCE =
+ ConsensusFactory.getConsensusImpl(
+ conf.getSchemaRegionConsensusProtocolClass(),
+ ConsensusConfig.newBuilder()
+ .setThisNode(
+ new TEndPoint(conf.getInternalIp(), conf.getSchemaRegionConsensusPort()))
+ .setStorageDir(conf.getSchemaRegionConsensusDir())
+ .build(),
+ gid ->
+ new SchemaRegionStateMachine(
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ conf.getSchemaRegionConsensusProtocolClass())));
+ }
+ return INSTANCE;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index cda183bfeb..4762f54a41 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -276,8 +276,8 @@ public class DataNode implements DataNodeMBean {
try {
// TODO: Start consensus layer in some where else
- SchemaRegionConsensusImpl.getInstance().start();
- DataRegionConsensusImpl.getInstance().start();
+ SchemaRegionConsensusImpl.setupAndGetInstance().start();
+ DataRegionConsensusImpl.setupAndGetInstance().start();
} catch (IOException e) {
throw new StartupException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
index fb1d5fb85b..2ec25c1f38 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
@@ -340,22 +340,27 @@ public class DataNodeRPCServiceImpl implements IDataNodeRPCService.Iface {
private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
Map<TConsensusGroupId, Boolean> result = new HashMap<>();
- DataRegionConsensusImpl.getInstance()
- .getAllConsensusGroupIds()
- .forEach(
- groupId -> {
- result.put(
- groupId.convertToTConsensusGroupId(),
- DataRegionConsensusImpl.getInstance().isLeader(groupId));
- });
- SchemaRegionConsensusImpl.getInstance()
- .getAllConsensusGroupIds()
- .forEach(
- groupId -> {
- result.put(
- groupId.convertToTConsensusGroupId(),
- SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
- });
+ if (DataRegionConsensusImpl.getInstance() != null) {
+ DataRegionConsensusImpl.getInstance()
+ .getAllConsensusGroupIds()
+ .forEach(
+ groupId -> {
+ result.put(
+ groupId.convertToTConsensusGroupId(),
+ DataRegionConsensusImpl.getInstance().isLeader(groupId));
+ });
+ }
+
+ if (SchemaRegionConsensusImpl.getInstance() != null) {
+ SchemaRegionConsensusImpl.getInstance()
+ .getAllConsensusGroupIds()
+ .forEach(
+ groupId -> {
+ result.put(
+ groupId.convertToTConsensusGroupId(),
+ SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
+ });
+ }
return result;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
index 6fdea59066..17be292632 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
@@ -74,8 +74,8 @@ public class ClientRPCServiceImplTest {
IoTDB.configManager.init();
configNode = LocalConfigNode.getInstance();
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
- DataRegionConsensusImpl.getInstance().start();
- SchemaRegionConsensusImpl.getInstance().start();
+ DataRegionConsensusImpl.setupAndGetInstance().start();
+ SchemaRegionConsensusImpl.setupAndGetInstance().start();
}
@Before