This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 4ecb22e37 [Feature] Add nebula engine to linkis (#4903)
4ecb22e37 is described below
commit 4ecb22e373d7407226acc7ac7c63d698f11b394f
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Sep 20 20:32:46 2023 +0800
[Feature] Add nebula engine to linkis (#4903)
* Add nebula engine to linkis
* Reuse nebula session
* Code optimization and remove wds prefix
---
.../linkis/ujes/jdbc/LinkisSQLConnection.scala | 1 +
.../linkis/manager/am/conf/AMConfiguration.java | 7 +-
.../manager/label/conf/LabelCommonConfig.java | 3 +
.../manager/label/entity/engine/EngineType.scala | 3 +
.../manager/label/entity/engine/RunType.scala | 1 +
.../label/utils/EngineTypeLabelCreator.java | 2 +
linkis-engineconn-plugins/nebula/pom.xml | 110 ++++++
.../nebula/src/main/assembly/distribution.xml | 71 ++++
.../nebula/NebulaEngineConnPlugin.java | 72 ++++
.../NebulaProcessEngineConnLaunchBuilder.java | 22 ++
.../nebula/conf/NebulaConfiguration.java | 50 +++
.../engineplugin/nebula/conf/NebulaEngineConf.java | 53 +++
.../nebula/errorcode/NebulaErrorCodeSummary.java | 47 +++
.../nebula/exception/NebulaClientException.java | 27 ++
.../nebula/exception/NebulaExecuteError.java | 27 ++
.../exception/NebulaStateInvalidException.java | 27 ++
.../nebula/executor/NebulaEngineConnExecutor.java | 388 +++++++++++++++++++++
.../main/resources/linkis-engineconn.properties | 23 ++
.../nebula/src/main/resources/log4j2.xml | 91 +++++
.../nebula/factory/NebulaEngineConnFactory.scala | 44 +++
pom.xml | 1 +
21 files changed, 1067 insertions(+), 3 deletions(-)
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
index b80069876..e111615ce 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
@@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient:
UJESClient, props: Prope
case EngineType.HIVE => RunType.HIVE
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
+ case EngineType.NEBULA => RunType.NEBULA_SQL
case EngineType.ELASTICSEARCH => RunType.ES_SQL
case EngineType.JDBC => RunType.JDBC
case EngineType.PYTHON => RunType.SHELL
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
index d916387d2..8aba14267 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
@@ -68,7 +68,8 @@ public class AMConfiguration {
public static final CommonVars<String> MULTI_USER_ENGINE_TYPES =
CommonVars.apply(
- "wds.linkis.multi.user.engine.types",
"jdbc,es,presto,io_file,appconn,openlookeng,trino");
+ "wds.linkis.multi.user.engine.types",
+ "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula");
public static final CommonVars<String> ALLOW_BATCH_KILL_ENGINE_TYPES =
CommonVars.apply("wds.linkis.allow.batch.kill.engine.types",
"spark,hive,python");
@@ -104,8 +105,8 @@ public class AMConfiguration {
public static String getDefaultMultiEngineUser() {
String jvmUser = Utils.getJvmUser();
return String.format(
- "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\",
openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}",
- jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
+ "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\",
openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",io_file:\"root\"}",
+ jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
}
public static boolean isMultiUserEngine(String engineType) {
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
index d0854186a..f4b52a156 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
@@ -69,6 +69,9 @@ public class LabelCommonConfig {
public static final CommonVars<String> DATAX_ENGINE_VERSION =
CommonVars.apply("wds.linkis.datax.engine.version", "3.0.0");
+ public static final CommonVars<String> NEBULA_ENGINE_VERSION =
+ CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0");
+
public static final CommonVars<String> PRESTO_ENGINE_VERSION =
CommonVars.apply("wds.linkis.presto.engine.version", "0.234");
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
index d47bb8ec3..77e7204a7 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
@@ -45,6 +45,8 @@ object EngineType extends Enumeration with Logging {
val PRESTO = Value("presto")
+ val NEBULA = Value("nebula")
+
val FLINK = Value("flink")
val APPCONN = Value("appconn")
@@ -89,6 +91,7 @@ object EngineType extends Enumeration with Logging {
case _ if IO_ENGINE_HDFS.toString.equalsIgnoreCase(str) => IO_ENGINE_HDFS
case _ if PIPELINE.toString.equalsIgnoreCase(str) => PIPELINE
case _ if PRESTO.toString.equalsIgnoreCase(str) => PRESTO
+ case _ if NEBULA.toString.equalsIgnoreCase(str) => NEBULA
case _ if FLINK.toString.equalsIgnoreCase(str) => FLINK
case _ if APPCONN.toString.equals(str) => APPCONN
case _ if SQOOP.toString.equalsIgnoreCase(str) => SQOOP
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 21a067ed4..abb3e010f 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -35,6 +35,7 @@ object RunType extends Enumeration {
val PIPELINE = Value("pipeline")
val JDBC = Value("jdbc")
val PRESTO_SQL = Value("psql")
+ val NEBULA_SQL = Value("ngql")
val JAR = Value("jar")
val APPCONN = Value("appconn")
val FUNCTION_MDQ_TYPE = Value("function.mdq")
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
index 0d6ae3c5c..e90f282aa 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
@@ -69,6 +69,8 @@ public class EngineTypeLabelCreator {
EngineType.FLINK().toString(),
LabelCommonConfig.FLINK_ENGINE_VERSION.getValue());
defaultVersion.put(
EngineType.PRESTO().toString(),
LabelCommonConfig.PRESTO_ENGINE_VERSION.getValue());
+ defaultVersion.put(
+ EngineType.NEBULA().toString(),
LabelCommonConfig.NEBULA_ENGINE_VERSION.getValue());
defaultVersion.put(
EngineType.SQOOP().toString(),
LabelCommonConfig.SQOOP_ENGINE_VERSION.getValue());
defaultVersion.put(
diff --git a/linkis-engineconn-plugins/nebula/pom.xml
b/linkis-engineconn-plugins/nebula/pom.xml
new file mode 100644
index 000000000..bfe971456
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis</artifactId>
+ <version>${revision}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>linkis-engineplugin-nebula</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-engineconn-plugin-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-computation-engineconn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-storage</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-rpc</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- nebula -->
+ <dependency>
+ <groupId>com.vesoft</groupId>
+ <artifactId>client</artifactId>
+ <version>${nebula.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <inherited>false</inherited>
+ <configuration>
+ <skipAssembly>false</skipAssembly>
+ <finalName>out</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/main/assembly/distribution.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/distribution.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
new file mode 100644
index 000000000..eaa9c296f
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.1
https://maven.apache.org/xsd/assembly-2.1.1.xsd">
+ <id>linkis-engineplugin-nebula</id>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <baseDirectory>nebula</baseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <!-- Enable access to all projects in the current multimodule
build! <useAllReactorProjects>true</useAllReactorProjects> -->
+ <!-- Now, select which projects to include in this module-set. -->
+ <outputDirectory>/dist/${nebula.version}/lib</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <unpack>false</unpack>
+ <useStrictFiltering>false</useStrictFiltering>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+
+ </dependencySet>
+ </dependencySets>
+
+ <fileSets>
+
+ <fileSet>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>linkis-engineconn.properties</include>
+ <include>log4j2.xml</include>
+ </includes>
+ <fileMode>0777</fileMode>
+ <outputDirectory>dist/${nebula.version}/conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+
+ <fileSet>
+ <directory>${basedir}/target</directory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>*doc.jar</exclude>
+ </excludes>
+ <fileMode>0777</fileMode>
+ <outputDirectory>plugin/${nebula.version}</outputDirectory>
+ </fileSet>
+
+ </fileSets>
+
+</assembly>
+
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
new file mode 100644
index 000000000..a22d2c8a8
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula;
+
+import
org.apache.linkis.engineplugin.nebula.builder.NebulaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.engineplugin.nebula.factory.NebulaEngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import
org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import
org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import
org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class NebulaEngineConnPlugin implements EngineConnPlugin {
+ private Object resourceLocker = new Object();
+ private Object engineFactoryLocker = new Object();
+ private volatile EngineResourceFactory engineResourceFactory;
+ private volatile EngineConnFactory engineFactory;
+ private List<Label<?>> defaultLabels = new ArrayList<>();
+
+ @Override
+ public void init(Map<String, Object> params) {}
+
+ @Override
+ public EngineResourceFactory getEngineResourceFactory() {
+ if (null == engineResourceFactory) {
+ synchronized (resourceLocker) {
+ engineResourceFactory = new GenericEngineResourceFactory();
+ }
+ }
+ return engineResourceFactory;
+ }
+
+ @Override
+ public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+ return new NebulaProcessEngineConnLaunchBuilder();
+ }
+
+ @Override
+ public EngineConnFactory getEngineConnFactory() {
+ if (null == engineFactory) {
+ synchronized (engineFactoryLocker) {
+ engineFactory = new NebulaEngineConnFactory();
+ }
+ }
+ return engineFactory;
+ }
+
+ @Override
+ public List<Label<?>> getDefaultLabels() {
+ return defaultLabels;
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
new file mode 100644
index 000000000..fb95910cf
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.builder;
+
+import
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
+
+public class NebulaProcessEngineConnLaunchBuilder extends
JavaProcessEngineConnLaunchBuilder {}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
new file mode 100644
index 000000000..dfbb7a8b1
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.conf;
+
+import org.apache.linkis.common.conf.CommonVars;
+
+public class NebulaConfiguration {
+
+ public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
+ CommonVars.apply("linkis.engineconn.concurrent.limit", 100);
+
+ public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
+ CommonVars.apply("linkis.nebula.default.limit", 5000);
+
+ public static final CommonVars<String> NEBULA_HOST =
+ CommonVars.apply("linkis.nebula.host", "127.0.0.1");
+
+ public static final CommonVars<Integer> NEBULA_PORT =
+ CommonVars.apply("linkis.nebula.port", 9669);
+
+ public static final CommonVars<Integer> NEBULA_MAX_CONN_SIZE =
+ CommonVars.apply("linkis.nebula.max.conn.size", 100);
+
+ public static final CommonVars<String> NEBULA_USER_NAME =
+ CommonVars.apply("linkis.nebula.username", "root");
+
+ public static final CommonVars<String> NEBULA_PASSWORD =
+ CommonVars.apply("linkis.nebula.password", "nebula");
+
+ public static final CommonVars<Boolean> NEBULA_RECONNECT_ENABLED =
+ CommonVars.apply(
+ "linkis.nebula.reconnect.enabled",
+ false,
+ "whether to retry after the connection is disconnected");
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
new file mode 100644
index 000000000..92cc32ca0
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.conf;
+
+import org.apache.linkis.common.conf.Configuration;
+import
org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig;
+import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.CacheableProtocol;
+import org.apache.linkis.rpc.RPCMapCache;
+
+import java.util.Map;
+
+import scala.Tuple2;
+
+public class NebulaEngineConf
+ extends RPCMapCache<Tuple2<UserCreatorLabel, EngineTypeLabel>, String,
String> {
+
+ public NebulaEngineConf() {
+
super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());
+ }
+
+ @Override
+ public CacheableProtocol createRequest(Tuple2<UserCreatorLabel,
EngineTypeLabel> labelTuple) {
+ return new RequestQueryEngineConfigWithGlobalConfig(labelTuple._1(),
labelTuple._2(), null);
+ }
+
+ @Override
+ public Map<String, String> createMap(Object obj) {
+ if (obj instanceof ResponseQueryConfig) {
+ ResponseQueryConfig response = (ResponseQueryConfig) obj;
+ return response.getKeyAndValue();
+ } else {
+ return null;
+ }
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
new file mode 100644
index 000000000..80aa2e197
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.errorcode;
+
+import org.apache.linkis.common.errorcode.ErrorCodeUtils;
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum NebulaErrorCodeSummary implements LinkisErrorCode {
+ NEBULA_CLIENT_INITIALIZATION_FAILED(28001, "Nebula client initialization
failed(Nebula客户端初始化失败)"),
+ NEBULA_EXECUTOR_ERROR(28002, "Nebula executor error(Nebula执行异常)"),
+ NEBULA_CLIENT_ERROR(28003, "Nebula client error(Nebula客户端异常)");
+
+ private final int errorCode;
+
+ private final String errorDesc;
+
+ NebulaErrorCodeSummary(int errorCode, String errorDesc) {
+ ErrorCodeUtils.validateErrorCode(errorCode, 26000, 29999);
+ this.errorCode = errorCode;
+ this.errorDesc = errorDesc;
+ }
+
+ @Override
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public String getErrorDesc() {
+ return errorDesc;
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
new file mode 100644
index 000000000..59b3620b0
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.exception;
+
+import org.apache.linkis.common.exception.ErrorException;
+
+public class NebulaClientException extends ErrorException {
+
+ public NebulaClientException(int errorCode, String message) {
+ super(errorCode, message);
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
new file mode 100644
index 000000000..f2c164d5a
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.exception;
+
+import org.apache.linkis.common.exception.ErrorException;
+
+public class NebulaExecuteError extends ErrorException {
+
+ public NebulaExecuteError(int errorCode, String message) {
+ super(errorCode, message);
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
new file mode 100644
index 000000000..202d478b7
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.exception;
+
+import org.apache.linkis.common.exception.ErrorException;
+
+public class NebulaStateInvalidException extends ErrorException {
+
+ public NebulaStateInvalidException(int errorCode, String message) {
+ super(errorCode, message);
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
new file mode 100644
index 000000000..188ea60ec
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.executor;
+
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.log.LogUtils;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import
org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration;
+import org.apache.linkis.engineplugin.nebula.conf.NebulaEngineConf;
+import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary;
+import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException;
+import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError;
+import org.apache.linkis.governance.common.paser.SQLCodeParser;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+import org.apache.linkis.storage.domain.Column;
+import org.apache.linkis.storage.domain.DataType;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.vesoft.nebula.ErrorCode;
+import com.vesoft.nebula.client.graph.NebulaPoolConfig;
+import com.vesoft.nebula.client.graph.data.HostAddress;
+import com.vesoft.nebula.client.graph.data.ResultSet;
+import com.vesoft.nebula.client.graph.net.NebulaPool;
+import com.vesoft.nebula.client.graph.net.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(NebulaEngineConnExecutor.class);
+ private int id;
+ private List<Label<?>> executorLabels = new ArrayList<>(2);
+ private Map<String, Session> sessionCache = new ConcurrentHashMap<>();
+
+ private Map<String, String> configMap = new HashMap<>();
+
+ private Cache<String, NebulaPool> nebulaPoolCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(
+
Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()),
+ TimeUnit.MILLISECONDS)
+ .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+ .build();
+
+ public NebulaEngineConnExecutor(int outputPrintLimit, int id) {
+ super(outputPrintLimit);
+ this.id = id;
+ }
+
+ @Override
+ public void init() {
+ setCodeParser(new SQLCodeParser());
+ super.init();
+ }
+
+ @Override
+ public ExecuteResponse execute(EngineConnTask engineConnTask) {
+ Optional<Label<?>> userCreatorLabelOp =
+ Arrays.stream(engineConnTask.getLables())
+ .filter(label -> label instanceof UserCreatorLabel)
+ .findFirst();
+ Optional<Label<?>> engineTypeLabelOp =
+ Arrays.stream(engineConnTask.getLables())
+ .filter(label -> label instanceof EngineTypeLabel)
+ .findFirst();
+
+ Map<String, String> configMap = null;
+ if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) {
+ UserCreatorLabel userCreatorLabel = (UserCreatorLabel)
userCreatorLabelOp.get();
+ EngineTypeLabel engineTypeLabel = (EngineTypeLabel)
engineTypeLabelOp.get();
+
+ configMap =
+ new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel,
engineTypeLabel));
+ }
+
+ nebulaPoolCache.put(
+ engineConnTask.getTaskId(),
getNebulaPool(engineConnTask.getProperties(), configMap));
+ return super.execute(engineConnTask);
+ }
+
+ @Override
+ public ExecuteResponse executeLine(EngineExecutionContext
engineExecutorContext, String code) {
+ String realCode;
+ if (StringUtils.isBlank(code)) {
+ realCode = "SHOW SPACES";
+ } else {
+ realCode = code.trim();
+ }
+ logger.info("Nebula client begins to run ngql code:\n {}", realCode);
+
+ String taskId = engineExecutorContext.getJobId().get();
+ NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId);
+ Session session = getSession(taskId, nebulaPool);
+
+ initialStatusUpdates(taskId, engineExecutorContext, session);
+ ResultSet resultSet = null;
+
+ try {
+ resultSet = session.execute(code);
+ } catch (Exception e) {
+ logger.error("Nebula executor error.");
+ throw new NebulaExecuteError(
+ NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(),
+ NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc());
+ }
+
+ if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
+ queryOutput(taskId, engineExecutorContext, resultSet);
+ }
+ ErrorExecuteResponse errorResponse = null;
+ try {
+ errorResponse = verifyServerError(taskId, engineExecutorContext,
resultSet);
+ } catch (ErrorException e) {
+ logger.error("Nebula execute failed (#{}): {}", e.getErrCode(),
e.getMessage());
+ }
+ if (errorResponse == null) {
+ return new SuccessExecuteResponse();
+ } else {
+ return errorResponse;
+ }
+ }
+
+ @Override
+ public ExecuteResponse executeCompletely(
+ EngineExecutionContext engineExecutorContext, String code, String
completedLine) {
+ return null;
+ }
+
+ @Override
+ public float progress(String taskID) {
+ return 0.0f;
+ }
+
+ @Override
+ public JobProgressInfo[] getProgressInfo(String taskID) {
+ return new JobProgressInfo[0];
+ }
+
+ @Override
+ public void killTask(String taskId) {
+ Session session = sessionCache.remove(taskId);
+ if (null != session) {
+ session.release();
+ }
+ super.killTask(taskId);
+ }
+
+ @Override
+ public List<Label<?>> getExecutorLabels() {
+ return executorLabels;
+ }
+
+ @Override
+ public void setExecutorLabels(List<Label<?>> labels) {
+ if (!CollectionUtils.isEmpty(labels)) {
+ executorLabels.clear();
+ executorLabels.addAll(labels);
+ }
+ }
+
+ @Override
+ public boolean supportCallBackLogs() {
+ return false;
+ }
+
+ @Override
+ public NodeResource requestExpectedResource(NodeResource expectedResource) {
+ return null;
+ }
+
+ @Override
+ public NodeResource getCurrentNodeResource() {
+ NodeResourceUtils.appendMemoryUnitIfMissing(
+ EngineConnObject.getEngineCreationContext().getOptions());
+
+ CommonNodeResource resource = new CommonNodeResource();
+ LoadResource usedResource = new
LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
+ resource.setUsedResource(usedResource);
+ return resource;
+ }
+
+ @Override
+ public String getId() {
+ return Sender.getThisServiceInstance().getInstance() + "_" + id;
+ }
+
+ @Override
+ public int getConcurrentLimit() {
+ return NebulaConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
+ }
+
+ private NebulaPool getNebulaPool(Map<String, Object> taskParams, Map<String,
String> cacheMap) {
+ if (!CollectionUtils.isEmpty(cacheMap)) {
+ configMap.putAll(cacheMap);
+ }
+ taskParams.entrySet().stream()
+ .filter(entry -> entry.getValue() != null)
+ .forEach(entry -> configMap.put(entry.getKey(),
String.valueOf(entry.getValue())));
+
+ String host = NebulaConfiguration.NEBULA_HOST.getValue(configMap);
+ Integer port = NebulaConfiguration.NEBULA_PORT.getValue(configMap);
+ Integer maxConnSize =
NebulaConfiguration.NEBULA_MAX_CONN_SIZE.getValue(configMap);
+
+ NebulaPool nebulaPool = new NebulaPool();
+ Boolean initResult = false;
+ try {
+
+ NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
+ nebulaPoolConfig.setMaxConnSize(maxConnSize);
+ List<HostAddress> addresses = Arrays.asList(new HostAddress(host, port));
+ initResult = nebulaPool.init(addresses, nebulaPoolConfig);
+ } catch (Exception e) {
+ logger.error("NebulaPool initialization failed.");
+ throw new NebulaClientException(
+
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+ }
+ if (!initResult) {
+ logger.error("NebulaPool initialization failed.");
+ throw new NebulaClientException(
+
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+ }
+ return nebulaPool;
+ }
+
+ private Session getSession(String taskId, NebulaPool nebulaPool) {
+ if (sessionCache.containsKey(taskId)
+ && sessionCache.get(taskId) != null
+ && sessionCache.get(taskId).ping()) {
+ return sessionCache.get(taskId);
+ } else {
+ Session session;
+ String username =
NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap);
+ String password =
NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap);
+ Boolean reconnect =
NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap);
+
+ try {
+ session = nebulaPool.getSession(username, password, reconnect);
+ } catch (Exception e) {
+ logger.error("Nebula Session initialization failed.");
+ throw new NebulaClientException(
+
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+ }
+
+ sessionCache.put(taskId, session);
+ return session;
+ }
+ }
+
+ private void initialStatusUpdates(
+ String taskId, EngineExecutionContext engineExecutorContext, Session
session) {
+ if (session.ping()) {
+ engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId));
+ }
+ }
+
+ private void queryOutput(
+ String taskId, EngineExecutionContext engineExecutorContext, ResultSet
resultSet) {
+ int columnCount = 0;
+ ResultSetWriter resultSetWriter =
+
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+
+ try {
+ List<String> colNames = resultSet.keys();
+
+ if (CollectionUtils.isEmpty(colNames)) {
+ throw new RuntimeException("Nebula columns is null.");
+ }
+
+ List<Column> columns =
+ colNames.stream()
+ .map(column -> new Column(column, DataType.toDataType("string"),
""))
+ .collect(Collectors.toList());
+ columnCount = columns.size();
+ resultSetWriter.addMetaData(new TableMetaData(columns.toArray(new
Column[0])));
+ if (!resultSet.isEmpty()) {
+ for (int i = 0; i < resultSet.rowsSize(); i++) {
+ ResultSet.Record record = resultSet.rowValues(i);
+ if (record != null) {
+ String[] rowArray =
+ record.values().stream()
+ .map(
+ x -> {
+ try {
+ return x.asString();
+ } catch (Exception e) {
+ return "";
+ }
+ })
+ .toArray(String[]::new);
+ resultSetWriter.addRecord(new TableRecord(rowArray));
+ }
+ }
+ engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId));
+ }
+ } catch (Exception e) {
+ IOUtils.closeQuietly(resultSetWriter);
+ }
+ String message =
+ String.format("Fetched %d col(s) : %d row(s) in Nebula", columnCount,
resultSet.rowsSize());
+ logger.info(message);
+ engineExecutorContext.appendStdout(LogUtils.generateInfo(message));
+ engineExecutorContext.sendResultSet(resultSetWriter);
+ }
+
+ private ErrorExecuteResponse verifyServerError(
+ String taskId, EngineExecutionContext engineExecutorContext, ResultSet
resultSet)
+ throws ErrorException {
+ engineExecutorContext.pushProgress(progress(taskId),
getProgressInfo(taskId));
+
+ if (!resultSet.isSucceeded() || resultSet.getErrorCode() !=
ErrorCode.SUCCEEDED.getValue()) {
+ logger.error(
+ "Nebula execute failed (#{}): {}", resultSet.getErrorCode(),
resultSet.getErrorMessage());
+
engineExecutorContext.appendStdout(LogUtils.generateERROR(resultSet.getErrorMessage()));
+ return new ErrorExecuteResponse(resultSet.getErrorMessage(), null);
+ }
+ return null;
+ }
+
+ @Override
+ public void killAll() {
+ Iterator<Session> iterator = sessionCache.values().iterator();
+ while (iterator.hasNext()) {
+ Session session = iterator.next();
+ if (session != null) {
+ session.release();
+ }
+ }
+ sessionCache.clear();
+ }
+
+ @Override
+ public void close() {
+ killAll();
+ super.close();
+ }
+}
diff --git
a/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
new file mode 100644
index 000000000..059eccb79
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+wds.linkis.server.version=v1
+#wds.linkis.engineconn.debug.enable=true
+#wds.linkis.keytab.enable=true
+wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.nebula.NebulaEngineConnPlugin
+
+wds.linkis.engineconn.support.parallelism=true
+
+wds.linkis.engineconn.max.free.time=0
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml
b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..2cd3e264c
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration status="error" monitorInterval="30">
+ <appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t]
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
+ </Console>
+
+ <Send name="Send" >
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="ACCEPT"
onMismatch="DENY" />
+ </Filters>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t]
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
+ </Send>
+
+ <Send name="SendPackage" >
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t]
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
+ </Send>
+
+ <Console name="stderr" target="SYSTEM_ERR">
+ <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"
/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M
[JobId-%X{jobId}] - %msg%xEx%n"/>
+ </Console>
+ </appenders>
+
+ <loggers>
+ <root level="INFO">
+ <appender-ref ref="stderr"/>
+ <appender-ref ref="Console"/>
+ <appender-ref ref="Send"/>
+ </root>
+ <logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info"
additivity="true">
+ <appender-ref ref="SendPackage"/>
+ </logger>
+ <logger
name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter"
level="error" additivity="true">
+ <appender-ref ref="stderr"/>
+ </logger>
+ <logger name="com.netflix.discovery" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.yarn" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.springframework" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.linkis.server.security" level="warn"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.hive.ql.exec.mr.ExecDriver"
level="fatal" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.spark_project.jetty" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.eclipse.jetty" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.springframework" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.reflections.Reflections" level="ERROR"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+
+ <logger name="org.apache.hadoop.ipc.Client" level="ERROR"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+
+ </loggers>
+</configuration>
diff --git
a/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
new file mode 100644
index 000000000..2f7c3c8fb
--- /dev/null
+++
b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
+import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration
+import org.apache.linkis.engineplugin.nebula.executor.NebulaEngineConnExecutor
+import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
+import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+class NebulaEngineConnFactory extends
ComputationSingleExecutorEngineConnFactory {
+
+ override def newExecutor(
+ id: Int,
+ engineCreationContext: EngineCreationContext,
+ engineConn: EngineConn
+ ): LabelExecutor = {
+ new
NebulaEngineConnExecutor(NebulaConfiguration.ENGINE_DEFAULT_LIMIT.getValue, id)
+ }
+
+ override protected def getEngineConnType: EngineType = EngineType.NEBULA
+
+ override protected def getRunType: RunType = RunType.NEBULA_SQL
+
+}
diff --git a/pom.xml b/pom.xml
index 001e8189d..f9930b12b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
<openlookeng.version>1.5.0</openlookeng.version>
<pipeline.version>1</pipeline.version>
<presto.version>0.234</presto.version>
+ <nebula.version>3.0.0</nebula.version>
<python.version>python2</python.version>
<seatunnel.version>2.1.2</seatunnel.version>
<shell.version>1</shell.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]