This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new a94c87653 spark support yarn cluster (#4850)
a94c87653 is described below

commit a94c87653f6ee4c0303876fb1349dbd24e0d614a
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Aug 14 19:11:01 2023 +0800

    spark support yarn cluster (#4850)
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * LinkisManagerApplication Remove useless code
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
    
    * spark support yarn cluster
---
 docs/configuration/spark.md                        |  3 +
 .../service/impl/DefaultEngineConnKillService.java |  7 +++
 .../impl/AbstractEngineConnLaunchService.scala     | 13 +++-
 .../callback/hook/CallbackEngineConnHook.scala     |  6 +-
 .../callback/service/EngineConnPidCallback.scala   | 17 +++++-
 .../am/manager/DefaultEngineNodeManager.java       | 11 ++++
 .../manager/am/manager/EngineNodeManager.java      |  2 +
 .../service/engine/DefaultEngineCreateService.java | 12 +++-
 .../impl/DefaultEngineConnPidCallbackService.java  | 17 +++++-
 .../manager/label/service/NodeLabelService.java    |  3 +
 .../service/impl/DefaultNodeLabelService.java      | 38 ++++++++++++
 .../manager/label/constant/LabelKeyConstant.java   |  2 +
 .../manager/label/constant/LabelValueConstant.java |  2 +
 .../entity/engine/EngingeConnRuntimeModeLabel.java | 71 ++++++++++++++++++++++
 .../linkis/manager/label/utils/LabelUtil.scala     |  5 ++
 .../linkis/manager/common/constant/AMConstant.java |  2 +
 .../linkis/manager/dao/NodeManagerMapper.java      |  2 +
 .../persistence/NodeManagerPersistence.java        |  2 +
 .../impl/DefaultNodeManagerPersistence.java        | 22 +++++++
 .../resources/mapper/common/NodeManagerMapper.xml  | 38 +++++++-----
 .../spark/errorcode/SparkErrorCodeSummary.java     |  4 ++
 .../spark/config/SparkConfiguration.scala          |  7 +++
 .../spark/executor/SparkEngineConnExecutor.scala   | 13 +++-
 .../spark/factory/SparkEngineConnFactory.scala     | 39 ++++++++----
 ...SparkSubmitProcessEngineConnLaunchBuilder.scala | 58 +++++++++++++++---
 25 files changed, 350 insertions(+), 46 deletions(-)

diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md
index f0a4723c7..f40c76b43 100644
--- a/docs/configuration/spark.md
+++ b/docs/configuration/spark.md
@@ -3,6 +3,7 @@
 
 | Module Name (Service Name) | Parameter Name | Default Value | Description 
|Used|
 | -------- | -------- | ----- |----- |  -----   |
+|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars|
 |spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi|
 
|spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix|
 |spark|linkis.bgservice.store.suffix|  |bgservice.store.suffix|
@@ -27,6 +28,8 @@
 |spark|wds.linkis.spark.engineconn.fatal.log|error writing 
class;OutOfMemoryError|spark.engineconn.fatal.log|
 |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true 
|spark.engine.scala.replace_package_header.enable|
 
+To use Spark yarn cluster mode, set the label "engingeConnRuntimeMode": 
"yarnCluster" and upload the Spark dependencies to the path configured by 
'linkis.spark.yarn.cluster.jars' (the default value is 'hdfs:///spark/cluster').
+The Spark dependencies include jars and configuration files, for example: 
'/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar' and '/appcom/Install/linkis/conf/*'.
 
 The spark-excel package may cause class conflicts, so it needs to be downloaded 
separately and put in the Spark lib directory:
 wget 
https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
index 440208cd6..a6a932a57 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -94,6 +94,13 @@ public class DefaultEngineConnKillService implements 
EngineConnKillService {
       killYarnAppIdOfOneEc(engineStopRequest);
     }
 
+    if 
(AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType())
+        && engineStopRequest.getIdentifier() != null) {
+      List<String> appIds = new ArrayList<>();
+      appIds.add(engineStopRequest.getIdentifier());
+      GovernanceUtils.killYarnJobApp(appIds);
+    }
+
     if (!response.getStopStatus()) {
       EngineSuicideRequest request =
           new EngineSuicideRequest(
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index f088f9fcd..390822df0 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -38,6 +38,7 @@ import org.apache.linkis.manager.common.protocol.engine.{
   EngineStopRequest
 }
 import 
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
+import org.apache.linkis.manager.label.constant.LabelValueConstant
 import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.rpc.Sender
 
@@ -146,11 +147,21 @@ abstract class AbstractEngineConnLaunchService extends 
EngineConnLaunchService w
       throw t
     }
     LoggerUtils.removeJobIdMDC()
+
+    val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels)
+    val isYarnClusterMode: Boolean =
+      if (null != label && 
label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
+      else false
+
     val engineNode = new AMEngineNode()
     engineNode.setLabels(conn.getLabels)
     engineNode.setServiceInstance(conn.getServiceInstance)
     engineNode.setOwner(request.user)
-    engineNode.setMark(AMConstant.PROCESS_MARK)
+    if (isYarnClusterMode) {
+      engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK)
+    } else {
+      engineNode.setMark(AMConstant.PROCESS_MARK)
+    }
     engineNode
   }
 
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
index 1f4c5cec7..adcbb1a69 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
@@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
 import 
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
 import org.apache.linkis.engineconn.callback.service.{
   EngineConnAfterStartCallback,
-  EngineConnPidCallback
+  EngineConnIdentifierCallback
 }
 import org.apache.linkis.engineconn.common.conf.EngineConnConf
 import org.apache.linkis.engineconn.common.creation.EngineCreationContext
@@ -61,8 +61,8 @@ class CallbackEngineConnHook extends EngineConnHook with 
Logging {
     newMap.put("spring.mvc.servlet.path", 
ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
     
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))
 
-    val engineConnPidCallBack = new EngineConnPidCallback()
-    Utils.tryAndError(engineConnPidCallBack.callback())
+    val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
+    Utils.tryAndError(engineConnIdentifierCallback.callback())
     logger.info("<--------------------SpringBoot App init 
succeed-------------------->")
   }
 
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
index f0995c0b9..71f71f199 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
@@ -18,18 +18,29 @@
 package org.apache.linkis.engineconn.callback.service
 
 import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconn.executor.entity.YarnExecutor
 import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid
+import org.apache.linkis.manager.label.constant.LabelValueConstant
+import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.rpc.Sender
 
 import java.lang.management.ManagementFactory
 
-class EngineConnPidCallback extends AbstractEngineConnStartUpCallback {
+class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback {
 
   override def callback(): Unit = {
-    val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
+    var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
     val instance = Sender.getThisServiceInstance
     val context = EngineConnObject.getEngineCreationContext
-    callback(ResponseEngineConnPid(instance, pid, context.getTicketId))
+
+    val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels())
+    if (null != label && 
label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) {
+      identifier = ExecutorManager.getInstance.getReportExecutor match {
+        case cluster: YarnExecutor => cluster.getApplicationId
+      }
+    }
+    callback(ResponseEngineConnPid(instance, identifier, context.getTicketId))
   }
 
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
index b8b38eae3..14d548ef7 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
@@ -127,6 +127,17 @@ public class DefaultEngineNodeManager implements 
EngineNodeManager {
     return dbEngineNode;
   }
 
+  @Override
+  public EngineNode getEngineNodeInfoByTicketId(String ticketId) {
+    EngineNode dbEngineNode = 
nodeManagerPersistence.getEngineNodeByTicketId(ticketId);
+    if (null == dbEngineNode) {
+      throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + 
" not exists in db");
+    }
+    metricsConverter.fillMetricsToNode(
+        dbEngineNode, 
nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode));
+    return dbEngineNode;
+  }
+
   @Override
   public void updateEngineStatus(
       ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus 
toState) {}
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
index 252d97c0b..ce79d79c7 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
@@ -38,6 +38,8 @@ public interface EngineNodeManager {
 
   EngineNode getEngineNodeInfoByDB(EngineNode engineNode);
 
+  EngineNode getEngineNodeInfoByTicketId(String ticketId);
+
   /**
    * Get detailed engine information from the persistence
    *
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
index e8c58e382..6f35edc3a 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
@@ -337,10 +337,20 @@ public class DefaultEngineCreateService extends 
AbstractEngineService
   }
 
   private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) {
-    EngineNode engineNodeInfo = 
getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
+    EngineNode engineNodeInfo;
+    if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
+      engineNodeInfo = 
getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId);
+    } else {
+      engineNodeInfo = 
getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
+    }
     if (null == engineNodeInfo) {
       return false;
     }
+
+    if (engineNodeInfo.getServiceInstance() != null) {
+      engineNode.setServiceInstance(engineNodeInfo.getServiceInstance());
+    }
+
     if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) {
       NodeMetrics metrics = 
nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo);
       Pair<String, Optional<Boolean>> errorInfo = 
getStartErrorInfo(metrics.getHeartBeatMsg());
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
index 3d199fe29..4acfb70f9 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
@@ -17,10 +17,14 @@
 
 package org.apache.linkis.manager.am.service.impl;
 
+import org.apache.linkis.common.ServiceInstance;
 import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid;
 import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager;
 import org.apache.linkis.manager.am.service.EngineConnPidCallbackService;
+import org.apache.linkis.manager.am.service.engine.AbstractEngineService;
+import org.apache.linkis.manager.common.constant.AMConstant;
 import org.apache.linkis.manager.common.entity.node.EngineNode;
+import org.apache.linkis.manager.label.service.NodeLabelService;
 import org.apache.linkis.rpc.message.annotation.Receiver;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -30,12 +34,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Service
-public class DefaultEngineConnPidCallbackService implements 
EngineConnPidCallbackService {
+public class DefaultEngineConnPidCallbackService extends AbstractEngineService
+    implements EngineConnPidCallbackService {
   private static final Logger logger =
       LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class);
 
   @Autowired private DefaultEngineNodeManager defaultEngineNodeManager;
 
+  @Autowired private NodeLabelService nodeLabelService;
+
   @Receiver
   @Override
   public void dealPid(ResponseEngineConnPid protocol) {
@@ -56,6 +63,14 @@ public class DefaultEngineConnPidCallbackService implements 
EngineConnPidCallbac
     }
 
     engineNode.setIdentifier(protocol.pid());
+
+    if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
+      ServiceInstance serviceInstance = protocol.serviceInstance();
+      engineNode.setServiceInstance(serviceInstance);
+      getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
+      nodeLabelService.labelsFromInstanceToNewInstance(
+          engineNode.getServiceInstance(), serviceInstance);
+    }
     defaultEngineNodeManager.updateEngine(engineNode);
   }
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
index a5bfcab1c..4dc1976c3 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
@@ -47,6 +47,9 @@ public interface NodeLabelService {
 
   void updateLabelsToNode(ServiceInstance instance, List<Label<?>> labels);
 
+  void labelsFromInstanceToNewInstance(
+      ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance);
+
   /**
    * Remove the labels related by node instance
    *
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
index 4da2bebc6..8529b6d20 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
@@ -196,6 +196,44 @@ public class DefaultNodeLabelService implements 
NodeLabelService {
     }
   }
 
+  @Override
+  public void labelsFromInstanceToNewInstance(
+      ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) {
+    List<PersistenceLabel> labels =
+        labelManagerPersistence.getLabelByServiceInstance(newServiceInstance);
+    List<String> newKeyList = 
labels.stream().map(Label::getLabelKey).collect(Collectors.toList());
+    List<PersistenceLabel> nodeLabels =
+        labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance);
+
+    List<String> oldKeyList =
+        
nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList());
+
+    List<String> willBeAdd = new ArrayList<>(oldKeyList);
+    willBeAdd.removeAll(newKeyList);
+
+    // Assign the old association to the newServiceInstance
+    if (!CollectionUtils.isEmpty(willBeAdd)) {
+      nodeLabels.forEach(
+          nodeLabel -> {
+            if (willBeAdd.contains(nodeLabel.getLabelKey())) {
+              PersistenceLabel persistenceLabel =
+                  LabelManagerUtils.convertPersistenceLabel(nodeLabel);
+              int labelId = tryToAddLabel(persistenceLabel);
+              if (labelId > 0) {
+                List<Integer> labelIds = new ArrayList<>();
+                labelIds.add(labelId);
+                labelManagerPersistence.addLabelToNode(newServiceInstance, 
labelIds);
+              }
+            }
+          });
+    }
+
+    // Delete an old association
+    List<Integer> oldLabelId =
+        
nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList());
+    labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId);
+  }
+
   /**
    * Remove the labels related by node instance
    *
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index 362932083..8021b3585 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -64,5 +64,7 @@ public class LabelKeyConstant {
 
   public static final String FIXED_EC_KEY = "fixedEngineConn";
 
+  public static final String ENGINGE_CONN_RUNTIME_MODE_KEY = 
"engingeConnRuntimeMode";
+
   public static final String MANAGER_KEY = "manager";
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
index cc62921c8..35c0d06e2 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
@@ -20,4 +20,6 @@ package org.apache.linkis.manager.label.constant;
 public class LabelValueConstant {
 
   public static final String OFFLINE_VALUE = "offline";
+
+  public static final String YARN_CLUSTER_VALUE = "yarnCluster";
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java
new file mode 100644
index 000000000..7460f5589
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.*;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+import org.apache.linkis.manager.label.exception.LabelErrorException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+
+import static 
org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY;
+import static 
org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;
+
+public class EngingeConnRuntimeModeLabel extends GenericLabel
+    implements EngineNodeLabel, UserModifiable {
+
+  public EngingeConnRuntimeModeLabel() {
+    setLabelKey(LabelKeyConstant.ENGINGE_CONN_RUNTIME_MODE_KEY);
+  }
+
+  @ValueSerialNum(0)
+  public void setModeValue(String modeValue) {
+    if (getValue() == null) {
+      setValue(new HashMap<>());
+    }
+    getValue().put("modeValue", modeValue);
+  }
+
+  public String getModeValue() {
+    if (getValue() == null) {
+      return null;
+    }
+    return getValue().get("modeValue");
+  }
+
+  @Override
+  public Feature getFeature() {
+    return Feature.CORE;
+  }
+
+  @Override
+  public void valueCheck(String stringValue) throws LabelErrorException {
+    if (!StringUtils.isBlank(stringValue)) {
+      if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) {
+        throw new LabelErrorException(
+            LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
+      }
+    } else {
+      throw new LabelErrorException(
+          CHECK_LABEL_VALUE_EMPTY.getErrorCode(), 
CHECK_LABEL_VALUE_EMPTY.getErrorDesc());
+    }
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index b4a66d2f4..3965a5ea1 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.manager.label.entity.engine.{
   CodeLanguageLabel,
   EngineConnModeLabel,
   EngineTypeLabel,
+  EngingeConnRuntimeModeLabel,
   UserCreatorLabel
 }
 import org.apache.linkis.manager.label.entity.entrance.{
@@ -80,6 +81,10 @@ object LabelUtil {
     getLabelFromList[CodeLanguageLabel](labels)
   }
 
+  def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]): 
EngingeConnRuntimeModeLabel = {
+    getLabelFromList[EngingeConnRuntimeModeLabel](labels)
+  }
+
   def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel 
= {
     getLabelFromList[EngineConnModeLabel](labels)
   }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
index 09d802a95..c80357035 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
@@ -29,6 +29,8 @@ public class AMConstant {
 
   public static final String PROCESS_MARK = "process";
 
+  public static final String CLUSTER_PROCESS_MARK = "cluster_process";
+
   public static final String THREAD_MARK = "thread";
 
   public static final String START_REASON = "start_reason";
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
index 4e9546944..6f11c910e 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
@@ -49,6 +49,8 @@ public interface NodeManagerMapper {
 
   PersistenceNode getNodeInstance(@Param("instance") String instance);
 
+  PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String 
ticketId);
+
   PersistenceNode getNodeInstanceById(@Param("id") int id);
 
   PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String 
instance);
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
index bb95c8cf7..b83c82fd3 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
@@ -105,6 +105,8 @@ public interface NodeManagerPersistence {
    */
   EngineNode getEngineNode(ServiceInstance serviceInstance);
 
+  EngineNode getEngineNodeByTicketId(String ticketId);
+
   /**
    * 通过Em的ServiceInstance 获取EM下面Engine的列表
    *
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 4a1697333..14db7252f 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -274,6 +274,28 @@ public class DefaultNodeManagerPersistence implements 
NodeManagerPersistence {
     return amEngineNode;
   }
 
+  /**
+   * Loads the engine node registered under the given ticket id.
+   *
+   * @param ticketId ticket id used to look up the persisted node row
+   * @return an {@link AMEngineNode} populated from the persisted row, or {@code null} when no row
+   *     matches the ticket id
+   */
+  @Override
+  public EngineNode getEngineNodeByTicketId(String ticketId) {
+    PersistenceNode engineNode = nodeManagerMapper.getNodeInstanceByTicketId(ticketId);
+    // Nothing to convert when no row matches; return before allocating the result object.
+    if (null == engineNode) {
+      return null;
+    }
+
+    ServiceInstance serviceInstance = new ServiceInstance();
+    serviceInstance.setInstance(engineNode.getInstance());
+    serviceInstance.setApplicationName(engineNode.getName());
+
+    AMEngineNode amEngineNode = new AMEngineNode();
+    amEngineNode.setServiceInstance(serviceInstance);
+    amEngineNode.setOwner(engineNode.getOwner());
+    amEngineNode.setMark(engineNode.getMark());
+    amEngineNode.setIdentifier(engineNode.getIdentifier());
+    amEngineNode.setTicketId(engineNode.getTicketId());
+    // The row's creation time is reported as the node's start time.
+    amEngineNode.setStartTime(engineNode.getCreateTime());
+    return amEngineNode;
+  }
+
   @Override
   public List<EngineNode> getEngineNodeByEM(ServiceInstance serviceInstance) {
     // serviceinstance for a given EM(给定EM的 serviceinstance)
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
index b470daead..935ceb4f8 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
@@ -6,9 +6,9 @@
   ~ The ASF licenses this file to You under the Apache License, Version 2.0
   ~ (the "License"); you may not use this file except in compliance with
   ~ the License.  You may obtain a copy of the License at
-  ~ 
+  ~
   ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~ 
+  ~
   ~ Unless required by applicable law or agreed to in writing, software
   ~ distributed under the License is distributed on an "AS IS" BASIS,
   ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,9 +34,9 @@
     <insert id="addNodeInstance" useGeneratedKeys="true" keyColumn="id" 
keyProperty="id"
             
parameterType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
         INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, 
mark, ticketId, update_time
-        , create_time, updator, creator)
+                                                       , create_time, updator, 
creator)
         VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId}, 
#{updateTime}
-        , #{createTime}, #{updator}, #{creator})
+               , #{createTime}, #{updator}, #{creator})
     </insert>
 
     <update id="updateNodeInstance">
@@ -119,9 +119,9 @@
 
     <select id="getNodeInstanceIds" resultType="java.lang.Integer">
         SELECT id FROM linkis_cg_manager_service_instance WHERE instance IN (
-            <foreach collection='instances' separator=',' item='instance'>
-                #{instance}
-            </foreach> )
+        <foreach collection='instances' separator=',' item='instance'>
+            #{instance}
+        </foreach> )
     </select>
 
     <select id="getNodeInstance" 
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
@@ -130,6 +130,12 @@
         WHERE instance = #{instance}
     </select>
 
+    <!-- Fetches the service-instance row whose ticketId column equals the given value.
+         NOTE(review): the mapper method returns a single PersistenceNode, so this presumably
+         relies on ticketId being unique per row; confirm against the table definition. -->
+    <select id="getNodeInstanceByTicketId" 
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
+        SELECT *
+        FROM linkis_cg_manager_service_instance
+        WHERE ticketId = #{ticketId}
+    </select>
+
     <select id="getNodeInstanceById" 
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
         SELECT *
         FROM linkis_cg_manager_service_instance
@@ -140,9 +146,9 @@
         SELECT *
         FROM linkis_cg_manager_service_instance
         WHERE instance IN (
-        SELECT em_instance
-        FROM linkis_cg_manager_engine_em
-        WHERE engine_instance = #{instance}
+            SELECT em_instance
+            FROM linkis_cg_manager_engine_em
+            WHERE engine_instance = #{instance}
         )
     </select>
 
@@ -150,17 +156,17 @@
         SELECT *
         FROM linkis_cg_manager_service_instance
         WHERE instance IN (
-        SELECT engine_instance
-        FROM linkis_cg_manager_engine_em
-        WHERE em_instance = #{instance}
+            SELECT engine_instance
+            FROM linkis_cg_manager_engine_em
+            WHERE em_instance = #{instance}
         )
     </select>
 
     <select id="getNodesByInstances" 
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
         SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN(
-            <foreach collection='instances' separator=',' item='instance'>
-               #{instance}
-            </foreach>)
+        <foreach collection='instances' separator=',' item='instance'>
+            #{instance}
+        </foreach>)
     </select>
 
     <insert id="addEngineNode">
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
index 936e773e4..42f0b66e4 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
@@ -66,6 +66,10 @@ public enum SparkErrorCodeSummary implements LinkisErrorCode 
{
       43032, "The application start failed, since yarn applicationId is 
null."),
 
   NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."),
+
+  LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR(
+      43042,
+      "linkis.spark.yarn.cluster.jars parameters configuration 
errors(linkis.spark.yarn.cluster.jars 参数配置错误)."),
   ;
 
   /** (errorCode)错误码 */
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index ccc21a776..b42fc0934 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -30,6 +30,10 @@ object SparkConfiguration extends Logging {
   val SPARK_HOME_ENV = "SPARK_HOME"
   val SPARK_CONF_DIR_ENV = "SPARK_CONF_DIR"
 
+  // Deploy-mode value handed to spark-submit via --deploy-mode by default.
+  val SPARK_YARN_CLIENT = "client"
+
+  // Deploy-mode value used instead when the engine's runtime-mode label selects yarn cluster.
+  val SPARK_YARN_CLUSTER = "cluster"
+
   val PROCESS_MAX_THREADS = 
CommonVars[Int]("wds.linkis.process.threadpool.max", 100)
 
   val SPARK_SESSION_HOOK = 
CommonVars[String]("wds.linkis.engine.spark.session.hook", "")
@@ -46,6 +50,9 @@ object SparkConfiguration extends Logging {
 
   val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode", 
"client")
 
+  // Directory whose contents ("<dir>/*") the launch builder appends to its jars in yarn cluster
+  // mode; a blank value makes the launch builder fail with LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.
+  val SPARK_YARN_CLUSTER_JARS =
+    CommonVars[String]("linkis.spark.yarn.cluster.jars", 
"hdfs:///spark/cluster")
+
   val SPARK_APP_NAME = CommonVars[String]("spark.app.name", 
"Linkis-EngineConn-Spark")
   val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "")
   val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "")
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 2264db61f..8d97e8152 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -25,7 +25,7 @@ import 
org.apache.linkis.engineconn.computation.executor.execute.{
 }
 import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
 import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException
-import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
+import org.apache.linkis.engineconn.executor.entity.{ResourceFetchExecutor, 
YarnExecutor}
 import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
 import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
 import org.apache.linkis.engineplugin.spark.extension.{
@@ -56,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer
 abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
     extends ComputationExecutor
     with Logging
+    with YarnExecutor
     with ResourceFetchExecutor {
 
   private var initialized: Boolean = false
@@ -70,9 +71,17 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, 
id: Long)
 
   private var thread: Thread = _
 
+  // Application id reported by the SparkContext, captured once when the executor is constructed.
+  // It is never reassigned, so it is held in a val.
+  private val applicationId: String = sc.applicationId
+
+  override def getApplicationId: String = applicationId
+
+  // The remaining accessors (presumably required by YarnExecutor) are not populated here and
+  // always return the empty string.
+  override def getApplicationURL: String = ""
+  override def getYarnMode: String = ""
+  override def getQueue: String = ""
+
   override def init(): Unit = {
     logger.info(s"Ready to change engine state!")
-//    setCodeParser()  // todo check
+    //    setCodeParser()  // todo check
     super.init()
   }
 
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index 5bf90c6bf..bc18e2bad 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -39,8 +39,10 @@ import 
org.apache.linkis.manager.engineplugin.common.creation.{
 }
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
+import org.apache.linkis.manager.label.constant.LabelValueConstant
 import org.apache.linkis.manager.label.entity.engine.EngineType
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.server.JMap
 
 import org.apache.commons.lang3.StringUtils
@@ -144,19 +146,32 @@ class SparkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
     val master =
       sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", 
"yarn").getValue)
     logger.info(s"------ Create new SparkContext {$master} -------")
-    val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue
-    val pysparkPath = new File(pysparkBasePath, "python" + File.separator + 
"lib")
-    var pythonLibUris = 
pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip"))
-    if (pythonLibUris.length == 2) {
-      val sparkConfValue1 = 
Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
-      val sparkConfValue2 = 
Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
-      if (StringUtils.isNotBlank(sparkConfValue2)) {
-        pythonLibUris = sparkConfValue2 +: pythonLibUris
-      }
-      if (StringUtils.isNotBlank(sparkConfValue1)) {
-        pythonLibUris = sparkConfValue1 +: pythonLibUris
+
+    // Yarn cluster mode applies only when a runtime-mode label is present and its mode value
+    // equals YARN_CLUSTER_VALUE; a missing label means the default (client) behaviour.
+    val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
+    val isYarnClusterMode: Boolean =
+      null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)
+
+    if (isYarnClusterMode) {
+      sparkConf.set("spark.submit.deployMode", "cluster")
+    }
+
+    // todo yarn cluster暂时不支持pyspark,后期对pyspark进行处理
+    if (!isYarnClusterMode) {
+      val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue
+      val pysparkPath = new File(pysparkBasePath, "python" + File.separator + 
"lib")
+      var pythonLibUris = 
pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip"))
+      if (pythonLibUris.length == 2) {
+        val sparkConfValue1 = 
Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
+        val sparkConfValue2 = 
Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
+        if (StringUtils.isNotBlank(sparkConfValue2)) {
+          pythonLibUris = sparkConfValue2 +: pythonLibUris
+        }
+        if (StringUtils.isNotBlank(sparkConfValue1)) {
+          pythonLibUris = sparkConfValue1 +: pythonLibUris
+        }
+        sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
       }
-      sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
     }
     // Distributes needed libraries to workers
     // when spark version is greater than or equal to 1.5.0
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index 8472cdfa9..2487ede90 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -18,23 +18,30 @@
 package org.apache.linkis.engineplugin.spark.launch
 
 import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
 import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
   ENGINE_JAR,
   SPARK_APP_NAME,
   SPARK_DEFAULT_EXTERNAL_JARS_PATH,
+  SPARK_DEPLOY_MODE,
   SPARK_DRIVER_CLASSPATH,
   SPARK_DRIVER_EXTRA_JAVA_OPTIONS,
   SPARK_PYTHON_VERSION,
-  SPARK_SUBMIT_PATH
+  SPARK_SUBMIT_PATH,
+  SPARK_YARN_CLUSTER_JARS
 }
 import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.engineplugin.spark.exception.SparkEngineException
 import org.apache.linkis.hadoop.common.conf.HadoopConf
 import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource
 import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
 import 
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.label.constant.LabelValueConstant
 import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.protocol.UserWithCreator
 
 import org.apache.commons.lang3.StringUtils
@@ -61,7 +68,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLa
     val executorMemory = getValueAndRemove(properties, 
LINKIS_SPARK_EXECUTOR_MEMORY)
     val numExecutors = getValueAndRemove(properties, 
LINKIS_SPARK_EXECUTOR_INSTANCES)
 
-    val files = getValueAndRemove(properties, "files", 
"").split(",").filter(isNotBlankPath)
+    // Build the buffer directly (same pattern as `jars` below) rather than down-casting the
+    // mutable.Buffer returned by toBuffer, whose concrete class is an implementation detail.
+    val files: ArrayBuffer[String] = new ArrayBuffer[String]()
+    files ++= getValueAndRemove(properties, "files", "")
+      .split(",")
+      .filter(isNotBlankPath)
     val jars = new ArrayBuffer[String]()
     jars ++= getValueAndRemove(properties, "jars", 
"").split(",").filter(isNotBlankPath)
     jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
@@ -115,8 +126,34 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLa
       memory
     }
 
+    // Pick the deploy mode from the engine's runtime-mode label; a missing label means client.
+    val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels)
+    val isYarnClusterMode: Boolean =
+      null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)
+    val deployMode: String =
+      if (isYarnClusterMode) SparkConfiguration.SPARK_YARN_CLUSTER
+      else SparkConfiguration.SPARK_YARN_CLIENT
+
+    if (isYarnClusterMode) {
+      // Add the engine conn properties file to the files list.
+      files += s"${variable(PWD)}/conf/linkis-engineconn.properties"
+
+      val clusterJarsDir: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS)
+      if (StringUtils.isBlank(clusterJarsDir)) {
+        throw new SparkEngineException(
+          SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode,
+          SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc
+        )
+      }
+
+      // Drop one trailing slash (if any) before appending the wildcard.
+      val normalizedJarsDir = clusterJarsDir.stripSuffix("/")
+      jars += s"$normalizedJarsDir/*"
+    }
+
     addOpt("--master", "yarn")
-    addOpt("--deploy-mode", "client")
+    addOpt("--deploy-mode", deployMode)
     addOpt("--name", appName)
     addProxyUser()
 
@@ -137,8 +174,9 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLa
     addOpt("--num-executors", numExecutors.toString)
     addOpt("--queue", queue)
 
-    getConf(engineConnBuildRequest, gcLogDir, logDir).foreach { case (key, 
value) =>
-      addOpt("--conf", s"""$key="$value"""")
+    getConf(engineConnBuildRequest, gcLogDir, logDir, 
isYarnClusterMode).foreach {
+      case (key, value) =>
+        addOpt("--conf", s"""$key="$value"""")
     }
 
     addOpt("--class", className)
@@ -152,7 +190,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLa
   def getConf(
       engineConnBuildRequest: EngineConnBuildRequest,
       gcLogDir: String,
-      logDir: String
+      logDir: String,
+      isYarnClusterMode: Boolean
   ): ArrayBuffer[(String, String)] = {
     val driverJavaSet = new StringBuilder(" -server")
     if 
(StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue))
 {
@@ -168,7 +207,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLa
       .foreach(l => {
         driverJavaSet.append(" ").append(l)
       })
-    driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS))
+    if (isYarnClusterMode) {
+      driverJavaSet.append(" -Djava.io.tmpdir=/tmp")
+    } else {
+      driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS))
+    }
     if (EnvConfiguration.ENGINE_CONN_DEBUG_ENABLE.getValue) {
       driverJavaSet.append(
         s" 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${variable(RANDOM_PORT)}"
@@ -186,6 +229,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLa
       val keyValue = iterator.next()
       if (
           !SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) &&
+          !SPARK_DEPLOY_MODE.key.equals(keyValue.getKey) &&
           keyValue.getKey.startsWith("spark.") &&
           StringUtils.isNotBlank(keyValue.getValue)
       ) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to