This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TriggerTest by this push:
new 88bf37ca32 draft (#7412)
88bf37ca32 is described below
commit 88bf37ca32a782fbecf9992ddd7cdd8257fa3d6e
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Sep 23 09:33:47 2022 +0800
draft (#7412)
---
.../commons/executable/ExecutableManager.java | 35 ++++++
.../iotdb/commons/trigger/TriggerInformation.java | 17 ++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 7 ++
.../impl/DataNodeInternalRPCServiceImpl.java | 8 +-
.../trigger/service/TriggerManagementService.java | 130 ++++++++++++++-------
.../src/main/thrift/confignode.thrift | 3 +-
thrift/src/main/thrift/datanode.thrift | 8 +-
8 files changed, 158 insertions(+), 52 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4be42361ab..358bb6ae38 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -166,4 +166,39 @@ public class ExecutableManager {
throw e;
}
}
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // other functions
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @param fileName given file name
+ * @return true if file exists under LibRoot
+ */
+ public boolean hasFileUnderLibRoot(String fileName) {
+ return Files.exists(Paths.get(this.libRoot + File.separator + fileName));
+ }
+
+ public boolean hasFileUnderTemporaryRoot(String fileName) {
+ return Files.exists(Paths.get(this.temporaryLibRoot + File.separator +
fileName));
+ }
+
+ public void saveTextAsFileUnderTemporaryRoot(String text, String fileName)
throws IOException {
+ Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+ Files.deleteIfExists(path);
+ Files.write(path, text.getBytes());
+ }
+
+ public String readTextFromFileUnderTemporaryRoot(String fileName) throws
IOException {
+ Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+ return new String(Files.readAllBytes(path));
+ }
+
+ public String getTemporaryLibRoot() {
+ return temporaryLibRoot;
+ }
+
+ public String getLibRoot() {
+ return libRoot;
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index bc66dd28b8..7b3a911139 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -48,6 +48,9 @@ public class TriggerInformation {
/** only used for Stateful Trigger */
private TDataNodeLocation dataNodeLocation;
+ /** MD5 of the Jar File */
+ private String jarFileMD5;
+
public TriggerInformation() {};
public TriggerInformation(
@@ -58,7 +61,8 @@ public class TriggerInformation {
Map<String, String> attributes,
TTriggerState triggerState,
boolean isStateful,
- TDataNodeLocation dataNodeLocation) {
+ TDataNodeLocation dataNodeLocation,
+ String jarFileMD5) {
this.pathPattern = pathPattern;
this.triggerName = triggerName;
this.className = className;
@@ -67,6 +71,7 @@ public class TriggerInformation {
this.triggerState = triggerState;
this.isStateful = isStateful;
this.dataNodeLocation = dataNodeLocation;
+ this.jarFileMD5 = jarFileMD5;
}
public ByteBuffer serialize() throws IOException {
@@ -87,6 +92,7 @@ public class TriggerInformation {
if (isStateful) {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation,
outputStream);
}
+ ReadWriteIOUtils.write(jarFileMD5, outputStream);
}
public static TriggerInformation deserialize(ByteBuffer byteBuffer) {
@@ -104,6 +110,7 @@ public class TriggerInformation {
triggerInformation.dataNodeLocation =
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
}
+ triggerInformation.jarFileMD5 = ReadWriteIOUtils.readString(byteBuffer);
return triggerInformation;
}
@@ -166,4 +173,12 @@ public class TriggerInformation {
public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
this.dataNodeLocation = dataNodeLocation;
}
+
+ public String getJarFileMD5() {
+ return jarFileMD5;
+ }
+
+ public void setJarFileMD5(String jarFileMD5) {
+ this.jarFileMD5 = jarFileMD5;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5513ff51c5..6906a82d78 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -270,7 +270,7 @@ public class IoTDBConfig {
/** External temporary lib directory for storing downloaded trigger JAR
files */
private String triggerTemporaryLibDir =
- IoTDBConstant.EXT_FOLDER_NAME + File.separator +
IoTDBConstant.UDF_TMP_FOLDER_NAME;
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator +
IoTDBConstant.TRIGGER_TMP_FOLDER_NAME;
/** External lib directory for ext Pipe plugins, stores user-defined JAR
files */
private String extPipeDir =
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index bf4356ec12..bc16779b69 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -102,6 +103,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -289,6 +292,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// If jarPath is a file path, we transfer it to ByteBuffer and send it
to ConfigNode.
tCreateTriggerReq.setJarFile(
ExecutableManager.transferToBytebuffer(createTriggerStatement.getJarPath()));
+ // set md5 of the jar file
+ tCreateTriggerReq.setJarMD5(
+ DigestUtils.md5Hex(
+
Files.newInputStream(Paths.get(createTriggerStatement.getJarPath()))));
}
final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 4d434f5a3a..296e794896 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -96,6 +97,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
@@ -126,8 +128,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TcreateTriggerInstanceReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -925,7 +925,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TSStatus createTriggerInstance(TcreateTriggerInstanceReq req) throws
TException {
+ public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws
TException {
TriggerInformation triggerInformation =
TriggerInformation.deserialize(req.triggerInformation);
try {
// set state to INACTIVE when creating trigger instance
@@ -948,7 +948,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
@Override
- public TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req) throws
TException {
+ public TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) throws
TException {
try {
TriggerManagementService.getInstance().activeTrigger(req.triggerName);
} catch (Exception e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 6b63006df6..221cdf9faf 100644
---
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.trigger.service;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -29,15 +27,21 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,7 +50,7 @@ public class TriggerManagementService implements IService {
private static final Logger LOGGER =
LoggerFactory.getLogger(TriggerManagementService.class);
- private final ReentrantLock registrationLock;
+ private final ReentrantLock lock;
private final TriggerTable triggerTable;
@@ -54,44 +58,107 @@ public class TriggerManagementService implements IService {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private TDataNodeLocation tDataNodeLocationCache;
+ private static final int DATA_NODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private TriggerManagementService() {
- this.registrationLock = new ReentrantLock();
+ this.lock = new ReentrantLock();
this.triggerTable = new TriggerTable();
this.executorMap = new ConcurrentHashMap<>();
}
- public void acquireRegistrationLock() {
- registrationLock.lock();
+ public void acquireLock() {
+ lock.lock();
}
- public void releaseRegistrationLock() {
- registrationLock.unlock();
+ public void releaseLock() {
+ lock.unlock();
}
public void register(TriggerInformation triggerInformation) {
- acquireRegistrationLock();
- checkIfRegistered(triggerInformation);
- doRegister(triggerInformation);
- releaseRegistrationLock();
+ try {
+ acquireLock();
+ checkIfRegistered(triggerInformation);
+ doRegister(triggerInformation);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to register trigger({}) on data node, the cause is: {}",
+ triggerInformation.getTriggerName(),
+ e.getMessage());
+ } finally {
+ releaseLock();
+ }
};
public void activeTrigger(String triggerName) {
- triggerTable.activeTrigger(triggerName);
+ try {
+ acquireLock();
+ triggerTable.activeTrigger(triggerName);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to active trigger({}) on data node, the cause is: {}",
+ triggerName,
+ e.getMessage());
+ } finally {
+ releaseLock();
+ }
};
private void checkIfRegistered(TriggerInformation triggerInformation)
throws TriggerManagementException {
String triggerName = triggerInformation.getTriggerName();
if (triggerTable.containsTrigger(triggerName)) {
- String errorMessage =
- String.format(
- "Failed to registered trigger %s, "
- + "because trigger %s has already been registered in
TriggerTable",
- triggerName, triggerName);
- LOGGER.warn(errorMessage);
- throw new TriggerManagementException(errorMessage);
+ String jarName = triggerInformation.getJarName();
+ if (TriggerExecutableManager.getInstance().hasFileUnderLibRoot(jarName))
{
+ // A jar with the same name exists, we need to check md5
+ String existedMd5 = "";
+ String md5FilePath = triggerName + ".txt";
+
+ // if an error occurs while reading the md5 from the txt file, we need to compute it again
+ boolean hasComputed = false;
+ if
(TriggerExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath))
{
+ try {
+ existedMd5 =
+ TriggerExecutableManager.getInstance()
+ .readTextFromFileUnderTemporaryRoot(md5FilePath);
+ hasComputed = true;
+ } catch (IOException e) {
+ LOGGER.warn("Error occurred when trying to read md5 of {}",
md5FilePath);
+ }
+ }
+ if (!hasComputed) {
+ try {
+ existedMd5 =
+ DigestUtils.md5Hex(
+ Files.newInputStream(
+ Paths.get(
+ TriggerExecutableManager.getInstance().getLibRoot()
+ + File.separator
+ + triggerInformation.getJarName())));
+ // save the md5 in a txt under trigger temporary lib
+ TriggerExecutableManager.getInstance()
+ .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
+ } catch (IOException e) {
+ String errorMessage =
+ String.format(
+ "Failed to registered trigger %s, "
+ + "because error occurred when trying to compute md5
of jar file for trigger %s ",
+ triggerName, triggerName);
+ LOGGER.warn(errorMessage);
+ throw new TriggerManagementException(errorMessage);
+ }
+ }
+
+ if (!existedMd5.equals(triggerInformation.getJarFileMD5())) {
+ // same jar name with different md5
+ String errorMessage =
+ String.format(
+ "Failed to registered trigger %s, "
+ + "because existed md5 of jar file for trigger %s is
different from the new jar file. ",
+ triggerName, triggerName);
+ LOGGER.warn(errorMessage);
+ throw new TriggerManagementException(errorMessage);
+ }
+ }
}
}
@@ -102,7 +169,7 @@ public class TriggerManagementService implements IService {
triggerTable.addTriggerInformation(triggerName, triggerInformation);
// if it is a stateful trigger, we only create its instance on specified
DataNode
if (!triggerInformation.isStateful()
- ||
triggerInformation.getDataNodeLocation().equals(getTDataNodeLocation())) {
+ || triggerInformation.getDataNodeLocation().getDataNodeId() ==
DATA_NODE_ID) {
// get trigger instance
Trigger trigger =
constructTriggerInstance(triggerInformation.getClassName(),
currentActiveClassLoader);
@@ -138,25 +205,6 @@ public class TriggerManagementService implements IService {
}
}
- private TDataNodeLocation getTDataNodeLocation() {
- if (tDataNodeLocationCache == null) {
- // Set DataNodeLocation
- tDataNodeLocationCache = new TDataNodeLocation();
- tDataNodeLocationCache.setDataNodeId(CONFIG.getDataNodeId());
- tDataNodeLocationCache.setClientRpcEndPoint(
- new TEndPoint(CONFIG.getRpcAddress(), CONFIG.getRpcPort()));
- tDataNodeLocationCache.setInternalEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(),
CONFIG.getInternalPort()));
- tDataNodeLocationCache.setMPPDataExchangeEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(),
CONFIG.getMppDataExchangePort()));
- tDataNodeLocationCache.setDataRegionConsensusEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(),
CONFIG.getDataRegionConsensusPort()));
- tDataNodeLocationCache.setSchemaRegionConsensusEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(),
CONFIG.getSchemaRegionConsensusPort()));
- }
- return tDataNodeLocationCache;
- }
-
@Override
public void start() throws StartupException {}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 14b7979fce..262c0dd7e6 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -264,7 +264,8 @@ struct TCreateTriggerReq {
6: required byte triggerType
7: required binary pathPattern,
8: required map<string, string> attributes,
- 9: optional binary jarFile
+ 9: optional binary jarFile,
+ 10: optional string jarMD5
}
struct TDropTriggerReq {
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index 2fc8d346ef..12d3a41ddb 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -169,12 +169,12 @@ struct TDropFunctionRequest {
1: required string udfName
}
-struct TcreateTriggerInstanceReq {
+struct TCreateTriggerInstanceReq {
1: required binary triggerInformation
2: required binary jarFile
}
-struct TactiveTriggerInstanceReq {
+struct TActiveTriggerInstanceReq {
1: required string triggerName
}
@@ -421,14 +421,14 @@ service IDataNodeRPCService {
*
* @param TriggerInformation, jar file.
**/
- common.TSStatus createTriggerInstance(TcreateTriggerInstanceReq req)
+ common.TSStatus createTriggerInstance(TCreateTriggerInstanceReq req)
/**
* Config node will activate a trigger instance on a data node.
*
* @param trigger name.
**/
- common.TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req)
+ common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
/**
* Config node will drop a trigger on all online config nodes and data
nodes.