This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d1a974e1219 Pipe: avoid executing too many PipeMetaSyncProcedure after
system reboot (#12213)
d1a974e1219 is described below
commit d1a974e121987f60b61b87a9d5a46fcde9059c34
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Mar 21 20:38:16 2024 +0800
Pipe: avoid executing too many PipeMetaSyncProcedure after system reboot
(#12213)
---
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 26 +++++++++++++++++++++-
1 file changed, 25 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index ff4b6c335e4..5f3e72f00a8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -20,11 +20,13 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
@@ -39,15 +41,37 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
+ private static final long MIN_EXECUTION_INTERVAL_MS =
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+ // No need to serialize this field
+ private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
+
public PipeMetaSyncProcedure() {
super();
}
+ @Override
+ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv
configNodeProcedureEnv) {
+ // Skip the procedure if the last execution time is within the minimum
execution interval.
+ // Often used to prevent the procedure from being executed too frequently
when system reboot.
+ if (System.currentTimeMillis() - LAST_EXECUTION_TIME.get() <
MIN_EXECUTION_INTERVAL_MS) {
+ // Skip by setting the pipeTaskInfo to null
+ pipeTaskInfo = null;
+ LOGGER.info(
+ "PipeMetaSyncProcedure: executeFromValidateTask, skip the procedure
due to the last execution time {}",
+ LAST_EXECUTION_TIME.get());
+ return ProcedureLockState.LOCK_ACQUIRED;
+ }
+
+ return super.acquireLock(configNodeProcedureEnv);
+ }
+
@Override
protected PipeTaskOperation getOperation() {
return PipeTaskOperation.SYNC_PIPE_META;
@@ -57,7 +81,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
- // Do nothing
+ LAST_EXECUTION_TIME.set(System.currentTimeMillis());
return false;
}