This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b1ba13eed1 [IOTDB-3688] Prevent initializing region consensus caused 
by heartbeat (#6496)
b1ba13eed1 is described below

commit b1ba13eed1036df21a6ff0ce30c786bc7d6c1eff
Author: Mrquan <[email protected]>
AuthorDate: Wed Jun 29 08:33:51 2022 +0800

    [IOTDB-3688] Prevent initializing region consensus caused by heartbeat 
(#6496)
---
 .../db/consensus/DataRegionConsensusImpl.java      | 93 +++++++++++-----------
 .../db/consensus/SchemaRegionConsensusImpl.java    | 50 ++++++------
 .../java/org/apache/iotdb/db/service/DataNode.java |  4 +-
 .../thrift/impl/DataNodeRPCServiceImpl.java        | 37 +++++----
 .../iotdb/db/service/ClientRPCServiceImplTest.java |  4 +-
 5 files changed, 100 insertions(+), 88 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 2369498f13..54d70ab164 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -38,56 +38,59 @@ import org.apache.iotdb.db.engine.StorageEngineV2;
  * dataRegion's reading and writing
  */
 public class DataRegionConsensusImpl {
+  private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static IConsensus INSTANCE = null;
 
   private DataRegionConsensusImpl() {}
 
+  // need to create instance before calling this method
   public static IConsensus getInstance() {
-    return DataRegionConsensusImplHolder.INSTANCE;
+    return INSTANCE;
   }
 
-  private static class DataRegionConsensusImplHolder {
-
-    private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
-    private static final IConsensus INSTANCE =
-        ConsensusFactory.getConsensusImpl(
-                conf.getDataRegionConsensusProtocolClass(),
-                ConsensusConfig.newBuilder()
-                    .setThisNode(
-                        new TEndPoint(conf.getInternalIp(), 
conf.getDataRegionConsensusPort()))
-                    .setStorageDir(conf.getDataRegionConsensusDir())
-                    .setMultiLeaderConfig(
-                        MultiLeaderConfig.newBuilder()
-                            .setRpc(
-                                RPC.newBuilder()
-                                    
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
-                                    .setRpcMaxConcurrentClientNum(
-                                        conf.getRpcMaxConcurrentClientNum())
-                                    .setRpcThriftCompressionEnabled(
-                                        conf.isRpcThriftCompressionEnable())
-                                    .setSelectorNumOfClientManager(
-                                        conf.getSelectorNumOfClientManager())
-                                    .setThriftServerAwaitTimeForStopService(
-                                        
conf.getThriftServerAwaitTimeForStopService())
-                                    .build())
-                            .build())
-                    .setRatisConfig(
-                        RatisConfig.newBuilder()
-                            // An empty log is committed after each restart, 
even if no data is
-                            // written. This setting ensures that compaction 
work is not discarded
-                            // even if there are frequent restarts
-                            
.setSnapshot(Snapshot.newBuilder().setCreationGap(1).build())
-                            .build())
-                    .build(),
-                gid ->
-                    new DataRegionStateMachine(
-                        
StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
-            .orElseThrow(
-                () ->
-                    new IllegalArgumentException(
-                        String.format(
-                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getDataRegionConsensusProtocolClass())));
-
-    private DataRegionConsensusImplHolder() {}
+  public static synchronized IConsensus setupAndGetInstance() {
+    if (INSTANCE == null) {
+      INSTANCE =
+          ConsensusFactory.getConsensusImpl(
+                  conf.getDataRegionConsensusProtocolClass(),
+                  ConsensusConfig.newBuilder()
+                      .setThisNode(
+                          new TEndPoint(conf.getInternalIp(), 
conf.getDataRegionConsensusPort()))
+                      .setStorageDir(conf.getDataRegionConsensusDir())
+                      .setMultiLeaderConfig(
+                          MultiLeaderConfig.newBuilder()
+                              .setRpc(
+                                  RPC.newBuilder()
+                                      
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
+                                      .setRpcMaxConcurrentClientNum(
+                                          conf.getRpcMaxConcurrentClientNum())
+                                      .setRpcThriftCompressionEnabled(
+                                          conf.isRpcThriftCompressionEnable())
+                                      .setSelectorNumOfClientManager(
+                                          conf.getSelectorNumOfClientManager())
+                                      .setThriftServerAwaitTimeForStopService(
+                                          
conf.getThriftServerAwaitTimeForStopService())
+                                      .build())
+                              .build())
+                      .setRatisConfig(
+                          RatisConfig.newBuilder()
+                              // An empty log is committed after each restart, 
even if no data is
+                              // written. This setting ensures that compaction 
work is not discarded
+                              // even if there are frequent restarts
+                              
.setSnapshot(Snapshot.newBuilder().setCreationGap(1).build())
+                              .build())
+                      .build(),
+                  gid ->
+                      new DataRegionStateMachine(
+                          
StorageEngineV2.getInstance().getDataRegion((DataRegionId) gid)))
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          String.format(
+                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                              conf.getDataRegionConsensusProtocolClass())));
+    }
+    return INSTANCE;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 177d909c21..bcc8f584c0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -35,33 +35,37 @@ import 
org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
  */
 public class SchemaRegionConsensusImpl {
 
+  private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static IConsensus INSTANCE = null;
+
   private SchemaRegionConsensusImpl() {}
 
+  // need to create instance before calling this method
   public static IConsensus getInstance() {
-    return SchemaRegionConsensusImplHolder.INSTANCE;
+    return INSTANCE;
   }
 
-  private static class SchemaRegionConsensusImplHolder {
-
-    private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
-    private static final IConsensus INSTANCE =
-        ConsensusFactory.getConsensusImpl(
-                conf.getSchemaRegionConsensusProtocolClass(),
-                ConsensusConfig.newBuilder()
-                    .setThisNode(
-                        new TEndPoint(conf.getInternalIp(), 
conf.getSchemaRegionConsensusPort()))
-                    .setStorageDir(conf.getSchemaRegionConsensusDir())
-                    .build(),
-                gid ->
-                    new SchemaRegionStateMachine(
-                        
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
-            .orElseThrow(
-                () ->
-                    new IllegalArgumentException(
-                        String.format(
-                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getSchemaRegionConsensusProtocolClass())));
-
-    private SchemaRegionConsensusImplHolder() {}
+  public static synchronized IConsensus setupAndGetInstance() {
+    if (INSTANCE == null) {
+      INSTANCE =
+          ConsensusFactory.getConsensusImpl(
+                  conf.getSchemaRegionConsensusProtocolClass(),
+                  ConsensusConfig.newBuilder()
+                      .setThisNode(
+                          new TEndPoint(conf.getInternalIp(), 
conf.getSchemaRegionConsensusPort()))
+                      .setStorageDir(conf.getSchemaRegionConsensusDir())
+                      .build(),
+                  gid ->
+                      new SchemaRegionStateMachine(
+                          
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          String.format(
+                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                              conf.getSchemaRegionConsensusProtocolClass())));
+    }
+    return INSTANCE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index cda183bfeb..4762f54a41 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -276,8 +276,8 @@ public class DataNode implements DataNodeMBean {
 
     try {
       // TODO: Start consensus layer in some where else
-      SchemaRegionConsensusImpl.getInstance().start();
-      DataRegionConsensusImpl.getInstance().start();
+      SchemaRegionConsensusImpl.setupAndGetInstance().start();
+      DataRegionConsensusImpl.setupAndGetInstance().start();
     } catch (IOException e) {
       throw new StartupException(e);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
index fb1d5fb85b..2ec25c1f38 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
@@ -340,22 +340,27 @@ public class DataNodeRPCServiceImpl implements 
IDataNodeRPCService.Iface {
 
   private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
     Map<TConsensusGroupId, Boolean> result = new HashMap<>();
-    DataRegionConsensusImpl.getInstance()
-        .getAllConsensusGroupIds()
-        .forEach(
-            groupId -> {
-              result.put(
-                  groupId.convertToTConsensusGroupId(),
-                  DataRegionConsensusImpl.getInstance().isLeader(groupId));
-            });
-    SchemaRegionConsensusImpl.getInstance()
-        .getAllConsensusGroupIds()
-        .forEach(
-            groupId -> {
-              result.put(
-                  groupId.convertToTConsensusGroupId(),
-                  SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
-            });
+    if (DataRegionConsensusImpl.getInstance() != null) {
+      DataRegionConsensusImpl.getInstance()
+          .getAllConsensusGroupIds()
+          .forEach(
+              groupId -> {
+                result.put(
+                    groupId.convertToTConsensusGroupId(),
+                    DataRegionConsensusImpl.getInstance().isLeader(groupId));
+              });
+    }
+
+    if (SchemaRegionConsensusImpl.getInstance() != null) {
+      SchemaRegionConsensusImpl.getInstance()
+          .getAllConsensusGroupIds()
+          .forEach(
+              groupId -> {
+                result.put(
+                    groupId.convertToTConsensusGroupId(),
+                    SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
+              });
+    }
     return result;
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
 
b/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
index 6fdea59066..17be292632 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/service/ClientRPCServiceImplTest.java
@@ -74,8 +74,8 @@ public class ClientRPCServiceImplTest {
     IoTDB.configManager.init();
     configNode = LocalConfigNode.getInstance();
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath("root.ln"));
-    DataRegionConsensusImpl.getInstance().start();
-    SchemaRegionConsensusImpl.getInstance().start();
+    DataRegionConsensusImpl.setupAndGetInstance().start();
+    SchemaRegionConsensusImpl.setupAndGetInstance().start();
   }
 
   @Before

Reply via email to