This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TriggerTest by this push:
new 18a165f6e1 fix ExecutableManager and complete createTrigger of
ConfigNode (#7427)
18a165f6e1 is described below
commit 18a165f6e1045699344ae937be7e3caf48db2350
Author: Weihao Li <[email protected]>
AuthorDate: Mon Sep 26 10:38:44 2022 +0800
fix ExecutableManager and complete createTrigger of ConfigNode (#7427)
---
.../iotdb/confignode/persistence/TriggerInfo.java | 4 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 37 +++++++++++++++++
.../procedure/impl/CreateTriggerProcedure.java | 46 +++++++++++++++++++++-
.../commons/executable/ExecutableManager.java | 15 +++----
.../config/executor/ClusterConfigTaskExecutor.java | 1 -
5 files changed, 89 insertions(+), 14 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index 6636a37ca7..449ef63997 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -115,7 +115,9 @@ public class TriggerInfo implements SnapshotProcessor {
public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) {
try {
TriggerInformation triggerInformation =
physicalPlan.getTriggerInformation();
- triggerTable.addTriggerInformation(triggerInformation.getTriggerName(),
triggerInformation);
+ String triggerName = triggerInformation.getTriggerName();
+ triggerTable.addTriggerInformation(triggerName, triggerInformation);
+ existedJarToMD5.put(triggerName, triggerInformation.getJarFileMD5());
if (physicalPlan.getJarFile() != null) {
triggerExecutableManager.writeToLibDir(
ByteBuffer.wrap(physicalPlan.getJarFile().getValues()),
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 142c2b3d0a..d169dbd51b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -50,6 +50,8 @@ import
org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -374,6 +376,24 @@ public class ConfigNodeProcedureEnv {
return dataNodeResponseStatus;
}
+ public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation
triggerInformation)
+ throws IOException {
+ NodeManager nodeManager = configManager.getNodeManager();
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ nodeManager.getRegisteredDataNodeLocations();
+ final List<TSStatus> dataNodeResponseStatus =
+ Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
+ final TDropTriggerInstanceReq request =
+ new TDropTriggerInstanceReq(triggerInformation.getTriggerName(),
false);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ request,
+ dataNodeLocationMap,
+ DataNodeRequestType.DROP_TRIGGER_INSTANCE,
+ dataNodeResponseStatus);
+ return dataNodeResponseStatus;
+ }
+
public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws
IOException {
NodeManager nodeManager = configManager.getNodeManager();
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
@@ -391,6 +411,23 @@ public class ConfigNodeProcedureEnv {
return dataNodeResponseStatus;
}
+ public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws
IOException {
+ NodeManager nodeManager = configManager.getNodeManager();
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ nodeManager.getRegisteredDataNodeLocations();
+ final List<TSStatus> dataNodeResponseStatus =
+ Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
+ final TInactiveTriggerInstanceReq request = new
TInactiveTriggerInstanceReq(triggerName);
+
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetry(
+ request,
+ dataNodeLocationMap,
+ DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE,
+ dataNodeResponseStatus);
+ return dataNodeResponseStatus;
+ }
+
public LockQueue getNodeLock() {
return nodeLock;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index a6a42ddfde..35e6907e25 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -120,7 +121,7 @@ public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerS
triggerInformation.getTriggerName(),
TTriggerState.ACTIVE));
setNextState(CreateTriggerState.CONFIG_NODE_ACTIVE);
break;
- case CONFIG_NODE_ACTIVE:
+ case CONFIG_NODE_ACTIVE: // TODO change name to END
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
return Flow.NO_MORE_STATE;
}
@@ -144,7 +145,48 @@ public class CreateTriggerProcedure extends
AbstractNodeProcedure<CreateTriggerS
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, CreateTriggerState
state)
throws IOException, InterruptedException, ProcedureException {
- // TODO
+ switch (state) {
+ case INIT:
+ LOG.info("Start [INIT] rollback of trigger [{}]",
triggerInformation.getTriggerName());
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new
DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+ break;
+ case CONFIG_NODE_INACTIVE:
+ LOG.info(
+ "Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+ triggerInformation.getTriggerName());
+ if
(RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ } else {
+ throw new TriggerManagementException(
+ String.format(
+ "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+ triggerInformation.getTriggerName()));
+ }
+ break;
+ case DATA_NODE_INACTIVE:
+ LOG.info(
+ "Start to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+ triggerInformation.getTriggerName());
+ if (RpcUtils.squashResponseStatusList(
+
env.inactiveTriggerOnDataNodes(triggerInformation.getTriggerName()))
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ setNextState(CreateTriggerState.DATA_NODE_ACTIVE);
+ } else {
+ throw new TriggerManagementException(
+ String.format(
+ "Fail to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+ triggerInformation.getTriggerName()));
+ }
+ break;
+ case DATA_NODE_ACTIVE:
+ case CONFIG_NODE_ACTIVE:
+ break;
+ }
}
@Override
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index e47d1d1fc5..2094cc3939 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -77,19 +77,12 @@ public class ExecutableManager {
false);
}
- public void moveFileUnderTempRootToExtLibDir(ExecutableResource resource,
String name)
- throws IOException {
- FileUtils.moveFileToDirectory(
- getFileByFullPath(
- getDirStringUnderTempRootByRequestId(resource.getRequestId()) +
File.separator + name),
- getFileByFullPath(libRoot),
- false);
- }
-
public void copyFileToExtLibDir(String filePath) throws IOException {
FileUtils.copyFileToDirectory(
FSFactoryProducer.getFSFactory().getFile(filePath),
FSFactoryProducer.getFSFactory().getFile(this.libRoot));
+ }
+
public void removeFromTemporaryLibRoot(ExecutableResource resource) {
removeFromTemporaryLibRoot(resource.getRequestId());
}
@@ -188,7 +181,9 @@ public class ExecutableManager {
*/
public void writeToLibDir(ByteBuffer byteBuffer, String fileName) throws
IOException {
String destination = this.libRoot + File.separator + fileName;
- Files.deleteIfExists(Paths.get(destination));
+ Path path = Paths.get(destination);
+ Files.deleteIfExists(path);
+ Files.createFile(path);
try (FileOutputStream outputStream = new FileOutputStream(destination)) {
outputStream.getChannel().write(byteBuffer);
} catch (IOException e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 5b6e61b428..8d250d1eab 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -99,7 +99,6 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.trigger.api.Trigger;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.codec.digest.DigestUtils;