This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f149e180 Spark once task supports engingeConnRuntimeMode label (#4896)
8f149e180 is described below

commit 8f149e1800a57be2d18fb8e0592191bc3aa8ba69
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Sep 18 15:09:04 2023 +0800

    Spark once task supports engingeConnRuntimeMode label (#4896)
    
    * Spark once task supports engingeConnRuntimeMode label
    
    * isYarnClusterMode extracts to LabelUtil
    
    * Modify SparkEngineConnFactory
---
 .../linkis/manager/label/utils/LabelUtil.scala       | 10 ++++++++++
 .../spark/factory/SparkEngineConnFactory.scala       | 20 ++++++++++++--------
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index 3965a5ea1..986f13068 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.manager.label.utils
 
+import org.apache.linkis.manager.label.constant.LabelValueConstant
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{
   CodeLanguageLabel,
@@ -135,4 +136,13 @@ object LabelUtil {
     null.asInstanceOf[A]
   }
 
+  def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = {
+    val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels)
+    val isYarnClusterMode: Boolean = {
+      if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
+      else false
+    }
+    isYarnClusterMode
+  }
+
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index e8f2cd22d..fbd38bcc6 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -39,7 +39,6 @@ import org.apache.linkis.manager.engineplugin.common.creation.{
 }
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
-import org.apache.linkis.manager.label.constant.LabelValueConstant
 import org.apache.linkis.manager.label.entity.engine.EngineType
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
 import org.apache.linkis.manager.label.utils.LabelUtil
@@ -86,12 +85,13 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
     val sparkHome = SPARK_HOME.getValue(options)
     val sparkConfDir = SPARK_CONF_DIR.getValue(options)
-    val sparkConfig: SparkConfig = getSparkConfig(options)
+    val sparkConfig: SparkConfig =
+      getSparkConfig(options, LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()))
     val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null)
     context
   }
 
-  def getSparkConfig(options: util.Map[String, String]): SparkConfig = {
+  def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = {
     logger.info("options: " + JsonUtils.jackson.writeValueAsString(options))
     val sparkConfig: SparkConfig = new SparkConfig()
     sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
@@ -114,7 +114,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
       sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
     }
-    sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
+
+    if (master.startsWith("yarn")) {
+      if (isYarnClusterMode) {
+        sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER)
+      } else {
+        sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT)
+      }
+    }
     sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
     sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
     sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo
@@ -149,10 +156,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue)
     logger.info(s"------ Create new SparkContext {$master} -------")
 
-    val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
-    val isYarnClusterMode: Boolean =
-      if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
-      else false
+    val isYarnClusterMode = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels())
 
     if (isYarnClusterMode) {
       sparkConf.set("spark.submit.deployMode", "cluster")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to