This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new acce8c4631  [IOTDB-3781] Reinforce the regionCleaner task's startup logic (#6804)
acce8c4631 is described below

commit acce8c46312869f4f8b9312e1dfa35ca2b802380
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jul 28 09:10:32 2022 +0800

     [IOTDB-3781] Reinforce the regionCleaner task's startup logic (#6804)
    
    [IOTDB-3781] Reinforce the regionCleaner task's startup logic
---
 .../statemachine/PartitionRegionStateMachine.java  |  6 +-
 .../iotdb/confignode/manager/PartitionManager.java | 79 +++++++++++++++-------
 .../iotdb/confignode/manager/ProcedureManager.java |  2 +
 .../iotdb/confignode/manager/load/LoadManager.java |  4 ++
 4 files changed, 64 insertions(+), 27 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 1afcde2f4c..4259e5df71 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -139,12 +139,16 @@ public class PartitionRegionStateMachine implements 
IStateMachine, IStateMachine
   @Override
   public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint 
newLeader) {
     if (currentNode.equals(newLeader)) {
-      LOGGER.info("Current node {} is Leader, start procedure manager.", 
newLeader);
+      LOGGER.info("Current node {} becomes Leader", newLeader);
       configManager.getProcedureManager().shiftExecutor(true);
       configManager.getLoadManager().start();
+      configManager.getPartitionManager().startRegionCleaner();
     } else {
+      LOGGER.info(
+          "Current node {} is not longer the leader, the new leader is {}", 
currentNode, newLeader);
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getLoadManager().stop();
+      configManager.getPartitionManager().stopRegionCleaner();
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index bd95b0e155..0ba379af5f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -62,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -73,23 +74,22 @@ public class PartitionManager {
 
   private final IManager configManager;
   private final PartitionInfo partitionInfo;
-  private static final int REGION_CLEANER_WORK_INTERVAL = 300;
-  private static final int REGION_CLEANER_WORK_INITIAL_DELAY = 10;
 
   private SeriesPartitionExecutor executor;
+
+  /** Region cleaner */
+  // Monitor for leadership change
+  private final Object scheduleMonitor = new Object();
+  // Try to delete Regions in every 10s
+  private static final int REGION_CLEANER_WORK_INTERVAL = 10;
   private final ScheduledExecutorService regionCleaner;
+  private Future<?> currentRegionCleanerFuture;
 
   public PartitionManager(IManager configManager, PartitionInfo partitionInfo) 
{
     this.configManager = configManager;
     this.partitionInfo = partitionInfo;
     this.regionCleaner =
         
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Cleaner");
-    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
-        regionCleaner,
-        this::clearDeletedRegions,
-        REGION_CLEANER_WORK_INITIAL_DELAY,
-        REGION_CLEANER_WORK_INTERVAL,
-        TimeUnit.SECONDS);
     setSeriesPartitionExecutor();
   }
 
@@ -397,24 +397,6 @@ public class PartitionManager {
     getConsensusManager().write(preDeleteStorageGroupPlan);
   }
 
-  /**
-   * Called by {@link PartitionManager#regionCleaner} Delete regions of 
logical deleted storage
-   * groups periodically.
-   */
-  public void clearDeletedRegions() {
-    if (getConsensusManager().isLeader()) {
-      final Set<TRegionReplicaSet> deletedRegionSet = 
partitionInfo.getDeletedRegionSet();
-      if (!deletedRegionSet.isEmpty()) {
-        LOGGER.info(
-            "DELETE REGIONS {} START",
-            deletedRegionSet.stream()
-                .map(TRegionReplicaSet::getRegionId)
-                .collect(Collectors.toList()));
-        SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
-      }
-    }
-  }
-
   public void addMetrics() {
     partitionInfo.addMetrics();
   }
@@ -453,6 +435,51 @@ public class PartitionManager {
     return partitionInfo.getRegionStorageGroup(regionId);
   }
 
+  /**
+   * Called by {@link PartitionManager#regionCleaner} Delete regions of 
logical deleted storage
+   * groups periodically.
+   */
+  public void clearDeletedRegions() {
+    if (getConsensusManager().isLeader()) {
+      final Set<TRegionReplicaSet> deletedRegionSet = 
partitionInfo.getDeletedRegionSet();
+      if (!deletedRegionSet.isEmpty()) {
+        LOGGER.info(
+            "DELETE REGIONS {} START",
+            deletedRegionSet.stream()
+                .map(TRegionReplicaSet::getRegionId)
+                .collect(Collectors.toList()));
+        SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
+      }
+    }
+  }
+
+  public void startRegionCleaner() {
+    synchronized (scheduleMonitor) {
+      if (currentRegionCleanerFuture == null) {
+        /* Start the RegionCleaner service */
+        currentRegionCleanerFuture =
+            ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+                regionCleaner,
+                this::clearDeletedRegions,
+                0,
+                REGION_CLEANER_WORK_INTERVAL,
+                TimeUnit.SECONDS);
+        LOGGER.info("RegionCleaner is started successfully.");
+      }
+    }
+  }
+
+  public void stopRegionCleaner() {
+    synchronized (scheduleMonitor) {
+      if (currentRegionCleanerFuture != null) {
+        /* Stop the RegionCleaner service */
+        currentRegionCleanerFuture.cancel(false);
+        currentRegionCleanerFuture = null;
+        LOGGER.info("RegionCleaner is stopped successfully.");
+      }
+    }
+  }
+
   public ScheduledExecutorService getRegionCleaner() {
     return regionCleaner;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 7ed3527ab3..564a921f7e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -83,6 +83,7 @@ public class ProcedureManager {
             CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
             CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
         store.start();
+        LOGGER.info("ProcedureManager is started successfully.");
       }
     } else {
       if (executor.isRunning()) {
@@ -90,6 +91,7 @@ public class ProcedureManager {
         if (!executor.isRunning()) {
           executor.join();
           store.stop();
+          LOGGER.info("ProcedureManager is stopped successfully.");
         }
       }
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 61babc277a..ad55d0cbf2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -225,6 +225,7 @@ public class LoadManager {
                 0,
                 heartbeatInterval,
                 TimeUnit.MILLISECONDS);
+        LOGGER.info("Heartbeat service is started successfully.");
       }
 
       /* Start the load balancing service */
@@ -236,6 +237,7 @@ public class LoadManager {
                 0,
                 heartbeatInterval,
                 TimeUnit.MILLISECONDS);
+        LOGGER.info("LoadBalancing service is started successfully.");
       }
     }
   }
@@ -247,8 +249,10 @@ public class LoadManager {
       if (currentHeartbeatFuture != null) {
         currentHeartbeatFuture.cancel(false);
         currentHeartbeatFuture = null;
+        LOGGER.info("Heartbeat service is stopped successfully.");
         currentLoadBalancingFuture.cancel(false);
         currentLoadBalancingFuture = null;
+        LOGGER.info("LoadBalancing service is stopped successfully.");
       }
     }
   }

Reply via email to