This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d1a974e1219 Pipe: avoid executing too many PipeMetaSyncProcedure after system reboot (#12213)
d1a974e1219 is described below

commit d1a974e121987f60b61b87a9d5a46fcde9059c34
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Mar 21 20:38:16 2024 +0800

    Pipe: avoid executing too many PipeMetaSyncProcedure after system reboot (#12213)
---
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   | 26 +++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index ff4b6c335e4..5f3e72f00a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -20,11 +20,13 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
@@ -39,15 +41,37 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
 
+  private static final long MIN_EXECUTION_INTERVAL_MS =
+      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+  // No need to serialize this field
+  private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
+
   public PipeMetaSyncProcedure() {
     super();
   }
 
+  @Override
+  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv 
configNodeProcedureEnv) {
+    // Skip the procedure if the last execution time is within the minimum 
execution interval.
+    // Often used to prevent the procedure from being executed too frequently 
when system reboot.
+    if (System.currentTimeMillis() - LAST_EXECUTION_TIME.get() < 
MIN_EXECUTION_INTERVAL_MS) {
+      // Skip by setting the pipeTaskInfo to null
+      pipeTaskInfo = null;
+      LOGGER.info(
+          "PipeMetaSyncProcedure: executeFromValidateTask, skip the procedure 
due to the last execution time {}",
+          LAST_EXECUTION_TIME.get());
+      return ProcedureLockState.LOCK_ACQUIRED;
+    }
+
+    return super.acquireLock(configNodeProcedureEnv);
+  }
+
   @Override
   protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.SYNC_PIPE_META;
@@ -57,7 +81,7 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
   public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
 
-    // Do nothing
+    LAST_EXECUTION_TIME.set(System.currentTimeMillis());
     return false;
   }
 

Reply via email to