This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TriggerTest by this push:
     new 88bf37ca32 draft (#7412)
88bf37ca32 is described below

commit 88bf37ca32a782fbecf9992ddd7cdd8257fa3d6e
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Sep 23 09:33:47 2022 +0800

    draft (#7412)
---
 .../commons/executable/ExecutableManager.java      |  35 ++++++
 .../iotdb/commons/trigger/TriggerInformation.java  |  17 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   7 ++
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +-
 .../trigger/service/TriggerManagementService.java  | 130 ++++++++++++++-------
 .../src/main/thrift/confignode.thrift              |   3 +-
 thrift/src/main/thrift/datanode.thrift             |   8 +-
 8 files changed, 158 insertions(+), 52 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4be42361ab..358bb6ae38 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -166,4 +166,39 @@ public class ExecutableManager {
       throw e;
     }
   }
+
+  
/////////////////////////////////////////////////////////////////////////////////////////////////
+  // other functions
+  
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @param fileName given file name
+   * @return true if file exists under LibRoot
+   */
+  public boolean hasFileUnderLibRoot(String fileName) {
+    return Files.exists(Paths.get(this.libRoot + File.separator + fileName));
+  }
+
+  public boolean hasFileUnderTemporaryRoot(String fileName) {
+    return Files.exists(Paths.get(this.temporaryLibRoot + File.separator + 
fileName));
+  }
+
+  public void saveTextAsFileUnderTemporaryRoot(String text, String fileName) 
throws IOException {
+    Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+    Files.deleteIfExists(path);
+    Files.write(path, text.getBytes());
+  }
+
+  public String readTextFromFileUnderTemporaryRoot(String fileName) throws 
IOException {
+    Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+    return new String(Files.readAllBytes(path));
+  }
+
+  public String getTemporaryLibRoot() {
+    return temporaryLibRoot;
+  }
+
+  public String getLibRoot() {
+    return libRoot;
+  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index bc66dd28b8..7b3a911139 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -48,6 +48,9 @@ public class TriggerInformation {
   /** only used for Stateful Trigger */
   private TDataNodeLocation dataNodeLocation;
 
+  /** MD5 of the Jar File */
+  private String jarFileMD5;
+
   public TriggerInformation() {};
 
   public TriggerInformation(
@@ -58,7 +61,8 @@ public class TriggerInformation {
       Map<String, String> attributes,
       TTriggerState triggerState,
       boolean isStateful,
-      TDataNodeLocation dataNodeLocation) {
+      TDataNodeLocation dataNodeLocation,
+      String jarFileMD5) {
     this.pathPattern = pathPattern;
     this.triggerName = triggerName;
     this.className = className;
@@ -67,6 +71,7 @@ public class TriggerInformation {
     this.triggerState = triggerState;
     this.isStateful = isStateful;
     this.dataNodeLocation = dataNodeLocation;
+    this.jarFileMD5 = jarFileMD5;
   }
 
   public ByteBuffer serialize() throws IOException {
@@ -87,6 +92,7 @@ public class TriggerInformation {
     if (isStateful) {
       ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, 
outputStream);
     }
+    ReadWriteIOUtils.write(jarFileMD5, outputStream);
   }
 
   public static TriggerInformation deserialize(ByteBuffer byteBuffer) {
@@ -104,6 +110,7 @@ public class TriggerInformation {
       triggerInformation.dataNodeLocation =
           ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
     }
+    triggerInformation.jarFileMD5 = ReadWriteIOUtils.readString(byteBuffer);
     return triggerInformation;
   }
 
@@ -166,4 +173,12 @@ public class TriggerInformation {
   public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
     this.dataNodeLocation = dataNodeLocation;
   }
+
+  public String getJarFileMD5() {
+    return jarFileMD5;
+  }
+
+  public void setJarFileMD5(String jarFileMD5) {
+    this.jarFileMD5 = jarFileMD5;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5513ff51c5..6906a82d78 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -270,7 +270,7 @@ public class IoTDBConfig {
 
   /** External temporary lib directory for storing downloaded trigger JAR 
files */
   private String triggerTemporaryLibDir =
-      IoTDBConstant.EXT_FOLDER_NAME + File.separator + 
IoTDBConstant.UDF_TMP_FOLDER_NAME;
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + 
IoTDBConstant.TRIGGER_TMP_FOLDER_NAME;
 
   /** External lib directory for ext Pipe plugins, stores user-defined JAR 
files */
   private String extPipeDir =
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index bf4356ec12..bc16779b69 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -102,6 +103,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -289,6 +292,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         // If jarPath is a file path, we transfer it to ByteBuffer and send it 
to ConfigNode.
         tCreateTriggerReq.setJarFile(
             
ExecutableManager.transferToBytebuffer(createTriggerStatement.getJarPath()));
+        // set md5 of the jar file
+        tCreateTriggerReq.setJarMD5(
+            DigestUtils.md5Hex(
+                
Files.newInputStream(Paths.get(createTriggerStatement.getJarPath()))));
       }
 
       final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 4d434f5a3a..296e794896 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.type.Gauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -96,6 +97,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
@@ -126,8 +128,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TcreateTriggerInstanceReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -925,7 +925,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus createTriggerInstance(TcreateTriggerInstanceReq req) throws 
TException {
+  public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws 
TException {
     TriggerInformation triggerInformation = 
TriggerInformation.deserialize(req.triggerInformation);
     try {
       // set state to INACTIVE when creating trigger instance
@@ -948,7 +948,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req) throws 
TException {
+  public TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) throws 
TException {
     try {
       TriggerManagementService.getInstance().activeTrigger(req.triggerName);
     } catch (Exception e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 6b63006df6..221cdf9faf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.trigger.service;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -29,15 +27,21 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
 import org.apache.iotdb.trigger.api.Trigger;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
@@ -46,7 +50,7 @@ public class TriggerManagementService implements IService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerManagementService.class);
 
-  private final ReentrantLock registrationLock;
+  private final ReentrantLock lock;
 
   private final TriggerTable triggerTable;
 
@@ -54,44 +58,107 @@ public class TriggerManagementService implements IService {
 
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
-  private TDataNodeLocation tDataNodeLocationCache;
+  private static final int DATA_NODE_ID = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
 
   private TriggerManagementService() {
-    this.registrationLock = new ReentrantLock();
+    this.lock = new ReentrantLock();
     this.triggerTable = new TriggerTable();
     this.executorMap = new ConcurrentHashMap<>();
   }
 
-  public void acquireRegistrationLock() {
-    registrationLock.lock();
+  public void acquireLock() {
+    lock.lock();
   }
 
-  public void releaseRegistrationLock() {
-    registrationLock.unlock();
+  public void releaseLock() {
+    lock.unlock();
   }
 
   public void register(TriggerInformation triggerInformation) {
-    acquireRegistrationLock();
-    checkIfRegistered(triggerInformation);
-    doRegister(triggerInformation);
-    releaseRegistrationLock();
+    try {
+      acquireLock();
+      checkIfRegistered(triggerInformation);
+      doRegister(triggerInformation);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to register trigger({}) on data node, the cause is: {}",
+          triggerInformation.getTriggerName(),
+          e.getMessage());
+    } finally {
+      releaseLock();
+    }
   };
 
   public void activeTrigger(String triggerName) {
-    triggerTable.activeTrigger(triggerName);
+    try {
+      acquireLock();
+      triggerTable.activeTrigger(triggerName);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to active trigger({}) on data node, the cause is: {}",
+          triggerName,
+          e.getMessage());
+    } finally {
+      releaseLock();
+    }
   };
 
   private void checkIfRegistered(TriggerInformation triggerInformation)
       throws TriggerManagementException {
     String triggerName = triggerInformation.getTriggerName();
     if (triggerTable.containsTrigger(triggerName)) {
-      String errorMessage =
-          String.format(
-              "Failed to registered trigger %s, "
-                  + "because trigger %s has already been registered in 
TriggerTable",
-              triggerName, triggerName);
-      LOGGER.warn(errorMessage);
-      throw new TriggerManagementException(errorMessage);
+      String jarName = triggerInformation.getJarName();
+      if (TriggerExecutableManager.getInstance().hasFileUnderLibRoot(jarName)) 
{
+        // A jar with the same name exists, we need to check md5
+        String existedMd5 = "";
+        String md5FilePath = triggerName + ".txt";
+
+        // if meet error when reading md5 from txt, we need to compute it again
+        boolean hasComputed = false;
+        if 
(TriggerExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) 
{
+          try {
+            existedMd5 =
+                TriggerExecutableManager.getInstance()
+                    .readTextFromFileUnderTemporaryRoot(md5FilePath);
+            hasComputed = true;
+          } catch (IOException e) {
+            LOGGER.warn("Error occurred when trying to read md5 of {}", 
md5FilePath);
+          }
+        }
+        if (!hasComputed) {
+          try {
+            existedMd5 =
+                DigestUtils.md5Hex(
+                    Files.newInputStream(
+                        Paths.get(
+                            TriggerExecutableManager.getInstance().getLibRoot()
+                                + File.separator
+                                + triggerInformation.getJarName())));
+            // save the md5 in a txt under trigger temporary lib
+            TriggerExecutableManager.getInstance()
+                .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
+          } catch (IOException e) {
+            String errorMessage =
+                String.format(
+                    "Failed to registered trigger %s, "
+                        + "because error occurred when trying to compute md5 
of jar file for trigger %s ",
+                    triggerName, triggerName);
+            LOGGER.warn(errorMessage);
+            throw new TriggerManagementException(errorMessage);
+          }
+        }
+
+        if (!existedMd5.equals(triggerInformation.getJarFileMD5())) {
+          // same jar name with different md5
+          String errorMessage =
+              String.format(
+                  "Failed to registered trigger %s, "
+                      + "because existed md5 of jar file for trigger %s is 
different from the new jar file. ",
+                  triggerName, triggerName);
+          LOGGER.warn(errorMessage);
+          throw new TriggerManagementException(errorMessage);
+        }
+      }
     }
   }
 
@@ -102,7 +169,7 @@ public class TriggerManagementService implements IService {
       triggerTable.addTriggerInformation(triggerName, triggerInformation);
       // if it is a stateful trigger, we only create its instance on specified 
DataNode
       if (!triggerInformation.isStateful()
-          || 
triggerInformation.getDataNodeLocation().equals(getTDataNodeLocation())) {
+          || triggerInformation.getDataNodeLocation().getDataNodeId() == 
DATA_NODE_ID) {
         // get trigger instance
         Trigger trigger =
             constructTriggerInstance(triggerInformation.getClassName(), 
currentActiveClassLoader);
@@ -138,25 +205,6 @@ public class TriggerManagementService implements IService {
     }
   }
 
-  private TDataNodeLocation getTDataNodeLocation() {
-    if (tDataNodeLocationCache == null) {
-      // Set DataNodeLocation
-      tDataNodeLocationCache = new TDataNodeLocation();
-      tDataNodeLocationCache.setDataNodeId(CONFIG.getDataNodeId());
-      tDataNodeLocationCache.setClientRpcEndPoint(
-          new TEndPoint(CONFIG.getRpcAddress(), CONFIG.getRpcPort()));
-      tDataNodeLocationCache.setInternalEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), 
CONFIG.getInternalPort()));
-      tDataNodeLocationCache.setMPPDataExchangeEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), 
CONFIG.getMppDataExchangePort()));
-      tDataNodeLocationCache.setDataRegionConsensusEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), 
CONFIG.getDataRegionConsensusPort()));
-      tDataNodeLocationCache.setSchemaRegionConsensusEndPoint(
-          new TEndPoint(CONFIG.getInternalAddress(), 
CONFIG.getSchemaRegionConsensusPort()));
-    }
-    return tDataNodeLocationCache;
-  }
-
   @Override
   public void start() throws StartupException {}
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 14b7979fce..262c0dd7e6 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -264,7 +264,8 @@ struct TCreateTriggerReq {
   6: required byte triggerType
   7: required binary pathPattern,
   8: required map<string, string> attributes,
-  9: optional binary jarFile
+  9: optional binary jarFile,
+  10: optional string jarMD5
 }
 
 struct TDropTriggerReq {
diff --git a/thrift/src/main/thrift/datanode.thrift 
b/thrift/src/main/thrift/datanode.thrift
index 2fc8d346ef..12d3a41ddb 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -169,12 +169,12 @@ struct TDropFunctionRequest {
   1: required string udfName
 }
 
-struct TcreateTriggerInstanceReq {
+struct TCreateTriggerInstanceReq {
   1: required binary triggerInformation
   2: required binary jarFile
 }
 
-struct TactiveTriggerInstanceReq {
+struct TActiveTriggerInstanceReq {
   1: required string triggerName
 }
 
@@ -421,14 +421,14 @@ service IDataNodeRPCService {
    *
    * @param TriggerInformation, jar file.
    **/
-  common.TSStatus createTriggerInstance(TcreateTriggerInstanceReq req)
+  common.TSStatus createTriggerInstance(TCreateTriggerInstanceReq req)
 
   /**
    * Config node will active a trigger instance on data node.
    *
    * @param trigger name.
    **/
-  common.TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req)
+  common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
 
   /**
     * Config node will drop a trigger on all online config nodes and data 
nodes.

Reply via email to