This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TriggerTest by this push:
     new 18a165f6e1 fix ExecutableManager and complete createTrigger of ConfigNode (#7427)
18a165f6e1 is described below

commit 18a165f6e1045699344ae937be7e3caf48db2350
Author: Weihao Li <[email protected]>
AuthorDate: Mon Sep 26 10:38:44 2022 +0800

    fix ExecutableManager and complete createTrigger of ConfigNode (#7427)
---
 .../iotdb/confignode/persistence/TriggerInfo.java  |  4 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      | 37 +++++++++++++++++
 .../procedure/impl/CreateTriggerProcedure.java     | 46 +++++++++++++++++++++-
 .../commons/executable/ExecutableManager.java      | 15 +++----
 .../config/executor/ClusterConfigTaskExecutor.java |  1 -
 5 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index 6636a37ca7..449ef63997 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -115,7 +115,9 @@ public class TriggerInfo implements SnapshotProcessor {
   public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) {
     try {
       TriggerInformation triggerInformation = 
physicalPlan.getTriggerInformation();
-      triggerTable.addTriggerInformation(triggerInformation.getTriggerName(), 
triggerInformation);
+      String triggerName = triggerInformation.getTriggerName();
+      triggerTable.addTriggerInformation(triggerName, triggerInformation);
+      existedJarToMD5.put(triggerName, triggerInformation.getJarFileMD5());
       if (physicalPlan.getJarFile() != null) {
         triggerExecutableManager.writeToLibDir(
             ByteBuffer.wrap(physicalPlan.getJarFile().getValues()),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 142c2b3d0a..d169dbd51b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -50,6 +50,8 @@ import 
org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -374,6 +376,24 @@ public class ConfigNodeProcedureEnv {
     return dataNodeResponseStatus;
   }
 
+  public List<TSStatus> dropTriggerOnDataNodes(TriggerInformation 
triggerInformation)
+      throws IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
+    final TDropTriggerInstanceReq request =
+        new TDropTriggerInstanceReq(triggerInformation.getTriggerName(), 
false);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.DROP_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
   public List<TSStatus> activeTriggerOnDataNodes(String triggerName) throws 
IOException {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
@@ -391,6 +411,23 @@ public class ConfigNodeProcedureEnv {
     return dataNodeResponseStatus;
   }
 
+  public List<TSStatus> inactiveTriggerOnDataNodes(String triggerName) throws 
IOException {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
+    final TInactiveTriggerInstanceReq request = new 
TInactiveTriggerInstanceReq(triggerName);
+
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetry(
+            request,
+            dataNodeLocationMap,
+            DataNodeRequestType.INACTIVE_TRIGGER_INSTANCE,
+            dataNodeResponseStatus);
+    return dataNodeResponseStatus;
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index a6a42ddfde..35e6907e25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
 import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -120,7 +121,7 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
                       triggerInformation.getTriggerName(), 
TTriggerState.ACTIVE));
           setNextState(CreateTriggerState.CONFIG_NODE_ACTIVE);
           break;
-        case CONFIG_NODE_ACTIVE:
+        case CONFIG_NODE_ACTIVE: // TODO change name to END
           
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
           return Flow.NO_MORE_STATE;
       }
@@ -144,7 +145,48 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, CreateTriggerState 
state)
       throws IOException, InterruptedException, ProcedureException {
-    // TODO
+    switch (state) {
+      case INIT:
+        LOG.info("Start [INIT] rollback of trigger [{}]", 
triggerInformation.getTriggerName());
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new 
DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+        
env.getConfigManager().getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+        break;
+      case CONFIG_NODE_INACTIVE:
+        LOG.info(
+            "Start to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+        if 
(RpcUtils.squashResponseStatusList(env.dropTriggerOnDataNodes(triggerInformation))
+                .getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        } else {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+      case DATA_NODE_INACTIVE:
+        LOG.info(
+            "Start to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+            triggerInformation.getTriggerName());
+        if (RpcUtils.squashResponseStatusList(
+                    
env.inactiveTriggerOnDataNodes(triggerInformation.getTriggerName()))
+                .getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          setNextState(CreateTriggerState.DATA_NODE_ACTIVE);
+        } else {
+          throw new TriggerManagementException(
+              String.format(
+                  "Fail to [DATA_NODE_INACTIVE] rollback of trigger [{}]",
+                  triggerInformation.getTriggerName()));
+        }
+        break;
+      case DATA_NODE_ACTIVE:
+      case CONFIG_NODE_ACTIVE:
+        break;
+    }
   }
 
   @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index e47d1d1fc5..2094cc3939 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -77,19 +77,12 @@ public class ExecutableManager {
         false);
   }
 
-  public void moveFileUnderTempRootToExtLibDir(ExecutableResource resource, 
String name)
-      throws IOException {
-    FileUtils.moveFileToDirectory(
-        getFileByFullPath(
-            getDirStringUnderTempRootByRequestId(resource.getRequestId()) + 
File.separator + name),
-        getFileByFullPath(libRoot),
-        false);
-  }
-
   public void copyFileToExtLibDir(String filePath) throws IOException {
     FileUtils.copyFileToDirectory(
         FSFactoryProducer.getFSFactory().getFile(filePath),
         FSFactoryProducer.getFSFactory().getFile(this.libRoot));
+  }
+
   public void removeFromTemporaryLibRoot(ExecutableResource resource) {
     removeFromTemporaryLibRoot(resource.getRequestId());
   }
@@ -188,7 +181,9 @@ public class ExecutableManager {
    */
   public void writeToLibDir(ByteBuffer byteBuffer, String fileName) throws 
IOException {
     String destination = this.libRoot + File.separator + fileName;
-    Files.deleteIfExists(Paths.get(destination));
+    Path path = Paths.get(destination);
+    Files.deleteIfExists(path);
+    Files.createFile(path);
     try (FileOutputStream outputStream = new FileOutputStream(destination)) {
       outputStream.getChannel().write(byteBuffer);
     } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 5b6e61b428..8d250d1eab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -99,7 +99,6 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.Trigger;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.codec.digest.DigestUtils;

Reply via email to