This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ecb22e37 [Feature] Add nebula engine to linkis (#4903)
4ecb22e37 is described below

commit 4ecb22e373d7407226acc7ac7c63d698f11b394f
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Sep 20 20:32:46 2023 +0800

    [Feature] Add nebula engine to linkis (#4903)
    
    * Add nebula engine to linkis
    
    * Reuse nebula session
    
    * Code optimization and remove wds prefix
---
 .../linkis/ujes/jdbc/LinkisSQLConnection.scala     |   1 +
 .../linkis/manager/am/conf/AMConfiguration.java    |   7 +-
 .../manager/label/conf/LabelCommonConfig.java      |   3 +
 .../manager/label/entity/engine/EngineType.scala   |   3 +
 .../manager/label/entity/engine/RunType.scala      |   1 +
 .../label/utils/EngineTypeLabelCreator.java        |   2 +
 linkis-engineconn-plugins/nebula/pom.xml           | 110 ++++++
 .../nebula/src/main/assembly/distribution.xml      |  71 ++++
 .../nebula/NebulaEngineConnPlugin.java             |  72 ++++
 .../NebulaProcessEngineConnLaunchBuilder.java      |  22 ++
 .../nebula/conf/NebulaConfiguration.java           |  50 +++
 .../engineplugin/nebula/conf/NebulaEngineConf.java |  53 +++
 .../nebula/errorcode/NebulaErrorCodeSummary.java   |  47 +++
 .../nebula/exception/NebulaClientException.java    |  27 ++
 .../nebula/exception/NebulaExecuteError.java       |  27 ++
 .../exception/NebulaStateInvalidException.java     |  27 ++
 .../nebula/executor/NebulaEngineConnExecutor.java  | 388 +++++++++++++++++++++
 .../main/resources/linkis-engineconn.properties    |  23 ++
 .../nebula/src/main/resources/log4j2.xml           |  91 +++++
 .../nebula/factory/NebulaEngineConnFactory.scala   |  44 +++
 pom.xml                                            |   1 +
 21 files changed, 1067 insertions(+), 3 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
 
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
index b80069876..e111615ce 100644
--- 
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
+++ 
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
@@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: 
UJESClient, props: Prope
       case EngineType.HIVE => RunType.HIVE
       case EngineType.TRINO => RunType.TRINO_SQL
       case EngineType.PRESTO => RunType.PRESTO_SQL
+      case EngineType.NEBULA => RunType.NEBULA_SQL
       case EngineType.ELASTICSEARCH => RunType.ES_SQL
       case EngineType.JDBC => RunType.JDBC
       case EngineType.PYTHON => RunType.SHELL
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
index d916387d2..8aba14267 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
@@ -68,7 +68,8 @@ public class AMConfiguration {
 
   public static final CommonVars<String> MULTI_USER_ENGINE_TYPES =
       CommonVars.apply(
-          "wds.linkis.multi.user.engine.types", "jdbc,es,presto,io_file,appconn,openlookeng,trino");
+          "wds.linkis.multi.user.engine.types",
+          "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula");
 
   public static final CommonVars<String> ALLOW_BATCH_KILL_ENGINE_TYPES =
       CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python");
@@ -104,8 +105,8 @@ public class AMConfiguration {
   public static String getDefaultMultiEngineUser() {
     String jvmUser = Utils.getJvmUser();
     return String.format(
-        "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}",
-        jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
+        "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",io_file:\"root\"}",
+        jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
   }
 
   public static boolean isMultiUserEngine(String engineType) {
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
index d0854186a..f4b52a156 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
@@ -69,6 +69,9 @@ public class LabelCommonConfig {
   public static final CommonVars<String> DATAX_ENGINE_VERSION =
       CommonVars.apply("wds.linkis.datax.engine.version", "3.0.0");
 
+  public static final CommonVars<String> NEBULA_ENGINE_VERSION =
+      CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0");
+
   public static final CommonVars<String> PRESTO_ENGINE_VERSION =
       CommonVars.apply("wds.linkis.presto.engine.version", "0.234");
 
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
index d47bb8ec3..77e7204a7 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
@@ -45,6 +45,8 @@ object EngineType extends Enumeration with Logging {
 
   val PRESTO = Value("presto")
 
+  val NEBULA = Value("nebula")
+
   val FLINK = Value("flink")
 
   val APPCONN = Value("appconn")
@@ -89,6 +91,7 @@ object EngineType extends Enumeration with Logging {
     case _ if IO_ENGINE_HDFS.toString.equalsIgnoreCase(str) => IO_ENGINE_HDFS
     case _ if PIPELINE.toString.equalsIgnoreCase(str) => PIPELINE
     case _ if PRESTO.toString.equalsIgnoreCase(str) => PRESTO
+    case _ if NEBULA.toString.equalsIgnoreCase(str) => NEBULA
     case _ if FLINK.toString.equalsIgnoreCase(str) => FLINK
     case _ if APPCONN.toString.equals(str) => APPCONN
     case _ if SQOOP.toString.equalsIgnoreCase(str) => SQOOP
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 21a067ed4..abb3e010f 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -35,6 +35,7 @@ object RunType extends Enumeration {
   val PIPELINE = Value("pipeline")
   val JDBC = Value("jdbc")
   val PRESTO_SQL = Value("psql")
+  val NEBULA_SQL = Value("ngql")
   val JAR = Value("jar")
   val APPCONN = Value("appconn")
   val FUNCTION_MDQ_TYPE = Value("function.mdq")
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
index 0d6ae3c5c..e90f282aa 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
@@ -69,6 +69,8 @@ public class EngineTypeLabelCreator {
               EngineType.FLINK().toString(), LabelCommonConfig.FLINK_ENGINE_VERSION.getValue());
           defaultVersion.put(
               EngineType.PRESTO().toString(), LabelCommonConfig.PRESTO_ENGINE_VERSION.getValue());
+          defaultVersion.put(
+              EngineType.NEBULA().toString(), LabelCommonConfig.NEBULA_ENGINE_VERSION.getValue());
           defaultVersion.put(
               EngineType.SQOOP().toString(), LabelCommonConfig.SQOOP_ENGINE_VERSION.getValue());
           defaultVersion.put(
diff --git a/linkis-engineconn-plugins/nebula/pom.xml 
b/linkis-engineconn-plugins/nebula/pom.xml
new file mode 100644
index 000000000..bfe971456
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.linkis</groupId>
+    <artifactId>linkis</artifactId>
+    <version>${revision}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>linkis-engineplugin-nebula</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-engineconn-plugin-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-computation-engineconn</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-storage</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-rpc</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-common</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- nebula -->
+    <dependency>
+      <groupId>com.vesoft</groupId>
+      <artifactId>client</artifactId>
+      <version>${nebula.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <inherited>false</inherited>
+        <configuration>
+          <skipAssembly>false</skipAssembly>
+          <finalName>out</finalName>
+          <appendAssemblyId>false</appendAssemblyId>
+          <attach>false</attach>
+          <descriptors>
+            <descriptor>src/main/assembly/distribution.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/distribution.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml 
b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
new file mode 100644
index 000000000..eaa9c296f
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.1 https://maven.apache.org/xsd/assembly-2.1.1.xsd">
+    <id>linkis-engineplugin-nebula</id>
+    <formats>
+        <format>dir</format>
+        <format>zip</format>
+    </formats>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <baseDirectory>nebula</baseDirectory>
+
+    <dependencySets>
+        <dependencySet>
+            <!-- Enable access to all projects in the current multimodule build! <useAllReactorProjects>true</useAllReactorProjects> -->
+            <!-- Now, select which projects to include in this module-set. -->
+            <outputDirectory>/dist/${nebula.version}/lib</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <useStrictFiltering>false</useStrictFiltering>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+
+        </dependencySet>
+    </dependencySets>
+
+    <fileSets>
+
+        <fileSet>
+            <directory>${basedir}/src/main/resources</directory>
+            <includes>
+                <include>linkis-engineconn.properties</include>
+                <include>log4j2.xml</include>
+            </includes>
+            <fileMode>0777</fileMode>
+            <outputDirectory>dist/${nebula.version}/conf</outputDirectory>
+            <lineEnding>unix</lineEnding>
+        </fileSet>
+
+        <fileSet>
+            <directory>${basedir}/target</directory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>*doc.jar</exclude>
+            </excludes>
+            <fileMode>0777</fileMode>
+            <outputDirectory>plugin/${nebula.version}</outputDirectory>
+        </fileSet>
+
+    </fileSets>
+
+</assembly>
+
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
new file mode 100644
index 000000000..a22d2c8a8
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula;
+
+import org.apache.linkis.engineplugin.nebula.builder.NebulaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.engineplugin.nebula.factory.NebulaEngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class NebulaEngineConnPlugin implements EngineConnPlugin {
+  private Object resourceLocker = new Object();
+  private Object engineFactoryLocker = new Object();
+  private volatile EngineResourceFactory engineResourceFactory;
+  private volatile EngineConnFactory engineFactory;
+  private List<Label<?>> defaultLabels = new ArrayList<>();
+
+  @Override
+  public void init(Map<String, Object> params) {}
+
+  @Override
+  public EngineResourceFactory getEngineResourceFactory() {
+    if (null == engineResourceFactory) {
+      synchronized (resourceLocker) {
+        engineResourceFactory = new GenericEngineResourceFactory();
+      }
+    }
+    return engineResourceFactory;
+  }
+
+  @Override
+  public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+    return new NebulaProcessEngineConnLaunchBuilder();
+  }
+
+  @Override
+  public EngineConnFactory getEngineConnFactory() {
+    if (null == engineFactory) {
+      synchronized (engineFactoryLocker) {
+        engineFactory = new NebulaEngineConnFactory();
+      }
+    }
+    return engineFactory;
+  }
+
+  @Override
+  public List<Label<?>> getDefaultLabels() {
+    return defaultLabels;
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
new file mode 100644
index 000000000..fb95910cf
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.builder;
+
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
+
+public class NebulaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
new file mode 100644
index 000000000..dfbb7a8b1
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.conf;
+
+import org.apache.linkis.common.conf.CommonVars;
+
+public class NebulaConfiguration {
+
+  public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
+      CommonVars.apply("linkis.engineconn.concurrent.limit", 100);
+
+  public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
+      CommonVars.apply("linkis.nebula.default.limit", 5000);
+
+  public static final CommonVars<String> NEBULA_HOST =
+      CommonVars.apply("linkis.nebula.host", "127.0.0.1");
+
+  public static final CommonVars<Integer> NEBULA_PORT =
+      CommonVars.apply("linkis.nebula.port", 9669);
+
+  public static final CommonVars<Integer> NEBULA_MAX_CONN_SIZE =
+      CommonVars.apply("linkis.nebula.max.conn.size", 100);
+
+  public static final CommonVars<String> NEBULA_USER_NAME =
+      CommonVars.apply("linkis.nebula.username", "root");
+
+  public static final CommonVars<String> NEBULA_PASSWORD =
+      CommonVars.apply("linkis.nebula.password", "nebula");
+
+  public static final CommonVars<Boolean> NEBULA_RECONNECT_ENABLED =
+      CommonVars.apply(
+          "linkis.nebula.reconnect.enabled",
+          false,
+          "whether to retry after the connection is disconnected");
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
new file mode 100644
index 000000000..92cc32ca0
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.conf;
+
+import org.apache.linkis.common.conf.Configuration;
+import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig;
+import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.CacheableProtocol;
+import org.apache.linkis.rpc.RPCMapCache;
+
+import java.util.Map;
+
+import scala.Tuple2;
+
+public class NebulaEngineConf
+    extends RPCMapCache<Tuple2<UserCreatorLabel, EngineTypeLabel>, String, String> {
+
+  public NebulaEngineConf() {
+    super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());
+  }
+
+  @Override
+  public CacheableProtocol createRequest(Tuple2<UserCreatorLabel, EngineTypeLabel> labelTuple) {
+    return new RequestQueryEngineConfigWithGlobalConfig(labelTuple._1(), labelTuple._2(), null);
+  }
+
+  @Override
+  public Map<String, String> createMap(Object obj) {
+    if (obj instanceof ResponseQueryConfig) {
+      ResponseQueryConfig response = (ResponseQueryConfig) obj;
+      return response.getKeyAndValue();
+    } else {
+      return null;
+    }
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
new file mode 100644
index 000000000..80aa2e197
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.errorcode;
+
+import org.apache.linkis.common.errorcode.ErrorCodeUtils;
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum NebulaErrorCodeSummary implements LinkisErrorCode {
+  NEBULA_CLIENT_INITIALIZATION_FAILED(28001, "Nebula client initialization failed(Nebula客户端初始化失败)"),
+  NEBULA_EXECUTOR_ERROR(28002, "Nebula executor error(Nebula执行异常)"),
+  NEBULA_CLIENT_ERROR(28003, "Nebula client error(Nebula客户端异常)");
+
+  private final int errorCode;
+
+  private final String errorDesc;
+
+  NebulaErrorCodeSummary(int errorCode, String errorDesc) {
+    ErrorCodeUtils.validateErrorCode(errorCode, 26000, 29999);
+    this.errorCode = errorCode;
+    this.errorDesc = errorDesc;
+  }
+
+  @Override
+  public int getErrorCode() {
+    return errorCode;
+  }
+
+  @Override
+  public String getErrorDesc() {
+    return errorDesc;
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
new file mode 100644
index 000000000..59b3620b0
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.exception;
+
+import org.apache.linkis.common.exception.ErrorException;
+
+public class NebulaClientException extends ErrorException {
+
+  public NebulaClientException(int errorCode, String message) {
+    super(errorCode, message);
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
new file mode 100644
index 000000000..f2c164d5a
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.exception;
+
+import org.apache.linkis.common.exception.ErrorException;
+
+public class NebulaExecuteError extends ErrorException {
+
+  public NebulaExecuteError(int errorCode, String message) {
+    super(errorCode, message);
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
new file mode 100644
index 000000000..202d478b7
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.exception;
+
+import org.apache.linkis.common.exception.ErrorException;
+
+/**
+ * Nebula engine plugin exception that only forwards an error code and a message to {@link
+ * ErrorException}.
+ *
+ * <p>NOTE(review): presumably meant for invalid client/statement states - confirm against usages.
+ */
+public class NebulaStateInvalidException extends ErrorException {
+
+  /**
+   * @param errorCode numeric error code handed to {@link ErrorException}
+   * @param message description of the failure handed to {@link ErrorException}
+   */
+  public NebulaStateInvalidException(int errorCode, String message) {
+    super(errorCode, message);
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
new file mode 100644
index 000000000..188ea60ec
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.executor;
+
+import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.common.io.resultset.ResultSetWriter;
+import org.apache.linkis.common.log.LogUtils;
+import org.apache.linkis.common.utils.OverloadUtils;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
+import 
org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration;
+import org.apache.linkis.engineplugin.nebula.conf.NebulaEngineConf;
+import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary;
+import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException;
+import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError;
+import org.apache.linkis.governance.common.paser.SQLCodeParser;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.LoadResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+import org.apache.linkis.storage.domain.Column;
+import org.apache.linkis.storage.domain.DataType;
+import org.apache.linkis.storage.resultset.ResultSetFactory;
+import org.apache.linkis.storage.resultset.table.TableMetaData;
+import org.apache.linkis.storage.resultset.table.TableRecord;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.util.CollectionUtils;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.vesoft.nebula.ErrorCode;
+import com.vesoft.nebula.client.graph.NebulaPoolConfig;
+import com.vesoft.nebula.client.graph.data.HostAddress;
+import com.vesoft.nebula.client.graph.data.ResultSet;
+import com.vesoft.nebula.client.graph.net.NebulaPool;
+import com.vesoft.nebula.client.graph.net.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor {
+
+  private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class);
+
+  /** Numeric suffix used by {@link #getId()}. */
+  private final int id;
+
+  /** Labels attached to this executor; replaced in place by {@link #setExecutorLabels(List)}. */
+  private final List<Label<?>> executorLabels = new ArrayList<>(2);
+
+  /** Nebula sessions keyed by task id. */
+  private final Map<String, Session> sessionCache = new ConcurrentHashMap<>();
+
+  /**
+   * Effective Nebula settings (console configuration overlaid with task parameters).
+   *
+   * <p>The map is written while preparing every task and this executor is a concurrent one, so a
+   * plain HashMap could be corrupted by simultaneous writers. A synchronized wrapper is used rather
+   * than ConcurrentHashMap because the latter rejects null values, which the configuration source
+   * is not known to exclude.
+   */
+  private final Map<String, String> configMap = Collections.synchronizedMap(new HashMap<>());
+
+  /** Nebula connection pools keyed by task id, bounded by the task expire time and task count. */
+  private final Cache<String, NebulaPool> nebulaPoolCache =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(
+              Long.parseLong(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()),
+              TimeUnit.MILLISECONDS)
+          .maximumSize(EngineConnConstant.MAX_TASK_NUM())
+          .build();
+
+  /**
+   * Creates the executor.
+   *
+   * @param outputPrintLimit value handed unchanged to the parent executor's constructor
+   * @param id numeric id of this executor, used as the suffix of {@link #getId()}
+   */
+  public NebulaEngineConnExecutor(int outputPrintLimit, int id) {
+    super(outputPrintLimit);
+    this.id = id;
+  }
+
+  /** Installs a {@link SQLCodeParser} as the code parser, then runs the parent initialization. */
+  @Override
+  public void init() {
+    // The parser is set before super.init() is invoked.
+    setCodeParser(new SQLCodeParser());
+    super.init();
+  }
+
+  /**
+   * Builds a dedicated {@link NebulaPool} for the task, delegates the execution to the parent
+   * executor and finally frees the Nebula session and pool held for that task.
+   *
+   * <p>When the task carries both a user-creator label and an engine-type label, the matching
+   * configuration is loaded and merged into the pool settings.
+   *
+   * @param engineConnTask task to run
+   * @return the response produced by the parent executor
+   */
+  @Override
+  public ExecuteResponse execute(EngineConnTask engineConnTask) {
+    Optional<Label<?>> userCreatorLabelOp =
+        Arrays.stream(engineConnTask.getLables())
+            .filter(label -> label instanceof UserCreatorLabel)
+            .findFirst();
+    Optional<Label<?>> engineTypeLabelOp =
+        Arrays.stream(engineConnTask.getLables())
+            .filter(label -> label instanceof EngineTypeLabel)
+            .findFirst();
+
+    Map<String, String> configMap = null;
+    if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) {
+      UserCreatorLabel userCreatorLabel = (UserCreatorLabel) userCreatorLabelOp.get();
+      EngineTypeLabel engineTypeLabel = (EngineTypeLabel) engineTypeLabelOp.get();
+
+      configMap =
+          new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel));
+    }
+
+    String taskId = engineConnTask.getTaskId();
+    nebulaPoolCache.put(taskId, getNebulaPool(engineConnTask.getProperties(), configMap));
+    try {
+      return super.execute(engineConnTask);
+    } finally {
+      // Session and pool are keyed by task id and are never looked up again once the task is
+      // over; without this cleanup every task would leak one connection pool.
+      // NOTE(review): relies on super.execute() returning only after all lines of the task have
+      // run - confirm against the parent executor.
+      Session session = sessionCache.remove(taskId);
+      if (session != null) {
+        try {
+          session.release();
+        } catch (Exception e) {
+          logger.warn("Failed to release Nebula session of task {}.", taskId, e);
+        }
+      }
+      NebulaPool nebulaPool = nebulaPoolCache.asMap().remove(taskId);
+      if (nebulaPool != null) {
+        try {
+          nebulaPool.close();
+        } catch (Exception e) {
+          logger.warn("Failed to close NebulaPool of task {}.", taskId, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs one nGQL statement on the session of the current task and converts the outcome into an
+   * {@link ExecuteResponse}.
+   *
+   * @param engineExecutorContext context of the running task; supplies the job id and receives
+   *     progress, logs and result sets
+   * @param code statement to run; blank input is replaced by {@code SHOW SPACES}
+   * @return a success response, or an error response when the server reports a failure
+   */
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
+    String realCode;
+    if (StringUtils.isBlank(code)) {
+      realCode = "SHOW SPACES";
+    } else {
+      realCode = code.trim();
+    }
+    logger.info("Nebula client begins to run ngql code:\n {}", realCode);
+
+    String taskId = engineExecutorContext.getJobId().get();
+    NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId);
+    Session session = getSession(taskId, nebulaPool);
+
+    initialStatusUpdates(taskId, engineExecutorContext, session);
+
+    ResultSet resultSet;
+    try {
+      // Execute the normalized statement so that blank input really runs the fallback and the
+      // statement that is logged is the statement that is sent.
+      resultSet = session.execute(realCode);
+    } catch (Exception e) {
+      // Log the cause: the exception thrown below only carries a generic description.
+      logger.error("Nebula executor error.", e);
+      throw new NebulaExecuteError(
+          NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(),
+          NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc());
+    }
+
+    if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
+      queryOutput(taskId, engineExecutorContext, resultSet);
+    }
+
+    ErrorExecuteResponse errorResponse;
+    try {
+      errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet);
+    } catch (ErrorException e) {
+      logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), e.getMessage());
+      // A failed verification must not be reported as a successful execution.
+      return new ErrorExecuteResponse(e.getMessage(), e);
+    }
+    if (errorResponse == null) {
+      return new SuccessExecuteResponse();
+    }
+    return errorResponse;
+  }
+
+  /** Not implemented for Nebula: ignores its arguments and always returns {@code null}. */
+  @Override
+  public ExecuteResponse executeCompletely(
+      EngineExecutionContext engineExecutorContext, String code, String 
completedLine) {
+    return null;
+  }
+
+  /** Progress is not tracked by this executor: always returns {@code 0.0f}. */
+  @Override
+  public float progress(String taskID) {
+    return 0.0f;
+  }
+
+  /** No per-job progress details are available: always returns an empty array. */
+  @Override
+  public JobProgressInfo[] getProgressInfo(String taskID) {
+    return new JobProgressInfo[0];
+  }
+
+  /**
+   * Kills a task: releases the Nebula session and closes the connection pool cached for it, then
+   * delegates to the parent executor.
+   *
+   * @param taskId id of the task to kill; it is also the key of both caches
+   */
+  @Override
+  public void killTask(String taskId) {
+    Session session = sessionCache.remove(taskId);
+    if (null != session) {
+      session.release();
+    }
+    // One pool is created per task; drop and close it as well so its connections do not stay
+    // open until the cache entry expires (expiry alone never closes the pool).
+    NebulaPool nebulaPool = nebulaPoolCache.asMap().remove(taskId);
+    if (nebulaPool != null) {
+      try {
+        nebulaPool.close();
+      } catch (Exception e) {
+        logger.warn("Failed to close NebulaPool of task {}.", taskId, e);
+      }
+    }
+    super.killTask(taskId);
+  }
+
+  /** Returns the internal label list itself (not a copy). */
+  @Override
+  public List<Label<?>> getExecutorLabels() {
+    return executorLabels;
+  }
+
+  /**
+   * Replaces the executor labels with the given ones.
+   *
+   * @param labels new labels; a null or empty list leaves the current labels untouched
+   */
+  @Override
+  public void setExecutorLabels(List<Label<?>> labels) {
+    if (CollectionUtils.isEmpty(labels)) {
+      return;
+    }
+    executorLabels.clear();
+    executorLabels.addAll(labels);
+  }
+
+  /** Always returns {@code false}. */
+  @Override
+  public boolean supportCallBackLogs() {
+    return false;
+  }
+
+  /** Not implemented: ignores {@code expectedResource} and always returns {@code null}. */
+  @Override
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return null;
+  }
+
+  /**
+   * Describes the resource currently used by this engine.
+   *
+   * @return a {@link CommonNodeResource} whose used resource is a {@link LoadResource} built from
+   *     the process max memory and the constant 1
+   */
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    // Normalizes the memory option of the engine creation context before reporting.
+    NodeResourceUtils.appendMemoryUnitIfMissing(
+        EngineConnObject.getEngineCreationContext().getOptions());
+
+    CommonNodeResource nodeResource = new CommonNodeResource();
+    nodeResource.setUsedResource(new LoadResource(OverloadUtils.getProcessMaxMemory(), 1));
+    return nodeResource;
+  }
+
+  /** Returns the id of this executor: the service instance followed by "_" and the numeric id. */
+  @Override
+  public String getId() {
+    String serviceInstance = Sender.getThisServiceInstance().getInstance();
+    return serviceInstance + "_" + id;
+  }
+
+  /** Returns the value of {@code NebulaConfiguration.ENGINE_CONCURRENT_LIMIT}. */
+  @Override
+  public int getConcurrentLimit() {
+    return NebulaConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
+  }
+
+  /**
+   * Merges the given settings into the executor configuration and opens a new Nebula connection
+   * pool with the resulting host, port and maximum connection size.
+   *
+   * @param taskParams parameters of the task; entries with a null value are skipped, and a null or
+   *     empty map is tolerated
+   * @param cacheMap configuration loaded for the task labels; may be null or empty
+   * @return an initialized pool
+   * @throws NebulaClientException when the pool cannot be initialized
+   */
+  private NebulaPool getNebulaPool(Map<String, Object> taskParams, Map<String, String> cacheMap) {
+    if (!CollectionUtils.isEmpty(cacheMap)) {
+      configMap.putAll(cacheMap);
+    }
+    // Task parameters are applied last, so they win over the loaded configuration.
+    if (!CollectionUtils.isEmpty(taskParams)) {
+      taskParams.forEach(
+          (key, value) -> {
+            if (value != null) {
+              configMap.put(key, String.valueOf(value));
+            }
+          });
+    }
+
+    String host = NebulaConfiguration.NEBULA_HOST.getValue(configMap);
+    Integer port = NebulaConfiguration.NEBULA_PORT.getValue(configMap);
+    Integer maxConnSize = NebulaConfiguration.NEBULA_MAX_CONN_SIZE.getValue(configMap);
+
+    NebulaPool nebulaPool = new NebulaPool();
+    boolean initResult;
+    try {
+      NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
+      nebulaPoolConfig.setMaxConnSize(maxConnSize);
+      List<HostAddress> addresses = Arrays.asList(new HostAddress(host, port));
+      initResult = nebulaPool.init(addresses, nebulaPoolConfig);
+    } catch (Exception e) {
+      // Log the cause: the exception thrown below only carries a generic description.
+      logger.error("NebulaPool initialization failed.", e);
+      throw new NebulaClientException(
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+    }
+    if (!initResult) {
+      logger.error("NebulaPool initialization failed.");
+      throw new NebulaClientException(
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+    }
+    return nebulaPool;
+  }
+
+  /**
+   * Returns the session of a task, reusing the cached one while it still answers a ping and
+   * opening (and caching) a new one otherwise.
+   *
+   * @param taskId id of the task, used as cache key
+   * @param nebulaPool pool from which a new session is taken; only needed when no live session is
+   *     cached
+   * @return a session bound to the task
+   * @throws NebulaClientException when no pool is available or the session cannot be opened
+   */
+  private Session getSession(String taskId, NebulaPool nebulaPool) {
+    // Read the cache once: a concurrent killTask() may remove the entry between two lookups,
+    // which would turn repeated get() calls into a NullPointerException.
+    Session cachedSession = sessionCache.get(taskId);
+    if (cachedSession != null && cachedSession.ping()) {
+      return cachedSession;
+    }
+
+    // The pool entry may be gone (expired, evicted or the task was killed).
+    if (nebulaPool == null) {
+      logger.error("Nebula Session initialization failed: no NebulaPool for task {}.", taskId);
+      throw new NebulaClientException(
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+    }
+
+    String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap);
+    String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap);
+    Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap);
+
+    Session session;
+    try {
+      session = nebulaPool.getSession(username, password, reconnect);
+    } catch (Exception e) {
+      // Log the cause: the exception thrown below only carries a generic description.
+      logger.error("Nebula Session initialization failed.", e);
+      throw new NebulaClientException(
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
+          NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
+    }
+
+    sessionCache.put(taskId, session);
+    return session;
+  }
+
+  /**
+   * Pushes the current progress of the task to the execution context, but only when the session
+   * answers a ping.
+   */
+  private void initialStatusUpdates(
+      String taskId, EngineExecutionContext engineExecutorContext, Session session) {
+    if (!session.ping()) {
+      return;
+    }
+    engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId));
+  }
+
+  /**
+   * Copies a Nebula result set into a Linkis table result set and sends it to the execution
+   * context. Every column is declared with the "string" data type.
+   *
+   * <p>Writing is best effort: when it fails the error is logged, the writer is closed quietly and
+   * the summary line and result set are still sent.
+   *
+   * @param taskId id of the task, used for progress reporting
+   * @param engineExecutorContext context that creates the writer and receives logs and results
+   * @param resultSet Nebula result to convert
+   */
+  private void queryOutput(
+      String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) {
+    int columnCount = 0;
+    ResultSetWriter resultSetWriter =
+        engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE);
+
+    try {
+      List<String> colNames = resultSet.keys();
+
+      if (CollectionUtils.isEmpty(colNames)) {
+        throw new RuntimeException("Nebula columns is null.");
+      }
+
+      List<Column> columns =
+          colNames.stream()
+              .map(column -> new Column(column, DataType.toDataType("string"), ""))
+              .collect(Collectors.toList());
+      columnCount = columns.size();
+      resultSetWriter.addMetaData(new TableMetaData(columns.toArray(new Column[0])));
+      if (!resultSet.isEmpty()) {
+        for (int i = 0; i < resultSet.rowsSize(); i++) {
+          ResultSet.Record record = resultSet.rowValues(i);
+          if (record != null) {
+            String[] rowArray =
+                record.values().stream()
+                    .map(
+                        x -> {
+                          try {
+                            return x.asString();
+                          } catch (Exception e) {
+                            // The value could not be read as a string (e.g. it is not a string
+                            // value); fall back to its textual form instead of silently
+                            // emitting an empty cell.
+                            return String.valueOf(x);
+                          }
+                        })
+                    .toArray(String[]::new);
+            resultSetWriter.addRecord(new TableRecord(rowArray));
+          }
+        }
+        engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId));
+      }
+    } catch (Exception e) {
+      // Keep the best-effort flow, but do not hide why the output could not be written.
+      logger.error("Failed to write the Nebula result set of task {}.", taskId, e);
+      IOUtils.closeQuietly(resultSetWriter);
+    }
+    String message =
+        String.format("Fetched %d col(s) : %d row(s) in Nebula", columnCount, resultSet.rowsSize());
+    logger.info(message);
+    engineExecutorContext.appendStdout(LogUtils.generateInfo(message));
+    engineExecutorContext.sendResultSet(resultSetWriter);
+  }
+
+  /**
+   * Pushes the task progress and checks whether the server reported a failure.
+   *
+   * @param taskId id of the task, used for progress reporting
+   * @param engineExecutorContext context that receives progress and the error log line
+   * @param resultSet result returned by the Nebula session
+   * @return an error response carrying the server message, or {@code null} when the statement
+   *     succeeded
+   */
+  private ErrorExecuteResponse verifyServerError(
+      String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet)
+      throws ErrorException {
+    engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId));
+
+    boolean succeeded =
+        resultSet.isSucceeded() && resultSet.getErrorCode() == ErrorCode.SUCCEEDED.getValue();
+    if (succeeded) {
+      return null;
+    }
+
+    String errorMessage = resultSet.getErrorMessage();
+    logger.error("Nebula execute failed (#{}): {}", resultSet.getErrorCode(), errorMessage);
+    engineExecutorContext.appendStdout(LogUtils.generateERROR(errorMessage));
+    return new ErrorExecuteResponse(errorMessage, null);
+  }
+
+  /** Releases every cached Nebula session and closes every cached connection pool. */
+  @Override
+  public void killAll() {
+    for (Session session : sessionCache.values()) {
+      if (session != null) {
+        session.release();
+      }
+    }
+    sessionCache.clear();
+
+    // Pools are created per task and hold open connections; releasing the sessions alone leaves
+    // them open, so close them before dropping the cache entries.
+    for (NebulaPool nebulaPool : nebulaPoolCache.asMap().values()) {
+      try {
+        nebulaPool.close();
+      } catch (Exception e) {
+        logger.warn("Failed to close a NebulaPool.", e);
+      }
+    }
+    nebulaPoolCache.invalidateAll();
+  }
+
+  /** Frees the Nebula resources through {@link #killAll()} and then closes the parent executor. */
+  @Override
+  public void close() {
+    killAll();
+    super.close();
+  }
+}
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
 
b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
new file mode 100644
index 000000000..059eccb79
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+wds.linkis.server.version=v1
+#wds.linkis.engineconn.debug.enable=true
+#wds.linkis.keytab.enable=true
+wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.nebula.NebulaEngineConnPlugin
+
+wds.linkis.engineconn.support.parallelism=true
+
+wds.linkis.engineconn.max.free.time=0
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml 
b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..2cd3e264c
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~ 
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~ 
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+  
+<configuration status="error" monitorInterval="30">
+    <appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
+        </Console>
+
+        <Send name="Send" >
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="ACCEPT" 
onMismatch="DENY" />
+            </Filters>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
+        </Send>
+
+        <Send name="SendPackage" >
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
+        </Send>
+
+        <Console name="stderr" target="SYSTEM_ERR">
+            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" 
/>
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M 
[JobId-%X{jobId}] - %msg%xEx%n"/>
+        </Console>
+    </appenders>
+
+    <loggers>
+      <root level="INFO">
+            <appender-ref ref="stderr"/>
+            <appender-ref ref="Console"/>
+            <appender-ref ref="Send"/>
+        </root>
+        <logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info" 
additivity="true">
+            <appender-ref ref="SendPackage"/>
+        </logger>
+        <logger 
name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter" 
level="error" additivity="true">
+            <appender-ref ref="stderr"/>
+        </logger>
+        <logger name="com.netflix.discovery" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.yarn" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.springframework" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.linkis.server.security" level="warn" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.hive.ql.exec.mr.ExecDriver" 
level="fatal" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.spark_project.jetty" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.eclipse.jetty" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <!-- NOTE(review): a logger named "org.springframework" is already declared earlier in
+             this file with level="warn"; confirm which level is intended and drop the duplicate. -->
+        <logger name="org.springframework" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.reflections.Reflections" level="ERROR" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+
+        <logger name="org.apache.hadoop.ipc.Client" level="ERROR" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+
+   </loggers>
+</configuration>
diff --git 
a/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
 
b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
new file mode 100644
index 000000000..2f7c3c8fb
--- /dev/null
+++ 
b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import 
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
+import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration
+import org.apache.linkis.engineplugin.nebula.executor.NebulaEngineConnExecutor
+import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
+import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+/** Engine-conn factory of the Nebula plugin: it creates the Nebula executor and names its types. */
+class NebulaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory {
+
+  /**
+   * Builds a [[NebulaEngineConnExecutor]] for the given id, using the value of
+   * `NebulaConfiguration.ENGINE_DEFAULT_LIMIT` as its first constructor argument. The creation
+   * context and the engine connection are not used.
+   */
+  override def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn
+  ): LabelExecutor = {
+    val outputPrintLimit = NebulaConfiguration.ENGINE_DEFAULT_LIMIT.getValue
+    new NebulaEngineConnExecutor(outputPrintLimit, id)
+  }
+
+  /** Engine type handled by this factory. */
+  override protected def getEngineConnType: EngineType = EngineType.NEBULA
+
+  /** Run type handled by this factory. */
+  override protected def getRunType: RunType = RunType.NEBULA_SQL
+
+}
diff --git a/pom.xml b/pom.xml
index 001e8189d..f9930b12b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <openlookeng.version>1.5.0</openlookeng.version>
     <pipeline.version>1</pipeline.version>
     <presto.version>0.234</presto.version>
+    <nebula.version>3.0.0</nebula.version>
     <python.version>python2</python.version>
     <seatunnel.version>2.1.2</seatunnel.version>
     <shell.version>1</shell.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to