This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TriggerTest by this push:
     new 0abbb91099 Trigger test (#7419)
0abbb91099 is described below

commit 0abbb910990f1940d8fccb30e198236baf7b6b02
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Sep 23 17:31:36 2022 +0800

    Trigger test (#7419)
---
 .../commons/executable/ExecutableManager.java      | 51 +++++++++++++------
 .../commons/udf/service/UDFExecutableManager.java  |  6 +++
 .../udf/service/UDFRegistrationService.java        |  6 +--
 .../config/executor/ClusterConfigTaskExecutor.java | 58 ++++++++++++++++++++--
 .../trigger/service/TriggerManagementService.java  |  2 +-
 .../src/main/thrift/confignode.thrift              |  3 +-
 .../iotdb/trigger/api/enums/FailureStrategy.java   | 37 ++++++++++----
 7 files changed, 129 insertions(+), 34 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4146e39ab8..21582ab5d5 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -59,27 +59,33 @@ public class ExecutableManager {
   public ExecutableResource request(List<String> uris) throws 
URISyntaxException, IOException {
     final long requestId = generateNextRequestId();
     downloadExecutables(uris, requestId);
-    return new ExecutableResource(requestId, 
getDirStringByRequestId(requestId));
+    return new ExecutableResource(requestId, 
getDirStringUnderTempRootByRequestId(requestId));
   }
 
-  public void moveToExtLibDir(ExecutableResource resource, String name) throws 
IOException {
-    FileUtils.moveDirectory(getDirByRequestId(resource.getRequestId()), 
getDirByName(name));
+  public void moveTempDirToExtLibDir(ExecutableResource resource, String name) 
throws IOException {
+    FileUtils.moveDirectory(
+        getDirUnderTempRootByRequestId(resource.getRequestId()), 
getDirUnderLibRootByName(name));
   }
 
-  public void removeFromTemporaryLibRoot(ExecutableResource resource) {
-    removeFromTemporaryLibRoot(resource.getRequestId());
+  public void moveFileUnderTempRootToExtLibDir(ExecutableResource resource, 
String name)
+      throws IOException {
+    FileUtils.moveFileToDirectory(
+        getFileByFullPath(
+            getDirStringUnderTempRootByRequestId(resource.getRequestId()) + 
File.separator + name),
+        getFileByFullPath(libRoot),
+        false);
   }
 
-  public void removeFromExtLibDir(String functionName) {
-    FileUtils.deleteQuietly(getDirByName(functionName));
+  public void removeFromTemporaryLibRoot(ExecutableResource resource) {
+    removeFromTemporaryLibRoot(resource.getRequestId());
   }
 
   private synchronized long generateNextRequestId() throws IOException {
     long requestId = requestCounter.getAndIncrement();
-    while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
+    while (FileUtils.isDirectory(getDirUnderTempRootByRequestId(requestId))) {
       requestId = requestCounter.getAndIncrement();
     }
-    FileUtils.forceMkdir(getDirByRequestId(requestId));
+    FileUtils.forceMkdir(getDirUnderTempRootByRequestId(requestId));
     return requestId;
   }
 
@@ -101,29 +107,42 @@ public class ExecutableManager {
   }
 
   private void removeFromTemporaryLibRoot(long requestId) {
-    FileUtils.deleteQuietly(getDirByRequestId(requestId));
+    FileUtils.deleteQuietly(getDirUnderTempRootByRequestId(requestId));
   }
 
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // dir string and dir file generation
   
/////////////////////////////////////////////////////////////////////////////////////////////////
 
-  public File getDirByRequestId(long requestId) {
-    return 
FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
+  public File getDirUnderTempRootByRequestId(long requestId) {
+    return FSFactoryProducer.getFSFactory()
+        .getFile(getDirStringUnderTempRootByRequestId(requestId));
   }
 
-  public String getDirStringByRequestId(long requestId) {
+  public String getDirStringUnderTempRootByRequestId(long requestId) {
     return temporaryLibRoot + File.separator + requestId + File.separator;
   }
 
-  public File getDirByName(String name) {
-    return FSFactoryProducer.getFSFactory().getFile(getDirStringByName(name));
+  public File getDirUnderLibRootByName(String name) {
+    return 
FSFactoryProducer.getFSFactory().getFile(getDirStringUnderLibRootByName(name));
   }
 
-  public String getDirStringByName(String name) {
+  public String getDirStringUnderLibRootByName(String name) {
     return libRoot + File.separator + name + File.separator;
   }
 
+  public File getFileUnderLibRootByName(String name) {
+    return 
FSFactoryProducer.getFSFactory().getFile(getFileStringUnderLibRootByName(name));
+  }
+
+  public String getFileStringUnderLibRootByName(String name) {
+    return libRoot + File.separator + name;
+  }
+
+  private File getFileByFullPath(String path) {
+    return FSFactoryProducer.getFSFactory().getFile(path);
+  }
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // transfer jar file to bytebuffer for thrift
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index a1cf6f34c8..6ea6f840c0 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 
+import org.apache.commons.io.FileUtils;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -35,6 +37,10 @@ public class UDFExecutableManager extends ExecutableManager 
implements IService,
     super(temporaryLibRoot, udfLibRoot);
   }
 
+  public void removeUDFJarFromExtLibDir(String functionName) {
+    FileUtils.deleteQuietly(getDirUnderLibRootByName(functionName));
+  }
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // IService
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index c0bf6a5d3e..14bb0b510e 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -173,10 +173,10 @@ public class UDFRegistrationService implements IService, 
SnapshotProcessor {
     try {
       final ExecutableResource resource = udfExecutableManager.request(uris);
       try {
-        udfExecutableManager.removeFromExtLibDir(functionName);
-        udfExecutableManager.moveToExtLibDir(resource, functionName);
+        udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
+        udfExecutableManager.moveTempDirToExtLibDir(resource, functionName);
       } catch (Exception innerException) {
-        udfExecutableManager.removeFromExtLibDir(functionName);
+        udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
         udfExecutableManager.removeFromTemporaryLibRoot(resource);
         throw innerException;
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f62ad90d93..49015dcc1a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -27,8 +27,12 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.executable.ExecutableManager;
+import org.apache.iotdb.commons.executable.ExecutableResource;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -90,6 +94,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import com.google.common.util.concurrent.SettableFuture;
@@ -108,6 +114,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -290,9 +297,39 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               createTriggerStatement.getTriggerEvent().getId(),
               createTriggerStatement.getTriggerType().getId(),
               createTriggerStatement.getPathPattern().serialize(),
-              createTriggerStatement.getAttributes());
+              createTriggerStatement.getAttributes(),
+              FailureStrategy.OPTIMISTIC.getId()); // set default strategy
 
-      if (!createTriggerStatement.isUsingURI()) {
+      if (createTriggerStatement.isUsingURI()) {
+        try {
+          // download executable
+          ExecutableResource resource =
+              TriggerExecutableManager.getInstance()
+                  
.request(Collections.singletonList(createTriggerStatement.getJarPath()));
+          String uriString = createTriggerStatement.getJarPath();
+          String jarFileName = uriString.substring(uriString.lastIndexOf("/") 
+ 1);
+          // move to ext
+          TriggerExecutableManager.getInstance()
+              .moveFileUnderTempRootToExtLibDir(resource, jarFileName);
+          tCreateTriggerReq.setJarPath(jarFileName);
+          // jarFilePath after moving to ext lib
+          String jarFilePathUnderLib =
+              
TriggerExecutableManager.getInstance().getFileStringUnderLibRootByName(jarFileName);
+          
tCreateTriggerReq.setJarFile(ExecutableManager.transferToBytebuffer(jarFilePathUnderLib));
+          tCreateTriggerReq.setJarMD5(
+              
DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderLib))));
+
+        } catch (Exception e) {
+          LOGGER.warn(
+              "Failed to download executable for trigger({}) using URI: {}, 
the cause is: {}",
+              createTriggerStatement.getTriggerName(),
+              createTriggerStatement.getJarPath(),
+              e.getMessage());
+          throw e;
+        }
+      } else {
+        // set jarPath to file name instead of the full path
+        tCreateTriggerReq.setJarPath(new 
File(createTriggerStatement.getJarPath()).getName());
         // If jarPath is a file path, we transfer it to ByteBuffer and send it 
to ConfigNode.
         tCreateTriggerReq.setJarPath(new 
File(createTriggerStatement.getJarPath()).getName());
 
@@ -304,6 +341,21 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 
Files.newInputStream(Paths.get(createTriggerStatement.getJarPath()))));
       }
 
+      // try to create instance, this request will fail if creation is not 
successful
+      try (TriggerClassLoader classLoader =
+          
TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
+        Class<?> triggerClass =
+            Class.forName(createTriggerStatement.getClassName(), true, 
classLoader);
+        Trigger trigger = (Trigger) 
triggerClass.getDeclaredConstructor().newInstance();
+        
tCreateTriggerReq.setFailureStrategy(trigger.getFailureStrategy().getId());
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Failed to create trigger when try to create trigger({}) instance 
first, the cause is: {}",
+            createTriggerStatement.getTriggerName(),
+            e.getMessage());
+        throw e;
+      }
+
       final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
 
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
executionStatus.getCode()) {
@@ -316,7 +368,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
-    } catch (TException | IOException e) {
+    } catch (Exception e) {
       future.setException(e);
     }
     return future;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 38cb52e44b..ae81a5d499 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -204,7 +204,7 @@ public class TriggerManagementService implements IService {
     }
   }
 
-  private Trigger constructTriggerInstance(String className, 
TriggerClassLoader classLoader)
+  public Trigger constructTriggerInstance(String className, TriggerClassLoader 
classLoader)
       throws TriggerManagementException {
     try {
       Class<?> triggerClass = Class.forName(className, true, classLoader);
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 1d9bc2b2a9..3f8f15b204 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -263,7 +263,8 @@ struct TCreateTriggerReq {
   7: required binary pathPattern,
   8: required map<string, string> attributes,
   9: optional binary jarFile,
-  10: optional string jarMD5
+  10: optional string jarMD5,
+  11: required i32 failureStrategy
 }
 
 struct TDropTriggerReq {
diff --git 
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
 
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
index 5920f33996..6f3faf342e 100644
--- 
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
+++ 
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.trigger.api.enums;
 
 import org.apache.iotdb.trigger.api.Trigger;
@@ -34,16 +33,34 @@ public enum FailureStrategy {
    * not have any influence on the triggers that have not been fired. The 
failure of this Trigger
    * will be simply ignored.
    */
-  OPTIMISTIC,
+  OPTIMISTIC(0),
 
   /**
-   * If this strategy were adopted, the failure of {@link 
Trigger#fire(Tablet)} of one Tablet would
-   * throw an exception and end this insertion. If a PESSIMISTIC trigger whose 
TRIGGER_EVENT is
-   * {@link TriggerEvent#BEFORE_INSERT} fails to fire in an insertion, all the 
triggers that have
-   * not fired will not be fired in this insertion and this insertion will not 
be executed. if a
-   * PESSIMISTIC trigger whose TRIGGER_EVENT is {@link 
TriggerEvent#AFTER_INSERT} fails to fire in
-   * an insertion, all the triggers that have not fired will not be fired, and 
this insertion will
-   * be marked as failed even if the insertion itself executed successfully.
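+   * If this strategy were adopted, the failure of {@link Trigger#fire(Tablet)} of one Tablet would
+   * throw an exception and end this insertion. If a PESSIMISTIC trigger whose TRIGGER_EVENT is
+   * {@link TriggerEvent#BEFORE_INSERT} fails to fire in an insertion, all the triggers that have
+   * not fired will not be fired in this insertion and this insertion will not be executed. If a
+   * PESSIMISTIC trigger whose TRIGGER_EVENT is {@link TriggerEvent#AFTER_INSERT} fails to fire in
+   * an insertion, all the triggers that have not fired will not be fired, and this insertion will
+   * be marked as failed even if the insertion itself executed successfully.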
    */
-  PESSIMISTIC,
+  PESSIMISTIC(1);
+
+  private final int id;
+
+  FailureStrategy(int id) {
+    this.id = id;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public static FailureStrategy construct(int id) {
+    switch (id) {
+      case 0:
+        return FailureStrategy.OPTIMISTIC;
+      case 1:
+        return FailureStrategy.PESSIMISTIC;
+      default:
+        throw new UnsupportedOperationException("Unsupported FailureStrategy 
Type.");
+    }
+  }
 }

Reply via email to