This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new a94c87653 spark support yarn cluster (#4850)
a94c87653 is described below
commit a94c87653f6ee4c0303876fb1349dbd24e0d614a
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Aug 14 19:11:01 2023 +0800
spark support yarn cluster (#4850)
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
* LinkisManagerApplication Remove useless code
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
* spark support yarn cluster
---
docs/configuration/spark.md | 3 +
.../service/impl/DefaultEngineConnKillService.java | 7 +++
.../impl/AbstractEngineConnLaunchService.scala | 13 +++-
.../callback/hook/CallbackEngineConnHook.scala | 6 +-
.../callback/service/EngineConnPidCallback.scala | 17 +++++-
.../am/manager/DefaultEngineNodeManager.java | 11 ++++
.../manager/am/manager/EngineNodeManager.java | 2 +
.../service/engine/DefaultEngineCreateService.java | 12 +++-
.../impl/DefaultEngineConnPidCallbackService.java | 17 +++++-
.../manager/label/service/NodeLabelService.java | 3 +
.../service/impl/DefaultNodeLabelService.java | 38 ++++++++++++
.../manager/label/constant/LabelKeyConstant.java | 2 +
.../manager/label/constant/LabelValueConstant.java | 2 +
.../entity/engine/EngingeConnRuntimeModeLabel.java | 71 ++++++++++++++++++++++
.../linkis/manager/label/utils/LabelUtil.scala | 5 ++
.../linkis/manager/common/constant/AMConstant.java | 2 +
.../linkis/manager/dao/NodeManagerMapper.java | 2 +
.../persistence/NodeManagerPersistence.java | 2 +
.../impl/DefaultNodeManagerPersistence.java | 22 +++++++
.../resources/mapper/common/NodeManagerMapper.xml | 38 +++++++-----
.../spark/errorcode/SparkErrorCodeSummary.java | 4 ++
.../spark/config/SparkConfiguration.scala | 7 +++
.../spark/executor/SparkEngineConnExecutor.scala | 13 +++-
.../spark/factory/SparkEngineConnFactory.scala | 39 ++++++++----
...SparkSubmitProcessEngineConnLaunchBuilder.scala | 58 +++++++++++++++---
25 files changed, 350 insertions(+), 46 deletions(-)
diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md
index f0a4723c7..f40c76b43 100644
--- a/docs/configuration/spark.md
+++ b/docs/configuration/spark.md
@@ -3,6 +3,7 @@
| Module Name (Service Name) | Parameter Name | Default Value | Description
|Used|
| -------- | -------- | ----- |----- | ----- |
+|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars|
|spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi|
|spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix|
|spark|linkis.bgservice.store.suffix| |bgservice.store.suffix|
@@ -27,6 +28,8 @@
|spark|wds.linkis.spark.engineconn.fatal.log|error writing
class;OutOfMemoryError|spark.engineconn.fatal.log|
|spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true
|spark.engine.scala.replace_package_header.enable|
+Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode":
"yarnCluster",and need to upload the dependence of the spark to
'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster')
+spark dependencies include jars and configuration files,For example:
'/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*''
The spark-excel package may cause class conflicts,need to download
separately,put it in spark lib
wget
https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
index 440208cd6..a6a932a57 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -94,6 +94,13 @@ public class DefaultEngineConnKillService implements
EngineConnKillService {
killYarnAppIdOfOneEc(engineStopRequest);
}
+ if
(AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType())
+ && engineStopRequest.getIdentifier() != null) {
+ List<String> appIds = new ArrayList<>();
+ appIds.add(engineStopRequest.getIdentifier());
+ GovernanceUtils.killYarnJobApp(appIds);
+ }
+
if (!response.getStopStatus()) {
EngineSuicideRequest request =
new EngineSuicideRequest(
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index f088f9fcd..390822df0 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -38,6 +38,7 @@ import org.apache.linkis.manager.common.protocol.engine.{
EngineStopRequest
}
import
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
+import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender
@@ -146,11 +147,21 @@ abstract class AbstractEngineConnLaunchService extends
EngineConnLaunchService w
throw t
}
LoggerUtils.removeJobIdMDC()
+
+ val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels)
+ val isYarnClusterMode: Boolean =
+ if (null != label &&
label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
+ else false
+
val engineNode = new AMEngineNode()
engineNode.setLabels(conn.getLabels)
engineNode.setServiceInstance(conn.getServiceInstance)
engineNode.setOwner(request.user)
- engineNode.setMark(AMConstant.PROCESS_MARK)
+ if (isYarnClusterMode) {
+ engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK)
+ } else {
+ engineNode.setMark(AMConstant.PROCESS_MARK)
+ }
engineNode
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
index 1f4c5cec7..adcbb1a69 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala
@@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.callback.service.{
EngineConnAfterStartCallback,
- EngineConnPidCallback
+ EngineConnIdentifierCallback
}
import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
@@ -61,8 +61,8 @@ class CallbackEngineConnHook extends EngineConnHook with
Logging {
newMap.put("spring.mvc.servlet.path",
ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))
- val engineConnPidCallBack = new EngineConnPidCallback()
- Utils.tryAndError(engineConnPidCallBack.callback())
+ val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
+ Utils.tryAndError(engineConnIdentifierCallback.callback())
logger.info("<--------------------SpringBoot App init
succeed-------------------->")
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
index f0995c0b9..71f71f199 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala
@@ -18,18 +18,29 @@
package org.apache.linkis.engineconn.callback.service
import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconn.executor.entity.YarnExecutor
import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid
+import org.apache.linkis.manager.label.constant.LabelValueConstant
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender
import java.lang.management.ManagementFactory
-class EngineConnPidCallback extends AbstractEngineConnStartUpCallback {
+class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback {
override def callback(): Unit = {
- val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
+ var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val instance = Sender.getThisServiceInstance
val context = EngineConnObject.getEngineCreationContext
- callback(ResponseEngineConnPid(instance, pid, context.getTicketId))
+
+ val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels())
+ if (null != label &&
label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) {
+ identifier = ExecutorManager.getInstance.getReportExecutor match {
+ case cluster: YarnExecutor => cluster.getApplicationId
+ }
+ }
+ callback(ResponseEngineConnPid(instance, identifier, context.getTicketId))
}
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
index b8b38eae3..14d548ef7 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java
@@ -127,6 +127,17 @@ public class DefaultEngineNodeManager implements
EngineNodeManager {
return dbEngineNode;
}
+ @Override
+ public EngineNode getEngineNodeInfoByTicketId(String ticketId) {
+ EngineNode dbEngineNode =
nodeManagerPersistence.getEngineNodeByTicketId(ticketId);
+ if (null == dbEngineNode) {
+ throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId +
" not exists in db");
+ }
+ metricsConverter.fillMetricsToNode(
+ dbEngineNode,
nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode));
+ return dbEngineNode;
+ }
+
@Override
public void updateEngineStatus(
ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus
toState) {}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
index 252d97c0b..ce79d79c7 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java
@@ -38,6 +38,8 @@ public interface EngineNodeManager {
EngineNode getEngineNodeInfoByDB(EngineNode engineNode);
+ EngineNode getEngineNodeInfoByTicketId(String ticketId);
+
/**
* Get detailed engine information from the persistence
*
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
index e8c58e382..6f35edc3a 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java
@@ -337,10 +337,20 @@ public class DefaultEngineCreateService extends
AbstractEngineService
}
private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) {
- EngineNode engineNodeInfo =
getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
+ EngineNode engineNodeInfo;
+ if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
+ engineNodeInfo =
getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId);
+ } else {
+ engineNodeInfo =
getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
+ }
if (null == engineNodeInfo) {
return false;
}
+
+ if (engineNodeInfo.getServiceInstance() != null) {
+ engineNode.setServiceInstance(engineNodeInfo.getServiceInstance());
+ }
+
if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) {
NodeMetrics metrics =
nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo);
Pair<String, Optional<Boolean>> errorInfo =
getStartErrorInfo(metrics.getHeartBeatMsg());
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
index 3d199fe29..4acfb70f9 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
@@ -17,10 +17,14 @@
package org.apache.linkis.manager.am.service.impl;
+import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid;
import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager;
import org.apache.linkis.manager.am.service.EngineConnPidCallbackService;
+import org.apache.linkis.manager.am.service.engine.AbstractEngineService;
+import org.apache.linkis.manager.common.constant.AMConstant;
import org.apache.linkis.manager.common.entity.node.EngineNode;
+import org.apache.linkis.manager.label.service.NodeLabelService;
import org.apache.linkis.rpc.message.annotation.Receiver;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,12 +34,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
-public class DefaultEngineConnPidCallbackService implements
EngineConnPidCallbackService {
+public class DefaultEngineConnPidCallbackService extends AbstractEngineService
+ implements EngineConnPidCallbackService {
private static final Logger logger =
LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class);
@Autowired private DefaultEngineNodeManager defaultEngineNodeManager;
+ @Autowired private NodeLabelService nodeLabelService;
+
@Receiver
@Override
public void dealPid(ResponseEngineConnPid protocol) {
@@ -56,6 +63,14 @@ public class DefaultEngineConnPidCallbackService implements
EngineConnPidCallbac
}
engineNode.setIdentifier(protocol.pid());
+
+ if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
+ ServiceInstance serviceInstance = protocol.serviceInstance();
+ engineNode.setServiceInstance(serviceInstance);
+ getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
+ nodeLabelService.labelsFromInstanceToNewInstance(
+ engineNode.getServiceInstance(), serviceInstance);
+ }
defaultEngineNodeManager.updateEngine(engineNode);
}
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
index a5bfcab1c..4dc1976c3 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java
@@ -47,6 +47,9 @@ public interface NodeLabelService {
void updateLabelsToNode(ServiceInstance instance, List<Label<?>> labels);
+ void labelsFromInstanceToNewInstance(
+ ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance);
+
/**
* Remove the labels related by node instance
*
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
index 4da2bebc6..8529b6d20 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java
@@ -196,6 +196,44 @@ public class DefaultNodeLabelService implements
NodeLabelService {
}
}
+ @Override
+ public void labelsFromInstanceToNewInstance(
+ ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) {
+ List<PersistenceLabel> labels =
+ labelManagerPersistence.getLabelByServiceInstance(newServiceInstance);
+ List<String> newKeyList =
labels.stream().map(Label::getLabelKey).collect(Collectors.toList());
+ List<PersistenceLabel> nodeLabels =
+ labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance);
+
+ List<String> oldKeyList =
+
nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList());
+
+ List<String> willBeAdd = new ArrayList<>(oldKeyList);
+ willBeAdd.removeAll(newKeyList);
+
+ // Assign the old association to the newServiceInstance
+ if (!CollectionUtils.isEmpty(willBeAdd)) {
+ nodeLabels.forEach(
+ nodeLabel -> {
+ if (willBeAdd.contains(nodeLabel.getLabelKey())) {
+ PersistenceLabel persistenceLabel =
+ LabelManagerUtils.convertPersistenceLabel(nodeLabel);
+ int labelId = tryToAddLabel(persistenceLabel);
+ if (labelId > 0) {
+ List<Integer> labelIds = new ArrayList<>();
+ labelIds.add(labelId);
+ labelManagerPersistence.addLabelToNode(newServiceInstance,
labelIds);
+ }
+ }
+ });
+ }
+
+ // Delete an old association
+ List<Integer> oldLabelId =
+
nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList());
+ labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId);
+ }
+
/**
* Remove the labels related by node instance
*
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index 362932083..8021b3585 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -64,5 +64,7 @@ public class LabelKeyConstant {
public static final String FIXED_EC_KEY = "fixedEngineConn";
+ public static final String ENGINGE_CONN_RUNTIME_MODE_KEY =
"engingeConnRuntimeMode";
+
public static final String MANAGER_KEY = "manager";
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
index cc62921c8..35c0d06e2 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java
@@ -20,4 +20,6 @@ package org.apache.linkis.manager.label.constant;
public class LabelValueConstant {
public static final String OFFLINE_VALUE = "offline";
+
+ public static final String YARN_CLUSTER_VALUE = "yarnCluster";
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java
new file mode 100644
index 000000000..7460f5589
--- /dev/null
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.*;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+import org.apache.linkis.manager.label.exception.LabelErrorException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+
+import static
org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY;
+import static
org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;
+
+public class EngingeConnRuntimeModeLabel extends GenericLabel
+ implements EngineNodeLabel, UserModifiable {
+
+ public EngingeConnRuntimeModeLabel() {
+ setLabelKey(LabelKeyConstant.ENGINGE_CONN_RUNTIME_MODE_KEY);
+ }
+
+ @ValueSerialNum(0)
+ public void setModeValue(String modeValue) {
+ if (getValue() == null) {
+ setValue(new HashMap<>());
+ }
+ getValue().put("modeValue", modeValue);
+ }
+
+ public String getModeValue() {
+ if (getValue() == null) {
+ return null;
+ }
+ return getValue().get("modeValue");
+ }
+
+ @Override
+ public Feature getFeature() {
+ return Feature.CORE;
+ }
+
+ @Override
+ public void valueCheck(String stringValue) throws LabelErrorException {
+ if (!StringUtils.isBlank(stringValue)) {
+ if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) {
+ throw new LabelErrorException(
+ LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
+ }
+ } else {
+ throw new LabelErrorException(
+ CHECK_LABEL_VALUE_EMPTY.getErrorCode(),
CHECK_LABEL_VALUE_EMPTY.getErrorDesc());
+ }
+ }
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index b4a66d2f4..3965a5ea1 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
EngineConnModeLabel,
EngineTypeLabel,
+ EngingeConnRuntimeModeLabel,
UserCreatorLabel
}
import org.apache.linkis.manager.label.entity.entrance.{
@@ -80,6 +81,10 @@ object LabelUtil {
getLabelFromList[CodeLanguageLabel](labels)
}
+ def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]):
EngingeConnRuntimeModeLabel = {
+ getLabelFromList[EngingeConnRuntimeModeLabel](labels)
+ }
+
def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel
= {
getLabelFromList[EngineConnModeLabel](labels)
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
index 09d802a95..c80357035 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
@@ -29,6 +29,8 @@ public class AMConstant {
public static final String PROCESS_MARK = "process";
+ public static final String CLUSTER_PROCESS_MARK = "cluster_process";
+
public static final String THREAD_MARK = "thread";
public static final String START_REASON = "start_reason";
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
index 4e9546944..6f11c910e 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
@@ -49,6 +49,8 @@ public interface NodeManagerMapper {
PersistenceNode getNodeInstance(@Param("instance") String instance);
+ PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String
ticketId);
+
PersistenceNode getNodeInstanceById(@Param("id") int id);
PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String
instance);
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
index bb95c8cf7..b83c82fd3 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
@@ -105,6 +105,8 @@ public interface NodeManagerPersistence {
*/
EngineNode getEngineNode(ServiceInstance serviceInstance);
+ EngineNode getEngineNodeByTicketId(String ticketId);
+
/**
* 通过Em的ServiceInstance 获取EM下面Engine的列表
*
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 4a1697333..14db7252f 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -274,6 +274,28 @@ public class DefaultNodeManagerPersistence implements
NodeManagerPersistence {
return amEngineNode;
}
+ @Override
+ public EngineNode getEngineNodeByTicketId(String ticketId) {
+ AMEngineNode amEngineNode = new AMEngineNode();
+ PersistenceNode engineNode =
nodeManagerMapper.getNodeInstanceByTicketId(ticketId);
+
+ if (null == engineNode) {
+ return null;
+ }
+
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setInstance(engineNode.getInstance());
+ serviceInstance.setApplicationName(engineNode.getName());
+ amEngineNode.setServiceInstance(serviceInstance);
+
+ amEngineNode.setOwner(engineNode.getOwner());
+ amEngineNode.setMark(engineNode.getMark());
+ amEngineNode.setIdentifier(engineNode.getIdentifier());
+ amEngineNode.setTicketId(engineNode.getTicketId());
+ amEngineNode.setStartTime(engineNode.getCreateTime());
+ return amEngineNode;
+ }
+
@Override
public List<EngineNode> getEngineNodeByEM(ServiceInstance serviceInstance) {
// serviceinstance for a given EM(给定EM的 serviceinstance)
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
index b470daead..935ceb4f8 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
@@ -6,9 +6,9 @@
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
- ~
+ ~
~ http://www.apache.org/licenses/LICENSE-2.0
- ~
+ ~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,9 +34,9 @@
<insert id="addNodeInstance" useGeneratedKeys="true" keyColumn="id"
keyProperty="id"
parameterType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
INSERT INTO linkis_cg_manager_service_instance (instance, name, owner,
mark, ticketId, update_time
- , create_time, updator, creator)
+ , create_time, updator,
creator)
VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId},
#{updateTime}
- , #{createTime}, #{updator}, #{creator})
+ , #{createTime}, #{updator}, #{creator})
</insert>
<update id="updateNodeInstance">
@@ -119,9 +119,9 @@
<select id="getNodeInstanceIds" resultType="java.lang.Integer">
SELECT id FROM linkis_cg_manager_service_instance WHERE instance IN (
- <foreach collection='instances' separator=',' item='instance'>
- #{instance}
- </foreach> )
+ <foreach collection='instances' separator=',' item='instance'>
+ #{instance}
+ </foreach> )
</select>
<select id="getNodeInstance"
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
@@ -130,6 +130,12 @@
WHERE instance = #{instance}
</select>
+ <select id="getNodeInstanceByTicketId"
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
+ SELECT *
+ FROM linkis_cg_manager_service_instance
+ WHERE ticketId = #{ticketId}
+ </select>
+
<select id="getNodeInstanceById"
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
SELECT *
FROM linkis_cg_manager_service_instance
@@ -140,9 +146,9 @@
SELECT *
FROM linkis_cg_manager_service_instance
WHERE instance IN (
- SELECT em_instance
- FROM linkis_cg_manager_engine_em
- WHERE engine_instance = #{instance}
+ SELECT em_instance
+ FROM linkis_cg_manager_engine_em
+ WHERE engine_instance = #{instance}
)
</select>
@@ -150,17 +156,17 @@
SELECT *
FROM linkis_cg_manager_service_instance
WHERE instance IN (
- SELECT engine_instance
- FROM linkis_cg_manager_engine_em
- WHERE em_instance = #{instance}
+ SELECT engine_instance
+ FROM linkis_cg_manager_engine_em
+ WHERE em_instance = #{instance}
)
</select>
<select id="getNodesByInstances"
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN(
- <foreach collection='instances' separator=',' item='instance'>
- #{instance}
- </foreach>)
+ <foreach collection='instances' separator=',' item='instance'>
+ #{instance}
+ </foreach>)
</select>
<insert id="addEngineNode">
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
index 936e773e4..42f0b66e4 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
@@ -66,6 +66,10 @@ public enum SparkErrorCodeSummary implements LinkisErrorCode
{
43032, "The application start failed, since yarn applicationId is
null."),
NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."),
+
+ LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR(
+ 43042,
+ "linkis.spark.yarn.cluster.jars parameters configuration
errors(linkis.spark.yarn.cluster.jars 参数配置错误)."),
;
/** (errorCode)错误码 */
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index ccc21a776..b42fc0934 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -30,6 +30,10 @@ object SparkConfiguration extends Logging {
val SPARK_HOME_ENV = "SPARK_HOME"
val SPARK_CONF_DIR_ENV = "SPARK_CONF_DIR"
+ val SPARK_YARN_CLIENT = "client"
+
+ val SPARK_YARN_CLUSTER = "cluster"
+
val PROCESS_MAX_THREADS =
CommonVars[Int]("wds.linkis.process.threadpool.max", 100)
val SPARK_SESSION_HOOK =
CommonVars[String]("wds.linkis.engine.spark.session.hook", "")
@@ -46,6 +50,9 @@ object SparkConfiguration extends Logging {
val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode",
"client")
+ val SPARK_YARN_CLUSTER_JARS =
+ CommonVars[String]("linkis.spark.yarn.cluster.jars",
"hdfs:///spark/cluster")
+
val SPARK_APP_NAME = CommonVars[String]("spark.app.name",
"Linkis-EngineConn-Spark")
val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "")
val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "")
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 2264db61f..8d97e8152 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -25,7 +25,7 @@ import
org.apache.linkis.engineconn.computation.executor.execute.{
}
import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException
-import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
+import org.apache.linkis.engineconn.executor.entity.{ResourceFetchExecutor,
YarnExecutor}
import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
import org.apache.linkis.engineplugin.spark.extension.{
@@ -56,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer
abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
extends ComputationExecutor
with Logging
+ with YarnExecutor
with ResourceFetchExecutor {
private var initialized: Boolean = false
@@ -70,9 +71,17 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext,
id: Long)
private var thread: Thread = _
+ private var applicationId: String = sc.applicationId
+
+ override def getApplicationId: String = applicationId
+
+ override def getApplicationURL: String = ""
+ override def getYarnMode: String = ""
+ override def getQueue: String = ""
+
override def init(): Unit = {
logger.info(s"Ready to change engine state!")
-// setCodeParser() // todo check
+ // setCodeParser() // todo check
super.init()
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index 5bf90c6bf..bc18e2bad 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -39,8 +39,10 @@ import
org.apache.linkis.manager.engineplugin.common.creation.{
}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import
org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
+import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.server.JMap
import org.apache.commons.lang3.StringUtils
@@ -144,19 +146,32 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
val master =
sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master",
"yarn").getValue)
logger.info(s"------ Create new SparkContext {$master} -------")
- val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue
- val pysparkPath = new File(pysparkBasePath, "python" + File.separator +
"lib")
- var pythonLibUris =
pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip"))
- if (pythonLibUris.length == 2) {
- val sparkConfValue1 =
Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
- val sparkConfValue2 =
Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
- if (StringUtils.isNotBlank(sparkConfValue2)) {
- pythonLibUris = sparkConfValue2 +: pythonLibUris
- }
- if (StringUtils.isNotBlank(sparkConfValue1)) {
- pythonLibUris = sparkConfValue1 +: pythonLibUris
+
+ val label =
LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
+ val isYarnClusterMode: Boolean =
+ if (null != label &&
label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
+ else false
+
+ if (isYarnClusterMode) {
+ sparkConf.set("spark.submit.deployMode", "cluster")
+ }
+
+ // todo yarn cluster暂时不支持pyspark,后期对pyspark进行处理
+ if (!isYarnClusterMode) {
+ val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue
+ val pysparkPath = new File(pysparkBasePath, "python" + File.separator +
"lib")
+ var pythonLibUris =
pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip"))
+ if (pythonLibUris.length == 2) {
+ val sparkConfValue1 =
Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
+ val sparkConfValue2 =
Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
+ if (StringUtils.isNotBlank(sparkConfValue2)) {
+ pythonLibUris = sparkConfValue2 +: pythonLibUris
+ }
+ if (StringUtils.isNotBlank(sparkConfValue1)) {
+ pythonLibUris = sparkConfValue1 +: pythonLibUris
+ }
+ sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
}
- sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
}
// Distributes needed libraries to workers
// when spark version is greater than or equal to 1.5.0
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index 8472cdfa9..2487ede90 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -18,23 +18,30 @@
package org.apache.linkis.engineplugin.spark.launch
import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
ENGINE_JAR,
SPARK_APP_NAME,
SPARK_DEFAULT_EXTERNAL_JARS_PATH,
+ SPARK_DEPLOY_MODE,
SPARK_DRIVER_CLASSPATH,
SPARK_DRIVER_EXTRA_JAVA_OPTIONS,
SPARK_PYTHON_VERSION,
- SPARK_SUBMIT_PATH
+ SPARK_SUBMIT_PATH,
+ SPARK_YARN_CLUSTER_JARS
}
import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.engineplugin.spark.exception.SparkEngineException
import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
import
org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
import
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.UserWithCreator
import org.apache.commons.lang3.StringUtils
@@ -61,7 +68,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder:
JavaProcessEngineConnLa
val executorMemory = getValueAndRemove(properties,
LINKIS_SPARK_EXECUTOR_MEMORY)
val numExecutors = getValueAndRemove(properties,
LINKIS_SPARK_EXECUTOR_INSTANCES)
- val files = getValueAndRemove(properties, "files",
"").split(",").filter(isNotBlankPath)
+ val files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "")
+ .split(",")
+ .filter(isNotBlankPath)
+ .toBuffer
+ .asInstanceOf[ArrayBuffer[String]]
val jars = new ArrayBuffer[String]()
jars ++= getValueAndRemove(properties, "jars",
"").split(",").filter(isNotBlankPath)
jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
@@ -115,8 +126,34 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder:
JavaProcessEngineConnLa
memory
}
+ var deployMode: String = SparkConfiguration.SPARK_YARN_CLIENT
+
+ val label =
LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels)
+ val isYarnClusterMode: Boolean =
+ if (null != label &&
label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
+ else false
+
+ if (isYarnClusterMode) {
+ deployMode = SparkConfiguration.SPARK_YARN_CLUSTER
+ files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties")
+
+ var clusterJars: String = getValueAndRemove(properties,
SPARK_YARN_CLUSTER_JARS)
+
+ if (StringUtils.isBlank(clusterJars)) {
+ throw new SparkEngineException(
+
SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode,
+
SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc
+ )
+ }
+
+ if (clusterJars.endsWith("/")) {
+ clusterJars = clusterJars.dropRight(1)
+ }
+ jars += s"$clusterJars/*"
+ }
+
addOpt("--master", "yarn")
- addOpt("--deploy-mode", "client")
+ addOpt("--deploy-mode", deployMode)
addOpt("--name", appName)
addProxyUser()
@@ -137,8 +174,9 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder:
JavaProcessEngineConnLa
addOpt("--num-executors", numExecutors.toString)
addOpt("--queue", queue)
- getConf(engineConnBuildRequest, gcLogDir, logDir).foreach { case (key,
value) =>
- addOpt("--conf", s"""$key="$value"""")
+ getConf(engineConnBuildRequest, gcLogDir, logDir,
isYarnClusterMode).foreach {
+ case (key, value) =>
+ addOpt("--conf", s"""$key="$value"""")
}
addOpt("--class", className)
@@ -152,7 +190,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder:
JavaProcessEngineConnLa
def getConf(
engineConnBuildRequest: EngineConnBuildRequest,
gcLogDir: String,
- logDir: String
+ logDir: String,
+ isYarnClusterMode: Boolean
): ArrayBuffer[(String, String)] = {
val driverJavaSet = new StringBuilder(" -server")
if
(StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue))
{
@@ -168,7 +207,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder:
JavaProcessEngineConnLa
.foreach(l => {
driverJavaSet.append(" ").append(l)
})
- driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS))
+ if (isYarnClusterMode) {
+ driverJavaSet.append(" -Djava.io.tmpdir=/tmp")
+ } else {
+ driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS))
+ }
if (EnvConfiguration.ENGINE_CONN_DEBUG_ENABLE.getValue) {
driverJavaSet.append(
s"
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${variable(RANDOM_PORT)}"
@@ -186,6 +229,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder:
JavaProcessEngineConnLa
val keyValue = iterator.next()
if (
!SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) &&
+ !SPARK_DEPLOY_MODE.key.equals(keyValue.getKey) &&
keyValue.getKey.startsWith("spark.") &&
StringUtils.isNotBlank(keyValue.getValue)
) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]