This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 0fb148a8e Translate engineconn-plugins-presto service classes from
Scala to Java (#4514)
0fb148a8e is described below
commit 0fb148a8e68d6889e3f88761ef4d1e25b86e87a0
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed May 10 21:34:34 2023 +0800
Translate engineconn-plugins-presto service classes from Scala to Java
(#4514)
---
.../service/TaskExecutionServiceImpl.scala | 119 +++---
.../presto/PrestoEngineConnPlugin.java | 72 ++++
.../PrestoProcessEngineConnLaunchBuilder.java} | 16 +-
.../presto/conf/PrestoConfiguration.java | 63 +++
.../engineplugin/presto/conf/PrestoEngineConf.java | 53 +++
.../presto/exception/PrestoClientException.java} | 15 +-
.../exception/PrestoStateInvalidException.java} | 13 +-
.../presto/executor/PrestoEngineConnExecutor.java | 468 +++++++++++++++++++++
.../engineplugin/presto/utils/PrestoSQLHook.java} | 18 +-
.../presto/PrestoEngineConnPlugin.scala | 71 ----
.../presto/conf/PrestoConfiguration.scala | 49 ---
.../presto/conf/PrestoEngineConf.scala | 46 --
.../presto/executor/PrestoEngineConnExecutor.scala | 453 --------------------
.../presto/TestPrestoEngineConnPlugin.java} | 14 +-
.../TestPrestoProcessEngineConnLaunchBuilder.java} | 21 +-
.../presto/conf/TestPrestoConfiguration.java} | 36 +-
.../presto/exception/TestPrestoException.java} | 26 +-
.../factory/TestPrestoEngineConnFactory.java | 43 ++
.../presto/utils/TestPrestoSQLHook.java} | 15 +-
.../factory/TestPrestoEngineConnFactory.scala | 42 --
20 files changed, 847 insertions(+), 806 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 651fc0f3d..f096401c5 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -50,7 +50,6 @@ import
org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.engineconn.executor.listener.event.EngineConnSyncEvent
-import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.exception.engineconn.{
@@ -64,7 +63,6 @@ import org.apache.linkis.manager.common.protocol.resource.{
ResponseTaskRunningInfo,
ResponseTaskYarnResource
}
-import
org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.message.RequestProtocol
@@ -425,56 +423,19 @@ class TaskExecutionServiceImpl
.isRunning(task.getStatus)
) {
val progressResponse = taskProgress(task.getTaskId)
- val resourceResponse: ResponseTaskYarnResource =
- taskYarnResource(task.getTaskId) match {
- case responseTaskYarnResource: ResponseTaskYarnResource =>
- if (
- responseTaskYarnResource.resourceMap != null &&
!responseTaskYarnResource.resourceMap.isEmpty
- ) {
- responseTaskYarnResource
- } else {
- null
- }
- case _ =>
- null
- }
- val extraInfoMap = new util.HashMap[String, Object]()
- extraInfoMap.put(TaskConstant.ENGINE_INSTANCE,
Sender.getThisInstance)
- extraInfoMap.put(
- ECConstants.EC_TICKET_ID_KEY,
- EngineConnObject.getEngineCreationContext.getTicketId
- )
- val ecParams = EngineConnObject.getEngineCreationContext.getOptions
- if (ecParams.containsKey(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY)) {
- extraInfoMap.put(
- ECConstants.YARN_QUEUE_NAME_KEY,
- ecParams.get(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY)
- )
- }
- extraInfoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, task.getTaskId)
- extraInfoMap.put(
- TaskConstant.ENGINE_CONN_SUBMIT_TIME,
- System.currentTimeMillis.toString
- )
+ val resourceResponse = buildResourceMap(task)
+ val extraInfoMap = buildExtraInfoMap(task)
// todo add other info
- var respRunningInfo: ResponseTaskRunningInfo = null
- if (null != resourceResponse) {
- respRunningInfo = ResponseTaskRunningInfo(
- progressResponse.execId,
- progressResponse.progress,
- progressResponse.progressInfo,
- resourceResponse.resourceMap,
- extraInfoMap
- )
- } else {
- respRunningInfo = ResponseTaskRunningInfo(
- progressResponse.execId,
- progressResponse.progress,
- progressResponse.progressInfo,
- null,
- extraInfoMap
- )
- }
+ val resourceMap = if (null != resourceResponse)
resourceResponse.resourceMap else null
+
+ val respRunningInfo: ResponseTaskRunningInfo =
ResponseTaskRunningInfo(
+ progressResponse.execId,
+ progressResponse.progress,
+ progressResponse.progressInfo,
+ resourceMap,
+ extraInfoMap
+ )
+
sendToEntrance(task, respRunningInfo)
Thread.sleep(TimeUnit.MILLISECONDS.convert(sleepInterval,
TimeUnit.SECONDS))
}
@@ -483,6 +444,42 @@ class TaskExecutionServiceImpl
})
}
+  /**
+   * Assembles the extra-info map that accompanies a running-info report for `task`.
+   *
+   * The map always carries this engine instance, the engine conn ticket id, the task id and the
+   * current time in milliseconds (as a string). The YARN queue name is added only when the
+   * engine creation options contain the queue configuration key.
+   *
+   * @param task the task the report is being built for
+   * @return a freshly created, mutable map
+   */
+  private def buildExtraInfoMap(task: EngineConnTask): util.HashMap[String, Object] = {
+    val info = new util.HashMap[String, Object]()
+    info.put(TaskConstant.ENGINE_INSTANCE, Sender.getThisInstance)
+
+    val creationContext = EngineConnObject.getEngineCreationContext
+    info.put(ECConstants.EC_TICKET_ID_KEY, creationContext.getTicketId)
+
+    val options = creationContext.getOptions
+    val queueConfigKey = ECConstants.YARN_QUEUE_NAME_CONFIG_KEY
+    if (options.containsKey(queueConfigKey)) {
+      info.put(ECConstants.YARN_QUEUE_NAME_KEY, options.get(queueConfigKey))
+    }
+
+    info.put(TaskConstant.ENGINE_CONN_TASK_ID, task.getTaskId)
+    info.put(TaskConstant.ENGINE_CONN_SUBMIT_TIME, System.currentTimeMillis.toString)
+    info
+  }
+
+  /**
+   * Looks up the YARN resource response for `task`.
+   *
+   * @param task the task whose resources are queried
+   * @return the response when it carries a non-null, non-empty resource map; `null` otherwise
+   *         (callers test the result against `null`)
+   */
+  private def buildResourceMap(task: EngineConnTask): ResponseTaskYarnResource =
+    taskYarnResource(task.getTaskId) match {
+      case yarnResource: ResponseTaskYarnResource
+          if yarnResource.resourceMap != null && !yarnResource.resourceMap.isEmpty =>
+        yarnResource
+      case _ =>
+        null
+    }
+
private def taskYarnResource(taskID: String): ResponseTaskYarnResource = {
val executor = taskIdCache.getIfPresent(taskID)
executor match {
@@ -664,14 +661,20 @@ class TaskExecutionServiceImpl
if (EngineConnConf.ENGINE_PUSH_LOG_TO_ENTRANCE.getValue) {
val task = getTaskByTaskId(taskProgressUpdateEvent.taskId)
if (null != task) {
- sendToEntrance(
- task,
- ResponseTaskProgress(
- taskProgressUpdateEvent.taskId,
- taskProgressUpdateEvent.progress,
- taskProgressUpdateEvent.progressInfo
- )
+ val resourceResponse = buildResourceMap(task)
+ val extraInfoMap = buildExtraInfoMap(task)
+
+ val resourceMap = if (null != resourceResponse)
resourceResponse.resourceMap else null
+
+ val respRunningInfo: ResponseTaskRunningInfo = ResponseTaskRunningInfo(
+ taskProgressUpdateEvent.taskId,
+ taskProgressUpdateEvent.progress,
+ taskProgressUpdateEvent.progressInfo,
+ resourceMap,
+ extraInfoMap
)
+
+ sendToEntrance(task, respRunningInfo)
} else {
logger.error(
"Task cannot null! taskProgressUpdateEvent : " +
ComputationEngineUtils.GSON
diff --git
a/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.java
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.java
new file mode 100644
index 000000000..c364c2813
--- /dev/null
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.presto;
+
+import
org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder;
+import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import
org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import
org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import
org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Presto implementation of {@link EngineConnPlugin}.
+ *
+ * <p>The resource factory and the engine conn factory are created lazily on first request and
+ * then reused for the lifetime of the plugin; a new launch builder is returned on every call.
+ */
+public class PrestoEngineConnPlugin implements EngineConnPlugin {
+  private final Object resourceLocker = new Object();
+  private final Object engineFactoryLocker = new Object();
+  private volatile EngineResourceFactory engineResourceFactory;
+  private volatile EngineConnFactory engineFactory;
+  private final List<Label<?>> defaultLabels = new ArrayList<>();
+
+  /** No initialisation is required; {@code params} is ignored. */
+  @Override
+  public void init(Map<String, Object> params) {}
+
+  /**
+   * Returns the shared {@link EngineResourceFactory}, creating it on first use.
+   *
+   * @return the single {@link GenericEngineResourceFactory} owned by this plugin
+   */
+  @Override
+  public EngineResourceFactory getEngineResourceFactory() {
+    if (null == engineResourceFactory) {
+      synchronized (resourceLocker) {
+        // Re-check under the lock: another thread may have created the factory while this one
+        // was waiting, and that instance must not be replaced.
+        if (null == engineResourceFactory) {
+          engineResourceFactory = new GenericEngineResourceFactory();
+        }
+      }
+    }
+    return engineResourceFactory;
+  }
+
+  /** Returns a new {@link PrestoProcessEngineConnLaunchBuilder} on every call. */
+  @Override
+  public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+    return new PrestoProcessEngineConnLaunchBuilder();
+  }
+
+  /**
+   * Returns the shared {@link EngineConnFactory}, creating it on first use.
+   *
+   * @return the single {@link PrestoEngineConnFactory} owned by this plugin
+   */
+  @Override
+  public EngineConnFactory getEngineConnFactory() {
+    if (null == engineFactory) {
+      synchronized (engineFactoryLocker) {
+        // Same re-check as above so that concurrent first callers share one factory.
+        if (null == engineFactory) {
+          engineFactory = new PrestoEngineConnFactory();
+        }
+      }
+    }
+    return engineFactory;
+  }
+
+  /** Returns the plugin's default label list; nothing in this class adds to it. */
+  @Override
+  public List<Label<?>> getDefaultLabels() {
+    return defaultLabels;
+  }
+}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.java
similarity index 70%
rename from
linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
rename to
linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.java
index 9e9d8065e..9a74967a9 100644
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.builder
+package org.apache.linkis.engineplugin.presto.builder;
-import
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
-import org.apache.linkis.storage.utils.StorageConfiguration
+import
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.storage.utils.StorageConfiguration;
-class PrestoProcessEngineConnLaunchBuilder extends
JavaProcessEngineConnLaunchBuilder {
+public class PrestoProcessEngineConnLaunchBuilder extends
JavaProcessEngineConnLaunchBuilder {
- override def getEngineStartUser(label: UserCreatorLabel): String = {
- StorageConfiguration.HDFS_ROOT_USER.getValue
+ @Override
+ public String getEngineStartUser(UserCreatorLabel label) {
+ return StorageConfiguration.HDFS_ROOT_USER.getValue();
}
-
}
diff --git
a/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.java
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.java
new file mode 100644
index 000000000..d5af92645
--- /dev/null
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.presto.conf;
+
+import org.apache.linkis.common.conf.CommonVars;
+
+/**
+ * Configuration keys and default values used by the Presto engine plugin. Each constant pairs a
+ * property key with the default that applies when the key is not set.
+ */
+public class PrestoConfiguration {
+
+ // Value returned by the executor's getConcurrentLimit().
+ public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
+ CommonVars.apply("wds.linkis.engineconn.concurrent.limit", 100);
+
+ // HTTP connect timeout of the Presto client, in seconds.
+ public static final CommonVars<Long> PRESTO_HTTP_CONNECT_TIME_OUT =
+ CommonVars.apply("wds.linkis.presto.http.connectTimeout", 60L);
+
+ // HTTP read timeout of the Presto client, in seconds.
+ public static final CommonVars<Long> PRESTO_HTTP_READ_TIME_OUT =
+ CommonVars.apply("wds.linkis.presto.http.readTimeout", 60L);
+
+ // NOTE(review): presumably the default row limit applied to results - confirm against usage.
+ public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
+ CommonVars.apply("wds.linkis.presto.default.limit", 5000);
+
+ // Base URI of the Presto server the client session connects to.
+ public static final CommonVars<String> PRESTO_URL =
+ CommonVars.apply("wds.linkis.presto.url", "http://127.0.0.1:8080");
+
+ public static final CommonVars<String> PRESTO_RESOURCE_CONFIG_PATH =
+ CommonVars.apply("wds.linkis.presto.resource.config", "");
+
+ public static final CommonVars<String> PRESTO_USER_NAME =
+ CommonVars.apply("wds.linkis.presto.username", "default");
+
+ public static final CommonVars<String> PRESTO_PASSWORD =
+ CommonVars.apply("wds.linkis.presto.password", "");
+
+ // Catalog passed to the client session.
+ public static final CommonVars<String> PRESTO_CATALOG =
+ CommonVars.apply("wds.linkis.presto.catalog", "system");
+
+ // Schema passed to the client session.
+ public static final CommonVars<String> PRESTO_SCHEMA =
+ CommonVars.apply("wds.linkis.presto.schema", "");
+
+ // Source name passed to the client session.
+ public static final CommonVars<String> PRESTO_SOURCE =
+ CommonVars.apply("wds.linkis.presto.source", "global");
+
+ public static final CommonVars<String> PRESTO_REQUEST_MEMORY =
+ CommonVars.apply("presto.session.query_max_total_memory", "8GB");
+
+ // When true, submitted code is passed through PrestoSQLHook.preExecuteHook before execution.
+ public static final CommonVars<Boolean> PRESTO_SQL_HOOK_ENABLED =
+ CommonVars.apply("linkis.presto.sql.hook.enabled", true, "presto sql
hook");
+}
diff --git
a/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.java
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.java
new file mode 100644
index 000000000..cc1637294
--- /dev/null
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.presto.conf;
+
+import org.apache.linkis.common.conf.Configuration;
+import
org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig;
+import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.CacheableProtocol;
+import org.apache.linkis.rpc.RPCMapCache;
+
+import java.util.Map;
+
+import scala.Tuple2;
+
+/**
+ * RPC-backed configuration lookup for the Presto engine, keyed by a (user-creator label, engine
+ * type label) pair and yielding string key/value settings.
+ */
+public class PrestoEngineConf
+    extends RPCMapCache<Tuple2<UserCreatorLabel, EngineTypeLabel>, String, String> {
+
+  /** Targets the service named by the cloud-console configuration application name setting. */
+  public PrestoEngineConf() {
+    super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());
+  }
+
+  /**
+   * Builds the configuration query for the given label pair.
+   *
+   * @param labelTuple user-creator label and engine type label, in that order
+   * @return the request; its third constructor argument is always {@code null}
+   */
+  @Override
+  public CacheableProtocol createRequest(Tuple2<UserCreatorLabel, EngineTypeLabel> labelTuple) {
+    UserCreatorLabel userCreatorLabel = labelTuple._1();
+    EngineTypeLabel engineTypeLabel = labelTuple._2();
+    return new RequestQueryEngineConfigWithGlobalConfig(userCreatorLabel, engineTypeLabel, null);
+  }
+
+  /**
+   * Extracts the key/value map from a response object.
+   *
+   * @param obj the object received in answer to the request
+   * @return the response's key/value map, or {@code null} when {@code obj} is not a {@link
+   *     ResponseQueryConfig}
+   */
+  @Override
+  public Map<String, String> createMap(Object obj) {
+    if (!(obj instanceof ResponseQueryConfig)) {
+      return null;
+    }
+    return ((ResponseQueryConfig) obj).getKeyAndValue();
+  }
+}
diff --git
a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/utils/testPrestoSQLHook.scala
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/exception/PrestoClientException.java
similarity index 71%
rename from
linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/utils/testPrestoSQLHook.scala
rename to
linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/exception/PrestoClientException.java
index 99681c69c..2b2686d17 100644
---
a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/utils/testPrestoSQLHook.scala
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/exception/PrestoClientException.java
@@ -15,18 +15,13 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.utils
+package org.apache.linkis.engineplugin.presto.exception;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.apache.linkis.common.exception.ErrorException;
-class testPrestoSQLHook {
+public class PrestoClientException extends ErrorException {
- @Test
- def testPreExecuteHook {
- val prestoSQLHook = PrestoSQLHook
- val code = "`1104`"
- val codes = prestoSQLHook.preExecuteHook(code)
- Assertions.assertEquals(codes, "\"1104\"")
+ public PrestoClientException(int errorCode, String message) {
+ super(errorCode, message);
}
-
}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/exception/PrestoStateInvalidException.java
similarity index 69%
rename from
linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
rename to
linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/exception/PrestoStateInvalidException.java
index 221bc0c9f..71831d746 100644
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/exception/PrestoStateInvalidException.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.exception
+package org.apache.linkis.engineplugin.presto.exception;
-import org.apache.linkis.common.exception.ErrorException
+import org.apache.linkis.common.exception.ErrorException;
-case class PrestoStateInvalidException(errorCode: Int, message: String)
- extends ErrorException(errorCode, message: String)
+public class PrestoStateInvalidException extends ErrorException {
-case class PrestoClientException(errorCode: Int, message: String)
- extends ErrorException(errorCode, message: String)
+ public PrestoStateInvalidException(int errorCode, String message) {
+ super(errorCode, message);
+ }
+}
diff --git
a/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java
new file mode 100644
index 000000000..df2d22910
--- /dev/null
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.presto.executor;
+
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.log.LogUtils;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import
org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration;
+import org.apache.linkis.engineplugin.presto.conf.PrestoEngineConf;
+import org.apache.linkis.engineplugin.presto.errorcode.PrestoErrorCodeSummary;
+import org.apache.linkis.engineplugin.presto.exception.PrestoClientException;
+import
org.apache.linkis.engineplugin.presto.exception.PrestoStateInvalidException;
+import org.apache.linkis.engineplugin.presto.utils.PrestoSQLHook;
+import org.apache.linkis.governance.common.paser.SQLCodeParser;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+import org.apache.linkis.storage.domain.Column;
+import org.apache.linkis.storage.domain.DataType;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import com.facebook.presto.client.*;
+import com.facebook.presto.spi.security.SelectedRole;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import okhttp3.OkHttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrestoEngineConnExecutor extends ConcurrentComputationExecutor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(PrestoEngineConnExecutor.class);
+
+ // HTTP client shared by all executor instances; connect and read timeouts come from
+ // PrestoConfiguration and are interpreted in seconds.
+ private static OkHttpClient okHttpClient =
+ new OkHttpClient.Builder()
+ .socketFactory(new SocketChannelSocketFactory())
+ .connectTimeout(
+ PrestoConfiguration.PRESTO_HTTP_CONNECT_TIME_OUT.getValue(),
TimeUnit.SECONDS)
+
.readTimeout(PrestoConfiguration.PRESTO_HTTP_READ_TIME_OUT.getValue(),
TimeUnit.SECONDS)
+ .build();
+ // Numeric suffix used by getId().
+ private int id;
+ private List<Label<?>> executorLabels = new ArrayList<>(2);
+ // Statement currently running for each task id; entries are removed when the line finishes
+ // or the task is killed.
+ private Map<String, StatementClient> statementClientCache = new
ConcurrentHashMap<>();
+ // Client session per task id; entries expire after ENGINE_TASK_EXPIRE_TIME milliseconds
+ // without access and the cache holds at most MAX_TASK_NUM entries.
+ private Cache<String, ClientSession> clientSessionCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(
+
Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()),
+ TimeUnit.MILLISECONDS)
+ .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+ .build();
+
+ /**
+ * Creates an executor.
+ *
+ * @param outputPrintLimit forwarded unchanged to the superclass constructor
+ * @param id numeric suffix that getId() appends to the service instance name
+ */
+ public PrestoEngineConnExecutor(int outputPrintLimit, int id) {
+ super(outputPrintLimit);
+ this.id = id;
+ }
+
+ /** Installs a SQLCodeParser as the code parser, then runs the superclass initialisation. */
+ @Override
+ public void init() {
+ setCodeParser(new SQLCodeParser());
+ super.init();
+ }
+
+  /**
+   * Prepares a Presto client session for the task, caches it under the task id, and then runs
+   * the task through the parent executor.
+   *
+   * <p>The session user comes from the task's first {@link UserCreatorLabel}. Console
+   * configuration is fetched through {@link PrestoEngineConf} only when the task carries both a
+   * {@link UserCreatorLabel} and an {@link EngineTypeLabel}; otherwise the session is built from
+   * the task properties alone.
+   *
+   * @param engineConnTask the task to execute
+   * @return whatever the superclass {@code execute} returns
+   */
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Label<?>[] taskLabels = engineConnTask.getLables();
+    String user = getUserCreatorLabel(taskLabels).getUser();
+
+    // One pass over the labels, remembering the first label of each kind.
+    UserCreatorLabel userCreatorLabel = null;
+    EngineTypeLabel engineTypeLabel = null;
+    for (Label<?> label : taskLabels) {
+      if (userCreatorLabel == null && label instanceof UserCreatorLabel) {
+        userCreatorLabel = (UserCreatorLabel) label;
+      }
+      if (engineTypeLabel == null && label instanceof EngineTypeLabel) {
+        engineTypeLabel = (EngineTypeLabel) label;
+      }
+    }
+
+    Map<String, String> consoleConfig = null;
+    if (userCreatorLabel != null && engineTypeLabel != null) {
+      consoleConfig =
+          new PrestoEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel));
+    }
+
+    String taskId = engineConnTask.getTaskId();
+    ClientSession session = getClientSession(user, engineConnTask.getProperties(), consoleConfig);
+    clientSessionCache.put(taskId, session);
+    return super.execute(engineConnTask);
+  }
+
+ @Override
+ public ExecuteResponse executeLine(EngineExecutionContext
engineExecutorContext, String code) {
+ boolean enableSqlHook =
PrestoConfiguration.PRESTO_SQL_HOOK_ENABLED.getValue();
+ String realCode;
+ if (StringUtils.isBlank(code)) {
+ realCode = "SELECT 1";
+ } else if (enableSqlHook) {
+ realCode = PrestoSQLHook.preExecuteHook(code.trim());
+ } else {
+ realCode = code.trim();
+ }
+ logger.info("presto client begins to run psql code:\n {}", realCode);
+
+ String taskId = engineExecutorContext.getJobId().get();
+ ClientSession clientSession = clientSessionCache.getIfPresent(taskId);
+ StatementClient statement =
+ StatementClientFactory.newStatementClient(okHttpClient, clientSession,
realCode);
+ statementClientCache.put(taskId, statement);
+
+ try {
+ initialStatusUpdates(taskId, engineExecutorContext, statement);
+ if (statement.isRunning()
+ || (statement.isFinished() && statement.finalStatusInfo().getError()
== null)) {
+ queryOutput(taskId, engineExecutorContext, statement);
+ }
+ ErrorExecuteResponse errorResponse = null;
+ try {
+ errorResponse = verifyServerError(taskId, engineExecutorContext,
statement);
+ } catch (ErrorException e) {
+ logger.error("Presto execute failed (#{}): {}", e.getErrCode(),
e.getMessage());
+ }
+ if (errorResponse == null) {
+ // update session
+ clientSessionCache.put(taskId, updateSession(clientSession,
statement));
+ return new SuccessExecuteResponse();
+ } else {
+ return errorResponse;
+ }
+ } finally {
+ statementClientCache.remove(taskId);
+ }
+ }
+
+ /** Not implemented for Presto: always returns {@code null}, ignoring all arguments. */
+ @Override
+ public ExecuteResponse executeCompletely(
+ EngineExecutionContext engineExecutorContext, String code, String
completedLine) {
+ return null;
+ }
+
+ // todo: progress reporting is not implemented yet
+ /** Placeholder: always reports {@code 0.0f}, regardless of {@code taskID}. */
+ @Override
+ public float progress(String taskID) {
+ return 0.0f;
+ }
+
+ /** Placeholder: always returns an empty array, regardless of {@code taskID}. */
+ @Override
+ public JobProgressInfo[] getProgressInfo(String taskID) {
+ return new JobProgressInfo[0];
+ }
+
+ /**
+ * Kills a task: removes its running statement (if any) from the statement cache, asks that
+ * statement to cancel its leaf stage, and then delegates to the superclass.
+ */
+ @Override
+ public void killTask(String taskId) {
+ StatementClient statement = statementClientCache.remove(taskId);
+ // No entry means no statement is currently running for this task.
+ if (null != statement) {
+ statement.cancelLeafStage();
+ }
+ super.killTask(taskId);
+ }
+
+ /** Returns the executor's label list itself (not a copy). */
+ @Override
+ public List<Label<?>> getExecutorLabels() {
+ return executorLabels;
+ }
+
+ /**
+ * Replaces the executor labels with the contents of {@code labels}. A null or empty argument
+ * is ignored, leaving the current labels untouched.
+ */
+ @Override
+ public void setExecutorLabels(List<Label<?>> labels) {
+ if (!CollectionUtils.isEmpty(labels)) {
+ executorLabels.clear();
+ executorLabels.addAll(labels);
+ }
+ }
+
+ /** Always returns {@code false}. */
+ @Override
+ public boolean supportCallBackLogs() {
+ return false;
+ }
+
+ /** Not implemented: always returns {@code null}, ignoring {@code expectedResource}. */
+ @Override
+ public NodeResource requestExpectedResource(NodeResource expectedResource) {
+ return null;
+ }
+
+ /**
+ * Reports the resource currently used by this engine as a {@link CommonNodeResource} whose used
+ * resource is a {@link LoadResource} built from the process max memory and the constant 1.
+ */
+ @Override
+ public NodeResource getCurrentNodeResource() {
+ // Called for its effect on the engine creation options; the return value is not used here.
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext().getOptions());
+
+ CommonNodeResource resource = new CommonNodeResource();
+ // NOTE(review): the second argument (1) is presumably a core count - confirm against
+ // LoadResource.
+ LoadResource usedResource = new
LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
+ resource.setUsedResource(usedResource);
+ return resource;
+ }
+
+ /** Returns the executor id: this service instance's name, an underscore, then {@code id}. */
+ @Override
+ public String getId() {
+ return Sender.getThisServiceInstance().getInstance() + "_" + id;
+ }
+
+ /** Returns the configured value of {@code PrestoConfiguration.ENGINE_CONCURRENT_LIMIT}. */
+ @Override
+ public int getConcurrentLimit() {
+ return PrestoConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
+ }
+
+ /**
+ * Builds the Presto {@link ClientSession} for one task.
+ *
+ * @param user user name the session runs as
+ * @param taskParams runtime parameters of the task; entries with a null value are skipped
+ * @param cacheMap configuration from the management console; may be null or empty
+ * @return a new session whose URL, source, catalog and schema are resolved from the merged
+ *     configuration and whose session properties are the merged entries prefixed with
+ *     {@code presto.session.} (prefix removed)
+ */
+ private ClientSession getClientSession(
+ String user, Map<String, Object> taskParams, Map<String, String>
cacheMap) {
+ Map<String, String> configMap = new HashMap<>();
+ // Runtime parameters take priority over the management console configuration: the console
+ // values are copied first and the task parameters then overwrite them.
+ if (!CollectionUtils.isEmpty(cacheMap)) {
+ configMap.putAll(cacheMap);
+ }
+ taskParams.entrySet().stream()
+ .filter(entry -> entry.getValue() != null)
+ .forEach(entry -> configMap.put(entry.getKey(),
String.valueOf(entry.getValue())));
+
+ URI httpUri =
URI.create(PrestoConfiguration.PRESTO_URL.getValue(configMap));
+ String source = PrestoConfiguration.PRESTO_SOURCE.getValue(configMap);
+ String catalog = PrestoConfiguration.PRESTO_CATALOG.getValue(configMap);
+ String schema = PrestoConfiguration.PRESTO_SCHEMA.getValue(configMap);
+
+ // Session properties: every merged entry whose key starts with "presto.session.", keyed by
+ // the remainder of the key.
+ Map<String, String> properties =
+ configMap.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith("presto.session."))
+ .collect(
+ Collectors.toMap(
+ entry ->
entry.getKey().substring("presto.session.".length()),
+ Map.Entry::getValue));
+
+ // The remaining session attributes are fixed: JVM default time zone and locale, no
+ // transaction, and empty tags, estimates, prepared statements, roles and credentials.
+ String clientInfo = "Linkis";
+ String transactionId = null;
+ Optional<String> traceToken = Optional.empty();
+ Set<String> clientTags = Collections.emptySet();
+ String timeZonId = TimeZone.getDefault().getID();
+ Locale locale = Locale.getDefault();
+ Map<String, String> resourceEstimates = Collections.emptyMap();
+ Map<String, String> preparedStatements = Collections.emptyMap();
+ Map<String, SelectedRole> roles = Collections.emptyMap();
+ Map<String, String> extraCredentials = Collections.emptyMap();
+ // NOTE(review): a zero-length duration presumably means "no client request timeout" -
+ // confirm against the Presto client.
+ io.airlift.units.Duration clientRequestTimeout =
+ new io.airlift.units.Duration(0, TimeUnit.MILLISECONDS);
+
+ // Arguments are positional; keep this order in sync with the ClientSession constructor.
+ return new ClientSession(
+ httpUri,
+ user,
+ source,
+ traceToken,
+ clientTags,
+ clientInfo,
+ catalog,
+ schema,
+ timeZonId,
+ locale,
+ resourceEstimates,
+ properties,
+ preparedStatements,
+ roles,
+ extraCredentials,
+ transactionId,
+ clientRequestTimeout);
+ }
+
+ private UserCreatorLabel getUserCreatorLabel(Label<?>[] labels) {
+ return (UserCreatorLabel)
+ Arrays.stream(labels).filter(label -> label instanceof
UserCreatorLabel).findFirst().get();
+ }
+
+ /**
+  * Advances the statement, pushing progress before each step, for as long as it is running and
+  * either has no current data or reports a non-null update type.
+  *
+  * @param taskId id of the task the statement belongs to
+  * @param engineExecutorContext context that receives the progress updates
+  * @param statement the Presto statement to advance
+  */
+ private void initialStatusUpdates(
+     String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) {
+   while (statement.isRunning()) {
+     boolean dataAvailable = statement.currentData().getData() != null;
+     // Stop as soon as data is available and the statement reports no update type.
+     if (dataAvailable && statement.currentStatusInfo().getUpdateType() == null) {
+       break;
+     }
+     engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId));
+     statement.advance();
+   }
+ }
+
+ /**
+  * Writes the statement's columns and rows into a table result set and sends it to the
+  * execution context.
+  *
+  * <p>Column metadata is taken from the current status while the statement is running and from
+  * the final status otherwise. Rows are then read, with every cell converted through
+  * {@code String.valueOf}, until the statement stops running; progress is pushed after each batch.
+  *
+  * <p>If anything fails while writing, the writer is closed quietly and the failure is
+  * propagated, so that a closed, partially written result set is never reported as fetched.
+  *
+  * @param taskId id of the task the statement belongs to
+  * @param engineExecutorContext context used to create the writer, push progress and send results
+  * @param statement the Presto statement to read from
+  * @throws RuntimeException if the columns are missing or writing the result set fails; a checked
+  *     failure is wrapped
+  */
+ private void queryOutput(
+     String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) {
+   int columnCount = 0;
+   int rows = 0;
+   ResultSetWriter resultSetWriter =
+       engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+   try {
+     QueryStatusInfo results =
+         statement.isRunning() ? statement.currentStatusInfo() : statement.finalStatusInfo();
+     if (results.getColumns() == null) {
+       throw new RuntimeException("presto columns is null.");
+     }
+     List<Column> columns =
+         results.getColumns().stream()
+             .map(
+                 column -> new Column(column.getName(), DataType.toDataType(column.getType()), ""))
+             .collect(Collectors.toList());
+     columnCount = columns.size();
+     resultSetWriter.addMetaData(new TableMetaData(columns.toArray(new Column[0])));
+     while (statement.isRunning()) {
+       Iterable<List<Object>> data = statement.currentData().getData();
+       if (data != null) {
+         for (List<Object> row : data) {
+           String[] rowArray = row.stream().map(r -> String.valueOf(r)).toArray(String[]::new);
+           resultSetWriter.addRecord(new TableRecord(rowArray));
+           rows += 1;
+         }
+       }
+       engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId));
+       statement.advance();
+     }
+   } catch (Exception e) {
+     IOUtils.closeQuietly(resultSetWriter);
+     // Propagate instead of swallowing: the writer is closed and must not be sent as a result.
+     if (e instanceof RuntimeException) {
+       throw (RuntimeException) e;
+     }
+     throw new RuntimeException("Failed to write presto result set.", e);
+   }
+   String message = String.format("Fetched %d col(s) : %d row(s) in presto", columnCount, rows);
+   logger.info(message);
+   engineExecutorContext.appendStdout(LogUtils.generateInfo(message));
+   engineExecutorContext.sendResultSet(resultSetWriter);
+ }
+
+ private ErrorExecuteResponse verifyServerError(
+ String taskId, EngineExecutionContext engineExecutorContext,
StatementClient statement)
+ throws ErrorException {
+ engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId));
+ if (statement.isFinished()) {
+ QueryStatusInfo info = statement.finalStatusInfo();
+ if (info.getError() != null) {
+ QueryError error = Objects.requireNonNull(info.getError());
+ logger.error("Presto execute failed (#{}): {}", info.getId(),
error.getMessage());
+ Throwable cause = null;
+ if (error.getFailureInfo() != null) {
+ cause = error.getFailureInfo().toException();
+ }
+ engineExecutorContext.appendStdout(
+ LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause)));
+ return new ErrorExecuteResponse(ExceptionUtils.getMessage(cause),
cause);
+ } else {
+ return null;
+ }
+ } else if (statement.isClientAborted()) {
+ logger.warn("Presto statement is killed.");
+ return null;
+ } else if (statement.isClientError()) {
+ throw new PrestoClientException(
+ PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorCode(),
+ PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorDesc());
+ } else {
+ throw new PrestoStateInvalidException(
+ PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorCode(),
+ PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorDesc());
+ }
+ }
+
+ /**
+  * Derives a new session from {@code clientSession} by applying the changes reported by the
+  * given statement: catalog and schema, transaction id, session properties, roles and prepared
+  * statements.
+  *
+  * @param clientSession the session the statement was run with
+  * @param statement the statement whose reported changes are applied
+  * @return the session built from {@code clientSession} with those changes applied
+  */
+ private ClientSession updateSession(ClientSession clientSession, StatementClient statement) {
+   ClientSession session = clientSession;
+
+   // catalog / schema: keep the current value for whichever of the two was not set
+   if (statement.getSetCatalog().isPresent() || statement.getSetSchema().isPresent()) {
+     ClientSession.Builder catalogSchemaBuilder = ClientSession.builder(session);
+     catalogSchemaBuilder =
+         catalogSchemaBuilder.withCatalog(statement.getSetCatalog().orElse(session.getCatalog()));
+     catalogSchemaBuilder =
+         catalogSchemaBuilder.withSchema(statement.getSetSchema().orElse(session.getSchema()));
+     session = catalogSchemaBuilder.build();
+   }
+
+   // transaction id: strip it first when asked to, then record a newly started one
+   if (statement.isClearTransactionId()) {
+     session = ClientSession.stripTransactionId(session);
+   }
+
+   ClientSession.Builder sessionBuilder = ClientSession.builder(session);
+
+   if (statement.getStartedTransactionId() != null) {
+     sessionBuilder = sessionBuilder.withTransactionId(statement.getStartedTransactionId());
+   }
+
+   // session properties: add the ones that were set, then drop the ones that were reset
+   boolean propertiesUntouched =
+       statement.getSetSessionProperties().isEmpty()
+           && statement.getResetSessionProperties().isEmpty();
+   if (!propertiesUntouched) {
+     Map<String, String> mergedProperties = new HashMap<>(session.getProperties());
+     mergedProperties.putAll(statement.getSetSessionProperties());
+     mergedProperties.keySet().removeAll(statement.getResetSessionProperties());
+     sessionBuilder = sessionBuilder.withProperties(mergedProperties);
+   }
+
+   // roles
+   if (!statement.getSetRoles().isEmpty()) {
+     Map<String, SelectedRole> mergedRoles = new HashMap<>(session.getRoles());
+     mergedRoles.putAll(statement.getSetRoles());
+     sessionBuilder = sessionBuilder.withRoles(mergedRoles);
+   }
+
+   // prepared statements: add the new ones, then drop the deallocated ones
+   boolean preparedUntouched =
+       statement.getAddedPreparedStatements().isEmpty()
+           && statement.getDeallocatedPreparedStatements().isEmpty();
+   if (!preparedUntouched) {
+     Map<String, String> mergedPrepared = new HashMap<>(session.getPreparedStatements());
+     mergedPrepared.putAll(statement.getAddedPreparedStatements());
+     mergedPrepared.keySet().removeAll(statement.getDeallocatedPreparedStatements());
+     sessionBuilder = sessionBuilder.withPreparedStatements(mergedPrepared);
+   }
+
+   return sessionBuilder.build();
+ }
+
+ /**
+  * Cancels every statement currently registered in the statement cache and then empties the
+  * cache.
+  *
+  * <p>Cancellation is best-effort: a failure on one statement is logged as a warning and the
+  * remaining statements are still cancelled and the cache is still cleared.
+  */
+ @Override
+ public void killAll() {
+   for (StatementClient statement : statementClientCache.values()) {
+     if (statement != null) {
+       try {
+         statement.cancelLeafStage();
+       } catch (Exception e) {
+         logger.warn("Failed to cancel presto statement.", e);
+       }
+     }
+   }
+   statementClientCache.clear();
+ }
+
+ /** Calls {@code killAll()} and then closes the parent executor. */
+ @Override
+ public void close() {
+ killAll();
+ super.close();
+ }
+}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/PrestoSQLHook.scala
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/utils/PrestoSQLHook.java
similarity index 72%
rename from
linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/PrestoSQLHook.scala
rename to
linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/utils/PrestoSQLHook.java
index 6282af112..5acfbc012 100644
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/PrestoSQLHook.scala
+++
b/linkis-engineconn-plugins/presto/src/main/java/org/apache/linkis/engineplugin/presto/utils/PrestoSQLHook.java
@@ -15,22 +15,20 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.utils
+package org.apache.linkis.engineplugin.presto.utils;
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.StringUtils;
-object PrestoSQLHook {
-
- def preExecuteHook(code: String): String = {
- replaceBackQuoted(code)
+public class PrestoSQLHook {
+ public static String preExecuteHook(String code) {
+ return replaceBackQuoted(code);
}
- private def replaceBackQuoted(code: String): String = {
+ private static String replaceBackQuoted(String code) {
if (StringUtils.isNotBlank(code)) {
- code.replaceAll("`", "\"")
+ return code.replaceAll("`", "\"");
} else {
- code
+ return code;
}
}
-
}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
deleted file mode 100644
index f86dfa73b..000000000
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.presto
-
-import
org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder
-import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
-import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
-import
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.common.resource.{
- EngineResourceFactory,
- GenericEngineResourceFactory
-}
-import org.apache.linkis.manager.label.entity.Label
-
-import java.util
-
-class PrestoEngineConnPlugin extends EngineConnPlugin {
-
- private val resourceLocker = new Object()
-
- private val engineLaunchBuilderLocker = new Object()
-
- private val engineFactoryLocker = new Object()
-
- private var engineResourceFactory: EngineResourceFactory = _
-
- private var engineLaunchBuilder: EngineConnLaunchBuilder = _
-
- private var engineFactory: EngineConnFactory = _
-
- private val defaultLabels: util.List[Label[_]] = new
util.ArrayList[Label[_]]()
-
- override def init(params: util.Map[String, AnyRef]): Unit = {}
-
- override def getEngineResourceFactory: EngineResourceFactory = {
- if (null == engineResourceFactory) resourceLocker synchronized {
- engineResourceFactory = new GenericEngineResourceFactory
- }
- engineResourceFactory
- }
-
- override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
- new PrestoProcessEngineConnLaunchBuilder
- }
-
- override def getEngineConnFactory: EngineConnFactory = {
- if (null == engineFactory) engineFactoryLocker synchronized {
- engineFactory = new PrestoEngineConnFactory
- }
- engineFactory
- }
-
- override def getDefaultLabels: util.List[Label[_]] = defaultLabels
-
-}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
deleted file mode 100644
index 576e59199..000000000
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.presto.conf
-
-import org.apache.linkis.common.conf.{ByteType, CommonVars}
-
-import java.lang
-
-object PrestoConfiguration {
-
- val ENGINE_CONCURRENT_LIMIT =
CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
-
- val PRESTO_HTTP_CONNECT_TIME_OUT = CommonVars[java.lang.Long](
- "wds.linkis.presto.http.connectTimeout",
- new lang.Long(60)
- ) // unit in seconds
-
- val PRESTO_HTTP_READ_TIME_OUT =
- CommonVars[java.lang.Long]("wds.linkis.presto.http.readTimeout", new
lang.Long(60))
-
- val ENGINE_DEFAULT_LIMIT = CommonVars("wds.linkis.presto.default.limit",
5000)
- val PRESTO_URL = CommonVars("wds.linkis.presto.url", "http://127.0.0.1:8080")
- val PRESTO_RESOURCE_CONFIG_PATH =
CommonVars("wds.linkis.presto.resource.config", "");
- val PRESTO_USER_NAME = CommonVars("wds.linkis.presto.username", "default")
- val PRESTO_PASSWORD = CommonVars("wds.linkis.presto.password", "")
- val PRESTO_CATALOG = CommonVars("wds.linkis.presto.catalog", "system")
- val PRESTO_SCHEMA = CommonVars("wds.linkis.presto.schema", "")
- val PRESTO_SOURCE = CommonVars("wds.linkis.presto.source", "global")
- val PRESTO_REQUEST_MEMORY =
CommonVars("presto.session.query_max_total_memory", "8GB")
-
- val PRESTO_SQL_HOOK_ENABLED =
- CommonVars("linkis.presto.sql.hook.enabled", true, "presto sql hook")
-
-}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala
deleted file mode 100644
index 98568c916..000000000
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.presto.conf
-
-import org.apache.linkis.common.conf.Configuration
-import org.apache.linkis.governance.common.protocol.conf.{
- RequestQueryEngineConfigWithGlobalConfig,
- ResponseQueryConfig
-}
-import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel,
UserCreatorLabel}
-import org.apache.linkis.protocol.CacheableProtocol
-import org.apache.linkis.rpc.RPCMapCache
-
-import java.util
-
-object PrestoEngineConf
- extends RPCMapCache[(UserCreatorLabel, EngineTypeLabel), String, String](
-
Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue
- ) {
-
- override protected def createRequest(
- labelTuple: (UserCreatorLabel, EngineTypeLabel)
- ): CacheableProtocol = {
- RequestQueryEngineConfigWithGlobalConfig(labelTuple._1, labelTuple._2)
- }
-
- override protected def createMap(any: Any): util.Map[String, String] = any
match {
- case response: ResponseQueryConfig => response.getKeyAndValue
- }
-
-}
diff --git
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
deleted file mode 100644
index 878ae63f8..000000000
---
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executor/PrestoEngineConnExecutor.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.presto.executor
-
-import org.apache.linkis.common.log.LogUtils
-import org.apache.linkis.common.utils.{OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.conf.{EngineConnConf,
EngineConnConstant}
-import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ConcurrentComputationExecutor,
- EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration._
-import org.apache.linkis.engineplugin.presto.conf.PrestoEngineConf
-import org.apache.linkis.engineplugin.presto.errorcode.PrestoErrorCodeSummary
-import org.apache.linkis.engineplugin.presto.exception.{
- PrestoClientException,
- PrestoStateInvalidException
-}
-import org.apache.linkis.engineplugin.presto.utils.PrestoSQLHook
-import org.apache.linkis.governance.common.paser.SQLCodeParser
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadResource,
- NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel,
UserCreatorLabel}
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
-import org.apache.linkis.storage.domain.{Column, DataType}
-import org.apache.linkis.storage.resultset.ResultSetFactory
-import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
-import org.springframework.util.CollectionUtils
-
-import java.net.URI
-import java.util
-import java.util._
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-
-import scala.collection.JavaConverters._
-
-import com.facebook.presto.client._
-import com.facebook.presto.spi.security.SelectedRole
-import com.google.common.cache.{Cache, CacheBuilder}
-import okhttp3.OkHttpClient
-
-class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
- extends ConcurrentComputationExecutor(outputPrintLimit) {
-
- private val okHttpClient: OkHttpClient =
PrestoEngineConnExecutor.OK_HTTP_CLIENT
-
- private val executorLabels: util.List[Label[_]] = new
util.ArrayList[Label[_]](2)
-
- private val statementClientCache: util.Map[String, StatementClient] =
- new ConcurrentHashMap[String, StatementClient]()
-
- private val clientSessionCache: Cache[String, ClientSession] = CacheBuilder
- .newBuilder()
- .expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue,
TimeUnit.MILLISECONDS)
- .maximumSize(EngineConnConstant.MAX_TASK_NUM)
- .build()
-
- override def init: Unit = {
- setCodeParser(new SQLCodeParser)
- super.init
- }
-
- override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
- val user = getUserCreatorLabel(engineConnTask.getLables).getUser
- val userCreatorLabel =
engineConnTask.getLables.find(_.isInstanceOf[UserCreatorLabel]).get
- val engineTypeLabel =
engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get
- var configMap: util.Map[String, String] = null
- if (userCreatorLabel != null && engineTypeLabel != null) {
- configMap = PrestoEngineConf.getCacheMap(
- (
- userCreatorLabel.asInstanceOf[UserCreatorLabel],
- engineTypeLabel.asInstanceOf[EngineTypeLabel]
- )
- )
- }
- clientSessionCache.put(
- engineConnTask.getTaskId,
- getClientSession(user, engineConnTask.getProperties, configMap)
- )
- super.execute(engineConnTask)
- }
-
- override def executeLine(
- engineExecutorContext: EngineExecutionContext,
- code: String
- ): ExecuteResponse = {
- val enableSqlHook = PRESTO_SQL_HOOK_ENABLED.getValue
- val realCode = if (StringUtils.isBlank(code)) {
- "SELECT 1"
- } else if (enableSqlHook) {
- PrestoSQLHook.preExecuteHook(code.trim)
- } else {
- code.trim
- }
- logger.info(s"presto client begins to run psql code:\n $realCode")
-
- val taskId = engineExecutorContext.getJobId.get
-
- val clientSession = clientSessionCache.getIfPresent(taskId)
- val statement = StatementClientFactory.newStatementClient(okHttpClient,
clientSession, realCode)
- statementClientCache.put(taskId, statement)
- Utils.tryFinally {
- initialStatusUpdates(taskId, engineExecutorContext, statement)
- if (
- statement.isRunning || (statement.isFinished && statement
- .finalStatusInfo()
- .getError == null)
- ) {
- queryOutput(taskId, engineExecutorContext, statement)
- }
- val errorResponse = verifyServerError(taskId, engineExecutorContext,
statement)
- if (errorResponse == null) {
- // update session
- clientSessionCache.put(taskId, updateSession(clientSession, statement))
- SuccessExecuteResponse()
- } else {
- errorResponse
- }
- } {
- statementClientCache.remove(taskId)
- }
-
- }
-
- override def executeCompletely(
- engineExecutorContext: EngineExecutionContext,
- code: String,
- completedLine: String
- ): ExecuteResponse = null
-
- // todo
- override def progress(taskID: String): Float = 0.0f
-
- override def getProgressInfo(taskID: String): Array[JobProgressInfo] =
- Array.empty[JobProgressInfo]
-
- override def killTask(taskId: String): Unit = {
- val statement = statementClientCache.remove(taskId)
- if (null != statement) {
- Utils.tryAndWarn(statement.cancelLeafStage())
- }
- super.killTask(taskId)
- }
-
- override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
- override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
- if (!CollectionUtils.isEmpty(labels)) {
- executorLabels.clear()
- executorLabels.addAll(labels)
- }
- }
-
- override def supportCallBackLogs(): Boolean = false
-
- override def requestExpectedResource(expectedResource: NodeResource):
NodeResource = {
- null
- }
-
- override def getCurrentNodeResource(): NodeResource = {
- NodeResourceUtils.appendMemoryUnitIfMissing(
- EngineConnObject.getEngineCreationContext.getOptions
- )
-
- val resource = new CommonNodeResource
- val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
- resource.setUsedResource(usedResource)
- resource
- }
-
- override def getId(): String = Sender.getThisServiceInstance.getInstance +
s"_$id"
-
- override def getConcurrentLimit: Int = ENGINE_CONCURRENT_LIMIT.getValue
-
- private def getClientSession(
- user: String,
- taskParams: util.Map[String, Object],
- cacheMap: util.Map[String, String]
- ): ClientSession = {
- val configMap = new util.HashMap[String, String]()
- // The parameter priority specified at runtime is higher than the
configuration priority of the management console
- if (!CollectionUtils.isEmpty(cacheMap)) configMap.putAll(cacheMap)
- taskParams.asScala.foreach {
- case (key: String, value: Object) if value != null =>
- configMap.put(key, String.valueOf(value))
- case _ =>
- }
- val httpUri: URI = URI.create(PRESTO_URL.getValue(configMap))
- val source: String = PRESTO_SOURCE.getValue(configMap)
- val catalog: String = PRESTO_CATALOG.getValue(configMap)
- val schema: String = PRESTO_SCHEMA.getValue(configMap)
-
- val properties: util.Map[String, String] = configMap.asScala
- .filter(tuple => tuple._1.startsWith("presto.session."))
- .map(tuple => (tuple._1.substring("presto.session.".length), tuple._2))
- .asJava
-
- val clientInfo: String = "Linkis"
- val transactionId: String = null
- val traceToken: util.Optional[String] = Optional.empty()
- val clientTags: util.Set[String] = Collections.emptySet()
- val timeZonId = TimeZone.getDefault.getID
- val locale: Locale = Locale.getDefault
- val resourceEstimates: util.Map[String, String] = Collections.emptyMap()
- val preparedStatements: util.Map[String, String] = Collections.emptyMap()
- val roles: java.util.Map[String, SelectedRole] = Collections.emptyMap()
- val extraCredentials: util.Map[String, String] = Collections.emptyMap()
-
- val clientRequestTimeout: io.airlift.units.Duration =
- new io.airlift.units.Duration(0, TimeUnit.MILLISECONDS)
-
- new ClientSession(
- httpUri,
- user,
- source,
- traceToken,
- clientTags,
- clientInfo,
- catalog,
- schema,
- timeZonId,
- locale,
- resourceEstimates,
- properties,
- preparedStatements,
- roles,
- extraCredentials,
- transactionId,
- clientRequestTimeout
- )
- }
-
- private def getUserCreatorLabel(labels: Array[Label[_]]): UserCreatorLabel =
{
- labels
- .find(l => l.isInstanceOf[UserCreatorLabel])
- .get
- .asInstanceOf[UserCreatorLabel]
- }
-
- private def initialStatusUpdates(
- taskId: String,
- engineExecutorContext: EngineExecutionContext,
- statement: StatementClient
- ): Unit = {
- while (
- statement.isRunning
- && (statement.currentData().getData == null || statement
- .currentStatusInfo()
- .getUpdateType != null)
- ) {
- engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId))
- statement.advance()
- }
- }
-
- private def queryOutput(
- taskId: String,
- engineExecutorContext: EngineExecutionContext,
- statement: StatementClient
- ): Unit = {
- var columnCount = 0
- var rows = 0
- val resultSetWriter =
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
- Utils.tryCatch {
- var results: QueryStatusInfo = null
- if (statement.isRunning) {
- results = statement.currentStatusInfo()
- } else {
- results = statement.finalStatusInfo()
- }
- if (results.getColumns == null) {
- throw new RuntimeException("presto columns is null.")
- }
- val columns = results.getColumns.asScala
- .map(column => new Column(column.getName,
DataType.toDataType(column.getType), ""))
- .toArray[Column]
- columnCount = columns.length
- resultSetWriter.addMetaData(new TableMetaData(columns))
- while (statement.isRunning) {
- val data = statement.currentData().getData
- if (data != null) for (row <- data.asScala) {
- val rowArray = row.asScala.map(r => String.valueOf(r))
- resultSetWriter.addRecord(new TableRecord(rowArray.toArray))
- rows += 1
- }
- engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId))
- statement.advance()
- }
- } { case e: Exception =>
- IOUtils.closeQuietly(resultSetWriter)
- throw e
- }
- logger.info(s"Fetched $columnCount col(s) : $rows row(s) in presto")
- engineExecutorContext.appendStdout(
- LogUtils.generateInfo(s"Fetched $columnCount col(s) : $rows row(s) in
presto")
- );
- engineExecutorContext.sendResultSet(resultSetWriter)
- }
-
- // check presto error
- private def verifyServerError(
- taskId: String,
- engineExecutorContext: EngineExecutionContext,
- statement: StatementClient
- ): ErrorExecuteResponse = {
- engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId))
- if (statement.isFinished) {
- val info: QueryStatusInfo = statement.finalStatusInfo()
- if (info.getError != null) {
- val error = Objects.requireNonNull(info.getError);
- val message: String = s"Presto execute failed (#${info.getId}):
${error.getMessage}"
- var cause: Throwable = null
- if (error.getFailureInfo != null) {
- cause = error.getFailureInfo.toException
- }
- engineExecutorContext.appendStdout(
- LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause))
- )
- ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause)
- } else null
- } else if (statement.isClientAborted) {
- logger.warn(s"Presto statement is killed.")
- null
- } else if (statement.isClientError) {
- throw PrestoClientException(
- PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorCode,
- PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorDesc
- )
- } else {
- throw PrestoStateInvalidException(
- PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorCode,
- PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorDesc
- )
- }
- }
-
- private def updateSession(
- clientSession: ClientSession,
- statement: StatementClient
- ): ClientSession = {
- var newSession = clientSession
- // update catalog and schema if present
- if (statement.getSetCatalog.isPresent || statement.getSetSchema.isPresent)
{
- newSession = ClientSession
- .builder(newSession)
- .withCatalog(statement.getSetCatalog.orElse(newSession.getCatalog))
- .withSchema(statement.getSetSchema.orElse(newSession.getSchema))
- .build
- }
-
- // update transaction ID if necessary
- if (statement.isClearTransactionId) newSession =
ClientSession.stripTransactionId(newSession)
-
- var builder: ClientSession.Builder = ClientSession.builder(newSession)
-
- if (statement.getStartedTransactionId != null) {
- builder = builder.withTransactionId(statement.getStartedTransactionId)
- }
-
- // update session properties if present
- if (
- !statement.getSetSessionProperties.isEmpty ||
!statement.getResetSessionProperties.isEmpty
- ) {
- val sessionProperties: util.Map[String, String] =
- new util.HashMap[String, String](newSession.getProperties)
- sessionProperties.putAll(statement.getSetSessionProperties)
- sessionProperties.keySet.removeAll(statement.getResetSessionProperties)
- builder = builder.withProperties(sessionProperties)
- }
-
- // update session roles
- if (!statement.getSetRoles.isEmpty) {
- val roles: util.Map[String, SelectedRole] =
- new util.HashMap[String, SelectedRole](newSession.getRoles)
- roles.putAll(statement.getSetRoles)
- builder = builder.withRoles(roles)
- }
-
- // update prepared statements if present
- if (
- !statement.getAddedPreparedStatements.isEmpty ||
!statement.getDeallocatedPreparedStatements.isEmpty
- ) {
- val preparedStatements: util.Map[String, String] =
- new util.HashMap[String, String](newSession.getPreparedStatements)
- preparedStatements.putAll(statement.getAddedPreparedStatements)
-
preparedStatements.keySet.removeAll(statement.getDeallocatedPreparedStatements)
- builder = builder.withPreparedStatements(preparedStatements)
- }
-
- newSession
- }
-
- override def killAll(): Unit = {
- val iterator = statementClientCache.values().iterator()
- while (iterator.hasNext) {
- val statement = iterator.next()
- if (null != statement) {
- Utils.tryAndWarn(statement.cancelLeafStage())
- }
- }
- statementClientCache.clear()
- }
-
- override def close(): Unit = {
- killAll()
- super.close()
- }
-
-}
-
-object PrestoEngineConnExecutor {
-
- private val OK_HTTP_CLIENT: OkHttpClient = new OkHttpClient.Builder()
- .socketFactory(new SocketChannelSocketFactory)
- .connectTimeout(PRESTO_HTTP_CONNECT_TIME_OUT.getValue, TimeUnit.SECONDS)
- .readTimeout(PRESTO_HTTP_READ_TIME_OUT.getValue, TimeUnit.SECONDS)
- .build()
-
-}
diff --git
a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.scala
b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.java
similarity index 74%
copy from
linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.scala
copy to
linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.java
index 663f91126..d05a32aa4 100644
---
a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.scala
+++
b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto
+package org.apache.linkis.engineplugin.presto;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-class TestPrestoEngineConnPlugin {
+public class TestPrestoEngineConnPlugin {
@Test
- def testGetEngineResourceFactory: Unit = {
- val prestoEngineConnPlugin = new PrestoEngineConnPlugin
- Assertions.assertNotNull(prestoEngineConnPlugin.getEngineConnFactory)
+ public void testGetEngineResourceFactory() {
+ PrestoEngineConnPlugin prestoEngineConnPlugin = new PrestoEngineConnPlugin();
+ Assertions.assertNotNull(prestoEngineConnPlugin.getEngineConnFactory());
}
-
}
diff --git a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/builder/TestPrestoProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/builder/TestPrestoProcessEngineConnLaunchBuilder.java
similarity index 61%
rename from linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/builder/TestPrestoProcessEngineConnLaunchBuilder.scala
rename to linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/builder/TestPrestoProcessEngineConnLaunchBuilder.java
index c7aad5f6d..ee96f998f 100644
--- a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/builder/TestPrestoProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/builder/TestPrestoProcessEngineConnLaunchBuilder.java
@@ -15,20 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.builder
+package org.apache.linkis.engineplugin.presto.builder;
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-class TestPrestoProcessEngineConnLaunchBuilder {
+public class TestPrestoProcessEngineConnLaunchBuilder {
@Test
- def testGetEngineStartUser {
- val engineConnLaunchBuilder = new PrestoProcessEngineConnLaunchBuilder
- val userCreatorLabel = new UserCreatorLabel
- val value = engineConnLaunchBuilder.getEngineStartUser(userCreatorLabel)
- Assertions.assertEquals(value, "hadoop")
+ public void testGetEngineStartUser() {
+ PrestoProcessEngineConnLaunchBuilder engineConnLaunchBuilder =
+ new PrestoProcessEngineConnLaunchBuilder();
+ UserCreatorLabel userCreatorLabel = new UserCreatorLabel();
+ String value = engineConnLaunchBuilder.getEngineStartUser(userCreatorLabel);
+ Assertions.assertEquals(value, "hadoop");
}
-
}
diff --git a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/conf/TestPrestoConfiguration.scala b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/conf/TestPrestoConfiguration.java
similarity index 76%
rename from linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/conf/TestPrestoConfiguration.scala
rename to linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/conf/TestPrestoConfiguration.java
index 9a07311f2..94b73e433 100644
--- a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/conf/TestPrestoConfiguration.scala
+++ b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/conf/TestPrestoConfiguration.java
@@ -15,28 +15,26 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.conf
+package org.apache.linkis.engineplugin.presto.conf;
-import org.apache.linkis.common.conf.TimeType
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.{Assertions, Test}
-
-class TestPrestoConfiguration {
+public class TestPrestoConfiguration {
@Test
- def testConfig: Unit = {
- Assertions.assertEquals(100, PrestoConfiguration.ENGINE_CONCURRENT_LIMIT.getValue)
- Assertions.assertEquals(60, PrestoConfiguration.PRESTO_HTTP_CONNECT_TIME_OUT.getValue)
- Assertions.assertEquals(60, PrestoConfiguration.PRESTO_HTTP_READ_TIME_OUT.getValue)
- Assertions.assertEquals(5000, PrestoConfiguration.ENGINE_DEFAULT_LIMIT.getValue)
- Assertions.assertEquals("http://127.0.0.1:8080", PrestoConfiguration.PRESTO_URL.getValue)
- Assertions.assertEquals("", PrestoConfiguration.PRESTO_RESOURCE_CONFIG_PATH.getValue)
- Assertions.assertEquals("default", PrestoConfiguration.PRESTO_USER_NAME.getValue)
- Assertions.assertEquals("", PrestoConfiguration.PRESTO_PASSWORD.getValue)
- Assertions.assertEquals("system", PrestoConfiguration.PRESTO_CATALOG.getValue)
- Assertions.assertEquals("", PrestoConfiguration.PRESTO_SCHEMA.getValue)
- Assertions.assertEquals("global", PrestoConfiguration.PRESTO_SOURCE.getValue)
- Assertions.assertEquals("8GB", PrestoConfiguration.PRESTO_REQUEST_MEMORY.getValue)
+ public void testConfig() {
+ Assertions.assertEquals(100, PrestoConfiguration.ENGINE_CONCURRENT_LIMIT.getValue());
+ Assertions.assertEquals(60, PrestoConfiguration.PRESTO_HTTP_CONNECT_TIME_OUT.getValue());
+ Assertions.assertEquals(60, PrestoConfiguration.PRESTO_HTTP_READ_TIME_OUT.getValue());
+ Assertions.assertEquals(5000, PrestoConfiguration.ENGINE_DEFAULT_LIMIT.getValue());
+ Assertions.assertEquals("http://127.0.0.1:8080", PrestoConfiguration.PRESTO_URL.getValue());
+ Assertions.assertEquals("", PrestoConfiguration.PRESTO_RESOURCE_CONFIG_PATH.getValue());
+ Assertions.assertEquals("default", PrestoConfiguration.PRESTO_USER_NAME.getValue());
+ Assertions.assertEquals("", PrestoConfiguration.PRESTO_PASSWORD.getValue());
+ Assertions.assertEquals("system", PrestoConfiguration.PRESTO_CATALOG.getValue());
+ Assertions.assertEquals("", PrestoConfiguration.PRESTO_SCHEMA.getValue());
+ Assertions.assertEquals("global", PrestoConfiguration.PRESTO_SOURCE.getValue());
+ Assertions.assertEquals("8GB", PrestoConfiguration.PRESTO_REQUEST_MEMORY.getValue());
}
-
}
diff --git a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/exception/TestPrestoException.scala b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/exception/TestPrestoException.java
similarity index 51%
rename from linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/exception/TestPrestoException.scala
rename to linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/exception/TestPrestoException.java
index a7ea5ea7c..309b24ac3 100644
--- a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/exception/TestPrestoException.scala
+++ b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/exception/TestPrestoException.java
@@ -15,22 +15,28 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto.exception
+package org.apache.linkis.engineplugin.presto.exception;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.apache.linkis.engineplugin.presto.errorcode.PrestoErrorCodeSummary;
-class TestPrestoException {
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPrestoException {
@Test
- def testPrestoStateInvalidException: Unit = {
- val exception = PrestoStateInvalidException
- Assertions.assertNotNull(exception)
+ public void testPrestoStateInvalidException() {
+ Assertions.assertNotNull(
+ new PrestoStateInvalidException(
+ PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorCode(),
+ PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorDesc()));
}
@Test
- def testPythonSessionStartFailedException: Unit = {
- val exception = PrestoClientException
- Assertions.assertNotNull(exception)
+ public void testPythonSessionStartFailedException() {
+ Assertions.assertNotNull(
+ new PrestoClientException(
+ PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorCode(),
+ PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorDesc()));
}
-
}
diff --git
a/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/factory/TestPrestoEngineConnFactory.java
b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/factory/TestPrestoEngineConnFactory.java
new file mode 100644
index 000000000..dda7c718a
--- /dev/null
+++
b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/factory/TestPrestoEngineConnFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.presto.factory;
+
+import org.apache.linkis.engineconn.common.creation.DefaultEngineCreationContext;
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext;
+import org.apache.linkis.engineconn.common.engineconn.EngineConn;
+
+import java.util.HashMap;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPrestoEngineConnFactory {
+
+ @Test
+ public void testNewExecutor() {
+ System.setProperty("prestoVersion", "presto");
+ PrestoEngineConnFactory engineConnFactory = new PrestoEngineConnFactory();
+ EngineCreationContext engineCreationContext = new DefaultEngineCreationContext();
+ HashMap<String, String> jMap = new HashMap<>();
+ jMap.put("presto.version", "presto");
+ engineCreationContext.setOptions(jMap);
+ EngineConn engineConn = engineConnFactory.createEngineConn(engineCreationContext);
+ Object executor = engineConnFactory.newExecutor(1, engineCreationContext, engineConn);
+ Assertions.assertNotNull(executor);
+ }
+}
diff --git a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.scala b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/utils/TestPrestoSQLHook.java
similarity index 71%
rename from linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.scala
rename to linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/utils/TestPrestoSQLHook.java
index 663f91126..c9e370e9a 100644
--- a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/TestPrestoEngineConnPlugin.scala
+++ b/linkis-engineconn-plugins/presto/src/test/java/org/apache/linkis/engineplugin/presto/utils/TestPrestoSQLHook.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.presto
+package org.apache.linkis.engineplugin.presto.utils;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-class TestPrestoEngineConnPlugin {
+public class TestPrestoSQLHook {
@Test
- def testGetEngineResourceFactory: Unit = {
- val prestoEngineConnPlugin = new PrestoEngineConnPlugin
- Assertions.assertNotNull(prestoEngineConnPlugin.getEngineConnFactory)
+ public void testPreExecuteHook() {
+ String code = "`1104`";
+ String codes = PrestoSQLHook.preExecuteHook(code);
+ Assertions.assertEquals(codes, "\"1104\"");
}
-
}
diff --git a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/factory/TestPrestoEngineConnFactory.scala b/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/factory/TestPrestoEngineConnFactory.scala
deleted file mode 100644
index bd9963b31..000000000
--- a/linkis-engineconn-plugins/presto/src/test/scala/org/apache/linkis/engineplugin/presto/factory/TestPrestoEngineConnFactory.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineplugin.presto.factory
-
-import org.apache.linkis.engineconn.common.creation.{
- DefaultEngineCreationContext,
- EngineCreationContext
-}
-
-import org.junit.jupiter.api.{Assertions, Test}
-
-class TestPrestoEngineConnFactory {
-
- @Test
- def testNewExecutor {
- System.setProperty("prestoVersion", "presto")
- val engineConnFactory: PrestoEngineConnFactory = new PrestoEngineConnFactory
- val engineCreationContext: EngineCreationContext = new DefaultEngineCreationContext
- val jMap = new java.util.HashMap[String, String]()
- jMap.put("presto.version", "presto")
- engineCreationContext.setOptions(jMap)
- val engineConn = engineConnFactory.createEngineConn(engineCreationContext)
- val executor = engineConnFactory.newExecutor(1, engineCreationContext, engineConn)
- Assertions.assertNotNull(executor)
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]