This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0cf20ae58c [IOTFB-4279] Migrate Trigger related class to node-commons
(#7215)
0cf20ae58c is described below
commit 0cf20ae58c037be29eae648138d86f6e56b96b2f
Author: Liao Lanyu <[email protected]>
AuthorDate: Mon Sep 5 13:46:30 2022 +0800
[IOTFB-4279] Migrate Trigger related class to node-commons (#7215)
---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 2 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 ++
.../iotdb/confignode/persistence/UDFInfo.java | 4 +-
.../commons/executable/ExecutableManager.java | 115 +++++++++++++++++++++
.../ExecutableResource.java} | 6 +-
.../iotdb/commons/file/SystemFileFactory.java | 11 ++
.../apache/iotdb/commons/service/ServiceType.java | 4 +-
.../exception/TriggerExecutionException.java} | 21 ++--
.../exception/TriggerRegistrationException.java} | 21 ++--
.../trigger/service/TriggerClassLoader.java | 4 +-
.../service/TriggerClassLoaderManager.java} | 74 ++++---------
.../service/TriggerRegistrationService.java | 89 ++++++++++++++++
.../commons/udf/service/UDFClassLoaderManager.java | 12 +--
.../commons/udf/service/UDFExecutableManager.java | 108 ++-----------------
.../udf/service/UDFRegistrationService.java | 3 +-
.../engine/trigger/executor/TriggerExecutor.java | 2 +-
.../trigger/service/TriggerClassLoaderManager.java | 1 +
.../service/TriggerRegistrationService.java | 3 +-
.../config/executor/ClusterConfigTaskExecutor.java | 2 +
.../config/executor/IConfigTaskExecutor.java | 2 +
.../executor/StandaloneConfigTaskExecutor.java | 2 +
.../config/metadata/CreateTriggerTask.java | 8 +-
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 15 +++
.../statement/metadata/CreateTriggerStatement.java | 17 +++
25 files changed, 332 insertions(+), 208 deletions(-)
diff --git
a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 03b9219d9a..6a649b840f 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -75,6 +75,7 @@ keyWords
| EVERY
| EXPLAIN
| FILL
+ | FILE
| FLUSH
| FOR
| FROM
@@ -169,6 +170,7 @@ keyWords
| UNSET
| UPDATE
| UPSERT
+ | URI
| USER
| USING
| VALUES
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 8855e97839..6a141ce128 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -126,7 +126,7 @@ uri
// Create Trigger
createTrigger
- : CREATE triggerType? TRIGGER triggerName=identifier triggerEventClause ON
prefixPath AS className=STRING_LITERAL triggerAttributeClause?
+ : CREATE triggerType? TRIGGER triggerName=identifier triggerEventClause ON
prefixPath AS className=STRING_LITERAL jarLocation? triggerAttributeClause?
;
triggerType
@@ -137,6 +137,10 @@ triggerEventClause
: (BEFORE | AFTER) (INSERT | DELETE)
;
+jarLocation
+ : USING ((FILE fileName=STRING_LITERAL) | URI uri)
+ ;
+
triggerAttributeClause
: WITH LR_BRACKET triggerAttribute (COMMA triggerAttribute)* RR_BRACKET
;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 475317029f..10bb58e1cc 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -215,6 +215,10 @@ FILL
: F I L L
;
+FILE
+ : F I L E
+ ;
+
FLUSH
: F L U S H
;
@@ -603,6 +607,10 @@ UPSERT
: U P S E R T
;
+URI
+ : U R I
+ ;
+
USER
: U S E R
;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
index e3a7568c95..3665b7d31d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -20,10 +20,10 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
-import org.apache.iotdb.commons.udf.service.UDFExecutableResource;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -78,7 +78,7 @@ public class UDFInfo implements SnapshotProcessor {
private void fetchExecutablesAndCheckInstantiation(String className,
List<String> uris)
throws Exception {
- final UDFExecutableResource resource = udfExecutableManager.request(uris);
+ final ExecutableResource resource = udfExecutableManager.request(uris);
try (UDFClassLoader temporaryUdfClassLoader = new
UDFClassLoader(resource.getResourceDir())) {
Class.forName(className, true, temporaryUdfClassLoader)
.getDeclaredConstructor()
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
new file mode 100644
index 0000000000..7022416d14
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.executable;
+
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ExecutableManager {
+
+ protected final String temporaryLibRoot;
+ protected final String libRoot;
+
+ protected final AtomicLong requestCounter;
+
+ public ExecutableManager(String temporaryLibRoot, String libRoot) {
+ this.temporaryLibRoot = temporaryLibRoot;
+ this.libRoot = libRoot;
+
+ requestCounter = new AtomicLong(0);
+ }
+
+ public ExecutableResource request(List<String> uris) throws
URISyntaxException, IOException {
+ final long requestId = generateNextRequestId();
+ downloadExecutables(uris, requestId);
+ return new ExecutableResource(requestId,
getDirStringByRequestId(requestId));
+ }
+
+ public void moveToExtLibDir(ExecutableResource resource, String name) throws
IOException {
+ FileUtils.moveDirectory(getDirByRequestId(resource.getRequestId()),
getDirByName(name));
+ }
+
+ public void removeFromTemporaryLibRoot(ExecutableResource resource) {
+ removeFromTemporaryLibRoot(resource.getRequestId());
+ }
+
+ public void removeFromExtLibDir(String functionName) {
+ FileUtils.deleteQuietly(getDirByName(functionName));
+ }
+
+ private synchronized long generateNextRequestId() throws IOException {
+ long requestId = requestCounter.getAndIncrement();
+ while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
+ requestId = requestCounter.getAndIncrement();
+ }
+ FileUtils.forceMkdir(getDirByRequestId(requestId));
+ return requestId;
+ }
+
+ private void downloadExecutables(List<String> uris, long requestId)
+ throws IOException, URISyntaxException {
+ // TODO: download the executables in parallel
+ try {
+ for (String uriString : uris) {
+ final URL url = new URI(uriString).toURL();
+ final String fileName = uriString.substring(uriString.lastIndexOf("/")
+ 1);
+ final String destination =
+ temporaryLibRoot + File.separator + requestId + File.separator +
fileName;
+ FileUtils.copyURLToFile(url,
FSFactoryProducer.getFSFactory().getFile(destination));
+ }
+ } catch (Exception e) {
+ removeFromTemporaryLibRoot(requestId);
+ throw e;
+ }
+ }
+
+ private void removeFromTemporaryLibRoot(long requestId) {
+ FileUtils.deleteQuietly(getDirByRequestId(requestId));
+ }
+
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // dir string and dir file generation
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+ public File getDirByRequestId(long requestId) {
+ return
FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
+ }
+
+ public String getDirStringByRequestId(long requestId) {
+ return temporaryLibRoot + File.separator + requestId + File.separator;
+ }
+
+ public File getDirByName(String name) {
+ return FSFactoryProducer.getFSFactory().getFile(getDirStringByName(name));
+ }
+
+ public String getDirStringByName(String name) {
+ return libRoot + File.separator + name + File.separator;
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableResource.java
similarity index 87%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableResource.java
index ec0c375997..d763445a84 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableResource.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.commons.udf.service;
+package org.apache.iotdb.commons.executable;
-public class UDFExecutableResource {
+public class ExecutableResource {
private final long requestId;
private final String resourceDir;
- public UDFExecutableResource(long requestId, String resourceDir) {
+ public ExecutableResource(long requestId, String resourceDir) {
this.requestId = requestId;
this.resourceDir = resourceDir;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/file/SystemFileFactory.java
b/node-commons/src/main/java/org/apache/iotdb/commons/file/SystemFileFactory.java
index bbed0563da..54f2e139b5 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/file/SystemFileFactory.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/file/SystemFileFactory.java
@@ -22,7 +22,10 @@ package org.apache.iotdb.commons.file;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.commons.io.FileUtils;
+
import java.io.File;
+import java.io.IOException;
import java.net.URI;
public enum SystemFileFactory {
@@ -67,4 +70,12 @@ public enum SystemFileFactory {
return new File(uri);
}
}
+
+ public void makeDirIfNecessary(String dir) throws IOException {
+ File file = getFile(dir);
+ if (file.exists() && file.isDirectory()) {
+ return;
+ }
+ FileUtils.forceMkdir(file);
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index ec38df7015..f9bdc39eb1 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -44,8 +44,10 @@ public enum ServiceType {
UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
UDF_REGISTRATION_SERVICE("UDF Registration Service", ""),
UDF_EXECUTABLE_MANAGER_SERVICE("UDF Executable Manager Service", ""),
- TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
+ TRIGGER_CLASSLOADER_MANAGER_SERVICE("Trigger ClassLoader Manager Service",
""),
TRIGGER_REGISTRATION_SERVICE("Trigger Registration Service", ""),
+ TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
+ TRIGGER_REGISTRATION_SERVICE_OLD("Old Standalone Trigger Registration
Service", ""),
CACHE_HIT_RATIO_DISPLAY_SERVICE(
"CACHE_HIT_RATIO_DISPLAY_SERVICE",
generateJmxName("org.apache.iotdb.service", "Cache Hit Ratio")),
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerExecutionException.java
similarity index 66%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerExecutionException.java
index ec0c375997..0c4ba54620 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerExecutionException.java
@@ -17,23 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.commons.udf.service;
+package org.apache.iotdb.commons.trigger.exception;
-public class UDFExecutableResource {
-
- private final long requestId;
- private final String resourceDir;
-
- public UDFExecutableResource(long requestId, String resourceDir) {
- this.requestId = requestId;
- this.resourceDir = resourceDir;
- }
-
- public long getRequestId() {
- return requestId;
+public class TriggerExecutionException extends RuntimeException {
+ public TriggerExecutionException(String message) {
+ super(message);
}
- public String getResourceDir() {
- return resourceDir;
+ public TriggerExecutionException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerRegistrationException.java
similarity index 66%
rename from
node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
rename to
node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerRegistrationException.java
index ec0c375997..34a18dc6c2 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/exception/TriggerRegistrationException.java
@@ -17,23 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.commons.udf.service;
+package org.apache.iotdb.commons.trigger.exception;
-public class UDFExecutableResource {
-
- private final long requestId;
- private final String resourceDir;
-
- public UDFExecutableResource(long requestId, String resourceDir) {
- this.requestId = requestId;
- this.resourceDir = resourceDir;
- }
-
- public long getRequestId() {
- return requestId;
+public class TriggerRegistrationException extends RuntimeException {
+ public TriggerRegistrationException(String message) {
+ super(message);
}
- public String getResourceDir() {
- return resourceDir;
+ public TriggerRegistrationException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoader.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
similarity index 93%
rename from
server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoader.java
rename to
node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
index 0207b1081d..9356c23f73 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoader.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.trigger.service;
+package org.apache.iotdb.commons.trigger.service;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -37,7 +37,7 @@ public class TriggerClassLoader extends URLClassLoader {
private final String libRoot;
- TriggerClassLoader(String libRoot) throws IOException {
+ public TriggerClassLoader(String libRoot) throws IOException {
super(new URL[0]);
this.libRoot = libRoot;
LOGGER.info("Trigger lib root: {}", libRoot);
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
similarity index 51%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
index b74f47c3ba..f94d343ef9 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
@@ -17,70 +17,46 @@
* under the License.
*/
-package org.apache.iotdb.commons.udf.service;
+package org.apache.iotdb.commons.trigger.service;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-public class UDFClassLoaderManager implements IService {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(UDFClassLoaderManager.class);
+public class TriggerClassLoaderManager implements IService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TriggerClassLoaderManager.class);
+ /** The dir that stores jar files. */
private final String libRoot;
- /** The keys in the map are the query IDs of the UDF queries being executed.
*/
- private final Map<Long, UDFClassLoader> queryIdToUDFClassLoaderMap;
-
/**
* activeClassLoader is used to load all classes under libRoot. libRoot may
be updated before the
- * user executes CREATE FUNCTION or after the user executes DROP FUNCTION.
Therefore, we need to
+ * user executes CREATE TRIGGER or after the user executes DROP TRIGGER.
Therefore, we need to
* continuously maintain the activeClassLoader so that the classes it loads
are always up-to-date.
*/
- private volatile UDFClassLoader activeClassLoader;
+ private volatile TriggerClassLoader activeClassLoader;
- private UDFClassLoaderManager(String libRoot) {
+ private TriggerClassLoaderManager(String libRoot) {
this.libRoot = libRoot;
- LOGGER.info("UDF lib root: {}", libRoot);
- queryIdToUDFClassLoaderMap = new ConcurrentHashMap<>();
+ LOGGER.info("Trigger lib root: {}", libRoot);
activeClassLoader = null;
}
- public void initializeUDFQuery(long queryId) {
- activeClassLoader.acquire();
- queryIdToUDFClassLoaderMap.put(queryId, activeClassLoader);
- }
-
- public void finalizeUDFQuery(long queryId) {
- UDFClassLoader classLoader = queryIdToUDFClassLoaderMap.remove(queryId);
- try {
- if (classLoader != null) {
- classLoader.release();
- }
- } catch (IOException e) {
- LOGGER.warn(
- "Failed to close UDFClassLoader (queryId: {}), because {}", queryId,
e.toString());
- }
- }
-
- public UDFClassLoader updateAndGetActiveClassLoader() throws IOException {
- UDFClassLoader deprecatedClassLoader = activeClassLoader;
- activeClassLoader = new UDFClassLoader(libRoot);
- deprecatedClassLoader.markAsDeprecated();
+ /** Call this method to get an up-to-date ClassLoader before registering
triggers. */
+ public TriggerClassLoader updateAndGetActiveClassLoader() throws IOException
{
+ TriggerClassLoader deprecatedClassLoader = activeClassLoader;
+ activeClassLoader = new TriggerClassLoader(libRoot);
+ deprecatedClassLoader.close();
return activeClassLoader;
}
- public UDFClassLoader getActiveClassLoader() {
+ public TriggerClassLoader getActiveClassLoader() {
return activeClassLoader;
}
@@ -91,21 +67,13 @@ public class UDFClassLoaderManager implements IService {
@Override
public void start() throws StartupException {
try {
- makeDirIfNecessary();
- activeClassLoader = new UDFClassLoader(libRoot);
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
+ activeClassLoader = new TriggerClassLoader(libRoot);
} catch (IOException e) {
throw new StartupException(this.getID().getName(), e.getMessage());
}
}
- private void makeDirIfNecessary() throws IOException {
- File file = SystemFileFactory.INSTANCE.getFile(libRoot);
- if (file.exists() && file.isDirectory()) {
- return;
- }
- FileUtils.forceMkdir(file);
- }
-
@Override
public void stop() {
// nothing to do
@@ -113,23 +81,23 @@ public class UDFClassLoaderManager implements IService {
@Override
public ServiceType getID() {
- return ServiceType.UDF_CLASSLOADER_MANAGER_SERVICE;
+ return ServiceType.TRIGGER_CLASSLOADER_MANAGER_SERVICE;
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// singleton instance holder
/////////////////////////////////////////////////////////////////////////////////////////////////
- private static UDFClassLoaderManager INSTANCE = null;
+ private static TriggerClassLoaderManager INSTANCE = null;
- public static synchronized UDFClassLoaderManager setupAndGetInstance(String
libRoot) {
+ public static synchronized TriggerClassLoaderManager
setupAndGetInstance(String libRoot) {
if (INSTANCE == null) {
- INSTANCE = new UDFClassLoaderManager(libRoot);
+ INSTANCE = new TriggerClassLoaderManager(libRoot);
}
return INSTANCE;
}
- public static UDFClassLoaderManager getInstance() {
+ public static TriggerClassLoaderManager getInstance() {
return INSTANCE;
}
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerRegistrationService.java
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerRegistrationService.java
new file mode 100644
index 0000000000..68e7e2074b
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerRegistrationService.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.trigger.service;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TriggerRegistrationService implements IService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TriggerRegistrationService.class);
+
+ private final ReentrantLock registrationLock;
+
+ private TriggerRegistrationService() {
+ this.registrationLock = new ReentrantLock();
+ }
+
+ public void acquireRegistrationLock() {
+ registrationLock.lock();
+ }
+
+ public void releaseRegistrationLock() {
+ registrationLock.unlock();
+ }
+
+ // todo: implementation
+ public void register() {
+ // validate the trigger before registering it
+ // add it to triggerTable and mark it inactive
+ // throw an exception if it is already registered
+ };
+
+ public void activeTrigger(String triggerName) {
+ // activate the trigger in the trigger table
+ };
+
+ @Override
+ public void start() throws StartupException {}
+
+ @Override
+ public void stop() {
+ // nothing to do
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.TRIGGER_REGISTRATION_SERVICE;
+ }
+
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // singleton instance holder
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+ private static TriggerRegistrationService INSTANCE = null;
+
+ public static synchronized TriggerRegistrationService setupAndGetInstance() {
+ if (INSTANCE == null) {
+ INSTANCE = new TriggerRegistrationService();
+ }
+ return INSTANCE;
+ }
+
+ public static TriggerRegistrationService getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
index b74f47c3ba..9d98576753 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
@@ -24,11 +24,9 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -91,21 +89,13 @@ public class UDFClassLoaderManager implements IService {
@Override
public void start() throws StartupException {
try {
- makeDirIfNecessary();
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
activeClassLoader = new UDFClassLoader(libRoot);
} catch (IOException e) {
throw new StartupException(this.getID().getName(), e.getMessage());
}
}
- private void makeDirIfNecessary() throws IOException {
- File file = SystemFileFactory.INSTANCE.getFile(libRoot);
- if (file.exists() && file.isDirectory()) {
- return;
- }
- FileUtils.forceMkdir(file);
- }
-
@Override
public void stop() {
// nothing to do
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index f7f18d7171..a1cf6f34c8 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -20,104 +20,19 @@
package org.apache.iotdb.commons.udf.service;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-
-import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class UDFExecutableManager implements IService, SnapshotProcessor {
-
- private final String temporaryLibRoot;
- private final String udfLibRoot;
- private final AtomicLong requestCounter;
+public class UDFExecutableManager extends ExecutableManager implements
IService, SnapshotProcessor {
private UDFExecutableManager(String temporaryLibRoot, String udfLibRoot) {
- this.temporaryLibRoot = temporaryLibRoot;
- this.udfLibRoot = udfLibRoot;
-
- requestCounter = new AtomicLong(0);
- }
-
- public UDFExecutableResource request(List<String> uris) throws
URISyntaxException, IOException {
- final long requestId = generateNextRequestId();
- downloadExecutables(uris, requestId);
- return new UDFExecutableResource(requestId,
getDirStringByRequestId(requestId));
- }
-
- public void moveToExtLibDir(UDFExecutableResource resource, String
functionName)
- throws IOException {
- FileUtils.moveDirectory(
- getDirByRequestId(resource.getRequestId()),
getDirByFunctionName(functionName));
- }
-
- public void removeFromTemporaryLibRoot(UDFExecutableResource resource) {
- removeFromTemporaryLibRoot(resource.getRequestId());
- }
-
- public void removeFromExtLibDir(String functionName) {
- FileUtils.deleteQuietly(getDirByFunctionName(functionName));
- }
-
- private synchronized long generateNextRequestId() throws IOException {
- long requestId = requestCounter.getAndIncrement();
- while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
- requestId = requestCounter.getAndIncrement();
- }
- FileUtils.forceMkdir(getDirByRequestId(requestId));
- return requestId;
- }
-
- private void downloadExecutables(List<String> uris, long requestId)
- throws IOException, URISyntaxException {
- // TODO: para download
- try {
- for (String uriString : uris) {
- final URL url = new URI(uriString).toURL();
- final String fileName = uriString.substring(uriString.lastIndexOf("/")
+ 1);
- final String destination =
- temporaryLibRoot + File.separator + requestId + File.separator +
fileName;
- FileUtils.copyURLToFile(url,
FSFactoryProducer.getFSFactory().getFile(destination));
- }
- } catch (Exception e) {
- removeFromTemporaryLibRoot(requestId);
- throw e;
- }
- }
-
- private void removeFromTemporaryLibRoot(long requestId) {
- FileUtils.deleteQuietly(getDirByRequestId(requestId));
- }
-
-
/////////////////////////////////////////////////////////////////////////////////////////////////
- // dir string and dir file generation
-
/////////////////////////////////////////////////////////////////////////////////////////////////
-
- public File getDirByRequestId(long requestId) {
- return
FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
- }
-
- public String getDirStringByRequestId(long requestId) {
- return temporaryLibRoot + File.separator + requestId + File.separator;
- }
-
- public File getDirByFunctionName(String functionName) {
- return
FSFactoryProducer.getFSFactory().getFile(getDirStringByFunctionName(functionName));
- }
-
- public String getDirStringByFunctionName(String functionName) {
- return udfLibRoot + File.separator + functionName + File.separator;
+ super(temporaryLibRoot, udfLibRoot);
}
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -127,8 +42,8 @@ public class UDFExecutableManager implements IService,
SnapshotProcessor {
@Override
public void start() throws StartupException {
try {
- makeDirIfNecessary(temporaryLibRoot);
- makeDirIfNecessary(udfLibRoot);
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
} catch (Exception e) {
throw new StartupException(e);
}
@@ -158,14 +73,6 @@ public class UDFExecutableManager implements IService, SnapshotProcessor {
return INSTANCE;
}
- private static void makeDirIfNecessary(String dir) throws IOException {
- File file = SystemFileFactory.INSTANCE.getFile(dir);
- if (file.exists() && file.isDirectory()) {
- return;
- }
- FileUtils.forceMkdir(file);
- }
-
public static UDFExecutableManager getInstance() {
return INSTANCE;
}
@@ -180,7 +87,7 @@ public class UDFExecutableManager implements IService, SnapshotProcessor {
temporaryLibRoot,
snapshotDir.getAbsolutePath() + File.separator + "ext" +
File.separator + "temporary")
&& SnapshotUtils.takeSnapshotForDir(
- udfLibRoot,
+ libRoot,
snapshotDir.getAbsolutePath() + File.separator + "ext" +
File.separator + "udf");
}
@@ -190,7 +97,6 @@ public class UDFExecutableManager implements IService, SnapshotProcessor {
snapshotDir.getAbsolutePath() + File.separator + "ext" +
File.separator + "temporary",
temporaryLibRoot);
SnapshotUtils.loadSnapshotForDir(
- snapshotDir.getAbsolutePath() + File.separator + "ext" +
File.separator + "udf",
- udfLibRoot);
+ snapshotDir.getAbsolutePath() + File.separator + "ext" +
File.separator + "udf", libRoot);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index 332906ec26..c0bf6a5d3e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.udf.service;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -170,7 +171,7 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
}
try {
- final UDFExecutableResource resource =
udfExecutableManager.request(uris);
+ final ExecutableResource resource = udfExecutableManager.request(uris);
try {
udfExecutableManager.removeFromExtLibDir(functionName);
udfExecutableManager.moveToExtLibDir(resource, functionName);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
index e5a61d3167..b782ad5777 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.engine.trigger.executor;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
-import org.apache.iotdb.db.engine.trigger.service.TriggerClassLoader;
import
org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationInformation;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.TriggerManagementException;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
index ca46142586..400d2adcbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.trigger.service;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.TriggerManagementException;
import org.apache.iotdb.tsfile.utils.Pair;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
index f5a33a5963..f555c11376 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -484,7 +485,7 @@ public class TriggerRegistrationService implements IService {
@Override
public ServiceType getID() {
- return ServiceType.TRIGGER_REGISTRATION_SERVICE;
+ return ServiceType.TRIGGER_REGISTRATION_SERVICE_OLD;
}
public int executorSize() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 76f2c6c109..8d051756bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -251,6 +251,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> createTrigger(
String triggerName,
String className,
+ String jarPath,
+ boolean usingURI,
TriggerEvent triggerEvent,
TriggerType triggerType,
PartialPath pathPattern) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index e35d224edf..50330fbba7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -68,6 +68,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> createTrigger(
String triggerName,
String className,
+ String jarPath,
+ boolean usingURI,
TriggerEvent triggerEvent,
TriggerType triggerType,
PartialPath pathPattern);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 2e85751084..fdefca20da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -227,6 +227,8 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> createTrigger(
String triggerName,
String className,
+ String jarPath,
+ boolean usingURI,
TriggerEvent triggerEvent,
TriggerType triggerType,
PartialPath pathPattern) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateTriggerTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateTriggerTask.java
index b335f6095e..98d3085aea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateTriggerTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateTriggerTask.java
@@ -34,6 +34,10 @@ public class CreateTriggerTask implements IConfigTask {
private final String className;
+ private final String jarPath;
+
+ private final boolean usingURI;
+
private final TriggerEvent triggerEvent;
private final TriggerType triggerType;
@@ -43,6 +47,8 @@ public class CreateTriggerTask implements IConfigTask {
public CreateTriggerTask(CreateTriggerStatement createTriggerStatement) {
this.triggerName = createTriggerStatement.getTriggerName();
this.className = createTriggerStatement.getClassName();
+ this.jarPath = createTriggerStatement.getJarPath();
+ this.usingURI = createTriggerStatement.isUsingURI();
this.triggerEvent = createTriggerStatement.getTriggerEvent();
this.triggerType = createTriggerStatement.getTriggerType();
this.pathPattern = createTriggerStatement.getPathPattern();
@@ -52,6 +58,6 @@ public class CreateTriggerTask implements IConfigTask {
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.createTrigger(
- triggerName, className, triggerEvent, triggerType, pathPattern);
+ triggerName, className, jarPath, usingURI, triggerEvent, triggerType,
pathPattern);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index f070b0dfc7..a68c04b34a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -727,6 +727,19 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
if (ctx.triggerType() == null) {
throw new SemanticException("Please specify trigger type: STATELESS or
STATEFUL.");
}
+ if (ctx.jarLocation() == null) {
+ throw new SemanticException("Please specify the location of jar.");
+ }
+ // parse jarPath
+ String jarPath;
+ boolean usingURI;
+ if (ctx.jarLocation().FILE() != null) {
+ usingURI = false;
+ jarPath = parseFilePath(ctx.jarLocation().fileName.getText());
+ } else {
+ usingURI = true;
+ jarPath = parseFilePath(ctx.jarLocation().uri().getText());
+ }
Map<String, String> attributes = new HashMap<>();
if (ctx.triggerAttributeClause() != null) {
for (IoTDBSqlParser.TriggerAttributeContext triggerAttributeContext :
@@ -739,6 +752,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return new CreateTriggerStatement(
parseIdentifier(ctx.triggerName.getText()),
parseStringLiteral(ctx.className.getText()),
+ jarPath,
+ usingURI,
ctx.triggerEventClause().BEFORE() != null
? TriggerEvent.BEFORE_INSERT
: TriggerEvent.AFTER_INSERT,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateTriggerStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateTriggerStatement.java
index 495e7cace1..baae407148 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateTriggerStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateTriggerStatement.java
@@ -36,6 +36,11 @@ public class CreateTriggerStatement extends Statement implements IConfigStatemen
private final String className;
+ private final String jarPath;
+
+ /** usingURI == true indicates that jarPath is a URI */
+ private final boolean usingURI;
+
private final TriggerEvent triggerEvent;
private final TriggerType triggerType;
@@ -47,12 +52,16 @@ public class CreateTriggerStatement extends Statement implements IConfigStatemen
public CreateTriggerStatement(
String triggerName,
String className,
+ String jarPath,
+ boolean usingURI,
TriggerEvent triggerEvent,
TriggerType triggerType,
PartialPath pathPattern,
Map<String, String> attributes) {
this.triggerName = triggerName;
this.className = className;
+ this.jarPath = jarPath;
+ this.usingURI = usingURI;
this.triggerEvent = triggerEvent;
this.triggerType = triggerType;
this.pathPattern = pathPattern;
@@ -83,6 +92,14 @@ public class CreateTriggerStatement extends Statement implements IConfigStatemen
return attributes;
}
+ public String getJarPath() {
+ return jarPath;
+ }
+
+ public boolean isUsingURI() {
+ return usingURI;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;