This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TriggerTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TriggerTest by this push:
new 0abbb91099 Trigger test (#7419)
0abbb91099 is described below
commit 0abbb910990f1940d8fccb30e198236baf7b6b02
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Sep 23 17:31:36 2022 +0800
Trigger test (#7419)
---
.../commons/executable/ExecutableManager.java | 51 +++++++++++++------
.../commons/udf/service/UDFExecutableManager.java | 6 +++
.../udf/service/UDFRegistrationService.java | 6 +--
.../config/executor/ClusterConfigTaskExecutor.java | 58 ++++++++++++++++++++--
.../trigger/service/TriggerManagementService.java | 2 +-
.../src/main/thrift/confignode.thrift | 3 +-
.../iotdb/trigger/api/enums/FailureStrategy.java | 37 ++++++++++----
7 files changed, 129 insertions(+), 34 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4146e39ab8..21582ab5d5 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -59,27 +59,33 @@ public class ExecutableManager {
public ExecutableResource request(List<String> uris) throws
URISyntaxException, IOException {
final long requestId = generateNextRequestId();
downloadExecutables(uris, requestId);
- return new ExecutableResource(requestId,
getDirStringByRequestId(requestId));
+ return new ExecutableResource(requestId,
getDirStringUnderTempRootByRequestId(requestId));
}
- public void moveToExtLibDir(ExecutableResource resource, String name) throws
IOException {
- FileUtils.moveDirectory(getDirByRequestId(resource.getRequestId()),
getDirByName(name));
+ public void moveTempDirToExtLibDir(ExecutableResource resource, String name)
throws IOException {
+ FileUtils.moveDirectory(
+ getDirUnderTempRootByRequestId(resource.getRequestId()),
getDirUnderLibRootByName(name));
}
- public void removeFromTemporaryLibRoot(ExecutableResource resource) {
- removeFromTemporaryLibRoot(resource.getRequestId());
+ public void moveFileUnderTempRootToExtLibDir(ExecutableResource resource,
String name)
+ throws IOException {
+ FileUtils.moveFileToDirectory(
+ getFileByFullPath(
+ getDirStringUnderTempRootByRequestId(resource.getRequestId()) +
File.separator + name),
+ getFileByFullPath(libRoot),
+ false);
}
- public void removeFromExtLibDir(String functionName) {
- FileUtils.deleteQuietly(getDirByName(functionName));
+ public void removeFromTemporaryLibRoot(ExecutableResource resource) {
+ removeFromTemporaryLibRoot(resource.getRequestId());
}
private synchronized long generateNextRequestId() throws IOException {
long requestId = requestCounter.getAndIncrement();
- while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
+ while (FileUtils.isDirectory(getDirUnderTempRootByRequestId(requestId))) {
requestId = requestCounter.getAndIncrement();
}
- FileUtils.forceMkdir(getDirByRequestId(requestId));
+ FileUtils.forceMkdir(getDirUnderTempRootByRequestId(requestId));
return requestId;
}
@@ -101,29 +107,42 @@ public class ExecutableManager {
}
private void removeFromTemporaryLibRoot(long requestId) {
- FileUtils.deleteQuietly(getDirByRequestId(requestId));
+ FileUtils.deleteQuietly(getDirUnderTempRootByRequestId(requestId));
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// dir string and dir file generation
/////////////////////////////////////////////////////////////////////////////////////////////////
- public File getDirByRequestId(long requestId) {
- return
FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
+ public File getDirUnderTempRootByRequestId(long requestId) {
+ return FSFactoryProducer.getFSFactory()
+ .getFile(getDirStringUnderTempRootByRequestId(requestId));
}
- public String getDirStringByRequestId(long requestId) {
+ public String getDirStringUnderTempRootByRequestId(long requestId) {
return temporaryLibRoot + File.separator + requestId + File.separator;
}
- public File getDirByName(String name) {
- return FSFactoryProducer.getFSFactory().getFile(getDirStringByName(name));
+ public File getDirUnderLibRootByName(String name) {
+ return
FSFactoryProducer.getFSFactory().getFile(getDirStringUnderLibRootByName(name));
}
- public String getDirStringByName(String name) {
+ public String getDirStringUnderLibRootByName(String name) {
return libRoot + File.separator + name + File.separator;
}
+ public File getFileUnderLibRootByName(String name) {
+ return
FSFactoryProducer.getFSFactory().getFile(getFileStringUnderLibRootByName(name));
+ }
+
+ public String getFileStringUnderLibRootByName(String name) {
+ return libRoot + File.separator + name;
+ }
+
+ private File getFileByFullPath(String path) {
+ return FSFactoryProducer.getFSFactory().getFile(path);
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// transfer jar file to bytebuffer for thrift
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index a1cf6f34c8..6ea6f840c0 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.commons.io.FileUtils;
+
import java.io.File;
import java.io.IOException;
@@ -35,6 +37,10 @@ public class UDFExecutableManager extends ExecutableManager
implements IService,
super(temporaryLibRoot, udfLibRoot);
}
+ public void removeUDFJarFromExtLibDir(String functionName) {
+ FileUtils.deleteQuietly(getDirUnderLibRootByName(functionName));
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// IService
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index c0bf6a5d3e..14bb0b510e 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -173,10 +173,10 @@ public class UDFRegistrationService implements IService,
SnapshotProcessor {
try {
final ExecutableResource resource = udfExecutableManager.request(uris);
try {
- udfExecutableManager.removeFromExtLibDir(functionName);
- udfExecutableManager.moveToExtLibDir(resource, functionName);
+ udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
+ udfExecutableManager.moveTempDirToExtLibDir(resource, functionName);
} catch (Exception innerException) {
- udfExecutableManager.removeFromExtLibDir(functionName);
+ udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
udfExecutableManager.removeFromTemporaryLibRoot(resource);
throw innerException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f62ad90d93..49015dcc1a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -27,8 +27,12 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.executable.ExecutableManager;
+import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -90,6 +94,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import com.google.common.util.concurrent.SettableFuture;
@@ -108,6 +114,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -290,9 +297,39 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
createTriggerStatement.getTriggerEvent().getId(),
createTriggerStatement.getTriggerType().getId(),
createTriggerStatement.getPathPattern().serialize(),
- createTriggerStatement.getAttributes());
+ createTriggerStatement.getAttributes(),
+ FailureStrategy.OPTIMISTIC.getId()); // set default strategy
- if (!createTriggerStatement.isUsingURI()) {
+ if (createTriggerStatement.isUsingURI()) {
+ try {
+ // download executable
+ ExecutableResource resource =
+ TriggerExecutableManager.getInstance()
+
.request(Collections.singletonList(createTriggerStatement.getJarPath()));
+ String uriString = createTriggerStatement.getJarPath();
+ String jarFileName = uriString.substring(uriString.lastIndexOf("/")
+ 1);
+ // move to ext
+ TriggerExecutableManager.getInstance()
+ .moveFileUnderTempRootToExtLibDir(resource, jarFileName);
+ tCreateTriggerReq.setJarPath(jarFileName);
+ // jarFilePath after moving to ext lib
+ String jarFilePathUnderLib =
+
TriggerExecutableManager.getInstance().getFileStringUnderLibRootByName(jarFileName);
+
tCreateTriggerReq.setJarFile(ExecutableManager.transferToBytebuffer(jarFilePathUnderLib));
+ tCreateTriggerReq.setJarMD5(
+
DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderLib))));
+
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to download executable for trigger({}) using URI: {},
the cause is: {}",
+ createTriggerStatement.getTriggerName(),
+ createTriggerStatement.getJarPath(),
+ e.getMessage());
+ throw e;
+ }
+ } else {
+ // set jarPath to file name instead of the full path
+ tCreateTriggerReq.setJarPath(new
File(createTriggerStatement.getJarPath()).getName());
// If jarPath is a file path, we transfer it to ByteBuffer and send it
to ConfigNode.
tCreateTriggerReq.setJarPath(new
File(createTriggerStatement.getJarPath()).getName());
@@ -304,6 +341,21 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
Files.newInputStream(Paths.get(createTriggerStatement.getJarPath()))));
}
+ // try to create instance, this request will fail if creation is not
successful
+ try (TriggerClassLoader classLoader =
+
TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
+ Class<?> triggerClass =
+ Class.forName(createTriggerStatement.getClassName(), true,
classLoader);
+ Trigger trigger = (Trigger)
triggerClass.getDeclaredConstructor().newInstance();
+
tCreateTriggerReq.setFailureStrategy(trigger.getFailureStrategy().getId());
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to create trigger when try to create trigger({}) instance
first, the cause is: {}",
+ createTriggerStatement.getTriggerName(),
+ e.getMessage());
+ throw e;
+ }
+
final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
executionStatus.getCode()) {
@@ -316,7 +368,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (Exception e) {
future.setException(e);
}
return future;
diff --git
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 38cb52e44b..ae81a5d499 100644
---
a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++
b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -204,7 +204,7 @@ public class TriggerManagementService implements IService {
}
}
- private Trigger constructTriggerInstance(String className,
TriggerClassLoader classLoader)
+ public Trigger constructTriggerInstance(String className, TriggerClassLoader
classLoader)
throws TriggerManagementException {
try {
Class<?> triggerClass = Class.forName(className, true, classLoader);
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 1d9bc2b2a9..3f8f15b204 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -263,7 +263,8 @@ struct TCreateTriggerReq {
7: required binary pathPattern,
8: required map<string, string> attributes,
9: optional binary jarFile,
- 10: optional string jarMD5
+ 10: optional string jarMD5,
+ 11: required i32 failureStrategy
}
struct TDropTriggerReq {
diff --git
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
index 5920f33996..6f3faf342e 100644
---
a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
+++
b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.trigger.api.enums;
import org.apache.iotdb.trigger.api.Trigger;
@@ -34,16 +33,34 @@ public enum FailureStrategy {
* not have any influence on the triggers that have not been fired. The
failure of this Trigger
* will be simply ignored.
*/
- OPTIMISTIC,
+ OPTIMISTIC(0),
/**
- * If this strategy were adopted, the failure of {@link
Trigger#fire(Tablet)} of one Tablet would
- * throw an exception and end this insertion. If a PESSIMISTIC trigger whose
TRIGGER_EVENT is
- * {@link TriggerEvent#BEFORE_INSERT} fails to fire in an insertion, all the
triggers that have
- * not fired will not be fired in this insertion and this insertion will not
be executed. if a
- * PESSIMISTIC trigger whose TRIGGER_EVENT is {@link
TriggerEvent#AFTER_INSERT} fails to fire in
- * an insertion, all the triggers that have not fired will not be fired, and
this insertion will
- * be marked as failed even if the insertion itself executed successfully.
+ * If this strategy were adopted, the failure of {@link
Trigger#fire(Tablet)} of one Tablet
+ * would @@ -45,5 +45,26 @@ public enum FailureStrategy { an insertion, all
the triggers that have
+ * not fired will not be fired, and this insertion will be marked as failed
even if the insertion
+ * itself executed successfully.
*/
- PESSIMISTIC,
+ PESSIMISTIC(1);
+
+ private final int id;
+
+ FailureStrategy(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public static FailureStrategy construct(int id) {
+ switch (id) {
+ case 0:
+ return FailureStrategy.OPTIMISTIC;
+ case 1:
+ return FailureStrategy.PESSIMISTIC;
+ default:
+ throw new UnsupportedOperationException("Unsupported FailureStrategy
Type.");
+ }
+ }
}