This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 8f149e180 Spark once task supports engingeConnRuntimeMode label (#4896)
8f149e180 is described below
commit 8f149e1800a57be2d18fb8e0592191bc3aa8ba69
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Sep 18 15:09:04 2023 +0800
Spark once task supports engingeConnRuntimeMode label (#4896)
* Spark once task supports engingeConnRuntimeMode label
* isYarnClusterMode extracts to LabelUtil
* Modify SparkEngineConnFactory
---
.../linkis/manager/label/utils/LabelUtil.scala | 10 ++++++++++
.../spark/factory/SparkEngineConnFactory.scala | 20 ++++++++++++--------
2 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index 3965a5ea1..986f13068 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.manager.label.utils
+import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
@@ -135,4 +136,13 @@ object LabelUtil {
null.asInstanceOf[A]
}
+ def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = {
+ val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels)
+ val isYarnClusterMode: Boolean = {
+ if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
+ else false
+ }
+ isYarnClusterMode
+ }
+
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index e8f2cd22d..fbd38bcc6 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -39,7 +39,6 @@ import org.apache.linkis.manager.engineplugin.common.creation.{
}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
-import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.manager.label.utils.LabelUtil
@@ -86,12 +85,13 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
val sparkHome = SPARK_HOME.getValue(options)
val sparkConfDir = SPARK_CONF_DIR.getValue(options)
- val sparkConfig: SparkConfig = getSparkConfig(options)
+ val sparkConfig: SparkConfig =
+ getSparkConfig(options, LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()))
val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null)
context
}
- def getSparkConfig(options: util.Map[String, String]): SparkConfig = {
+ def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = {
logger.info("options: " + JsonUtils.jackson.writeValueAsString(options))
val sparkConfig: SparkConfig = new SparkConfig()
sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
@@ -114,7 +114,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
}
- sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
+
+ if (master.startsWith("yarn")) {
+ if (isYarnClusterMode) {
+ sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER)
+ } else {
+ sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT)
+ }
+ }
sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo
@@ -149,10 +156,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue)
logger.info(s"------ Create new SparkContext {$master} -------")
- val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
- val isYarnClusterMode: Boolean =
- if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
- else false
+ val isYarnClusterMode = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels())
if (isYarnClusterMode) {
sparkConf.set("spark.submit.deployMode", "cluster")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]