This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ca67262a feat: support spark submit jar on k8s (#4867)
5ca67262a is described below

commit 5ca67262ad93ab09c3338393155f3f5fa8717078
Author: zlucelia <[email protected]>
AuthorDate: Fri Sep 8 14:38:54 2023 +0800

    feat: support spark submit jar on k8s (#4867)
    
    * feat: support spark submit jar on k8s
    
    * feat: add spark cores setting priority
    
    * feat: use UUID to generate driverPodName
    
    * feat: optimize code of executor creation
---
 .../spark/client/context/SparkConfig.java          |  25 +++
 .../ClusterDescriptorAdapterFactory.java           |   9 +-
 ...ernetesApplicationClusterDescriptorAdapter.java | 231 +++++++++++++++++++++
 .../SparkOnKubernetesSubmitOnceExecutor.scala      | 163 +++++++++++++++
 .../spark/factory/SparkEngineConnFactory.scala     |   2 +
 .../factory/SparkEngineConnResourceFactory.scala   |   8 +-
 .../spark/factory/SparkOnceExecutorFactory.scala   |  22 +-
 .../spark/utils/SparkJobProgressUtil.scala         |  45 +++-
 8 files changed, 494 insertions(+), 11 deletions(-)
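
A note on the new request-cores settings (the "spark cores setting priority" change listed above): the patch reads spark.kubernetes.driver.request.cores and spark.kubernetes.executor.request.cores as Kubernetes CPU quantities via fabric8's Quantity and converts them to millicores, falling back to the Linkis default cores when the k8s values are absent. The following standalone sketch (hypothetical class name RequestCoresDemo; assumes the fabric8 kubernetes-model dependency already used by the patch) only illustrates the conversion the diff relies on:

    import io.fabric8.kubernetes.api.model.Quantity;

    public class RequestCoresDemo {
      public static void main(String[] args) {
        // "500m" and "2" are example Kubernetes CPU quantities; the patch applies the
        // same conversion: Quantity.getAmountInBytes(...) * 1000 -> millicores.
        for (String cores : new String[] {"500m", "2"}) {
          Quantity quantity = Quantity.parse(cores);
          long millicores = (long) (Quantity.getAmountInBytes(quantity).doubleValue() * 1000);
          System.out.println(cores + " -> " + millicores + " millicores");
        }
      }
    }

With these inputs the sketch would print 500 and 2000 millicores, which matches how SparkEngineConnResourceFactory and SparkOnKubernetesSubmitOnceExecutor account for used cores in the diff below.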

diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 3d0fc0ff3..37a0e2c98 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -50,6 +50,9 @@ public class SparkConfig {
 
   private String k8sNamespace;
   private String k8sFileUploadPath;
+
+  private String k8sDriverRequestCores;
+  private String k8sExecutorRequestCores;
   private String deployMode = "client"; // ("client") // todo cluster
   private String appResource; // ("")
   private String appName; // ("")
@@ -176,6 +179,22 @@ public class SparkConfig {
     this.k8sImage = k8sImage;
   }
 
+  public String getK8sDriverRequestCores() {
+    return k8sDriverRequestCores;
+  }
+
+  public void setK8sDriverRequestCores(String k8sDriverRequestCores) {
+    this.k8sDriverRequestCores = k8sDriverRequestCores;
+  }
+
+  public String getK8sExecutorRequestCores() {
+    return k8sExecutorRequestCores;
+  }
+
+  public void setK8sExecutorRequestCores(String k8sExecutorRequestCores) {
+    this.k8sExecutorRequestCores = k8sExecutorRequestCores;
+  }
+
   public String getJavaHome() {
     return javaHome;
   }
@@ -442,6 +461,12 @@ public class SparkConfig {
         + ", k8sNamespace='"
         + k8sNamespace
         + '\''
+        + ", k8sDriverRequestCores='"
+        + k8sDriverRequestCores
+        + '\''
+        + ", k8sExecutorRequestCores='"
+        + k8sExecutorRequestCores
+        + '\''
         + ", deployMode='"
         + deployMode
         + '\''
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
index 91d3eafb6..bc67a33e9 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
@@ -29,8 +29,13 @@ public class ClusterDescriptorAdapterFactory {
     ClusterDescriptorAdapter clusterDescriptorAdapter =
         new YarnApplicationClusterDescriptorAdapter(executionContext);
 
-    if (StringUtils.isNotBlank(master) && master.equalsIgnoreCase("k8s-operator")) {
-      clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
+    if (StringUtils.isNotBlank(master)) {
+      if (master.equalsIgnoreCase("k8s-operator")) {
+        clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
+      } else if (master.equalsIgnoreCase("k8s-native")) {
+        clusterDescriptorAdapter =
+            new KubernetesApplicationClusterDescriptorAdapter(executionContext);
+      }
     }
 
     return clusterDescriptorAdapter;
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
new file mode 100644
index 000000000..0ee0380fb
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
+import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.util.Strings;
+import org.apache.spark.launcher.CustomSparkSubmitLauncher;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
+  private static final Logger logger =
+      LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);
+
+  protected SparkConfig sparkConfig;
+  protected KubernetesClient client;
+  protected String driverPodName;
+  protected String namespace;
+
+  public KubernetesApplicationClusterDescriptorAdapter(ExecutionContext executionContext) {
+    super(executionContext);
+    this.sparkConfig = executionContext.getSparkConfig();
+    this.client =
+        KubernetesHelper.getKubernetesClient(
+            this.sparkConfig.getK8sConfigFile(),
+            this.sparkConfig.getK8sMasterUrl(),
+            this.sparkConfig.getK8sUsername(),
+            this.sparkConfig.getK8sPassword());
+  }
+
+  public void deployCluster(String mainClass, String args, Map<String, String> confMap)
+      throws IOException {
+    SparkConfig sparkConfig = executionContext.getSparkConfig();
+    sparkLauncher = new CustomSparkSubmitLauncher();
+    sparkLauncher
+        .setJavaHome(sparkConfig.getJavaHome())
+        .setSparkHome(sparkConfig.getSparkHome())
+        .setMaster(sparkConfig.getK8sMasterUrl())
+        .setDeployMode(sparkConfig.getDeployMode())
+        .setAppName(sparkConfig.getAppName())
+        .setVerbose(true);
+    this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
+    this.namespace = sparkConfig.getK8sNamespace();
+    setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName());
+    setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace);
+    setConf(sparkLauncher, "spark.kubernetes.container.image", 
sparkConfig.getK8sImage());
+    setConf(sparkLauncher, "spark.kubernetes.driver.pod.name", 
this.driverPodName);
+    setConf(
+        sparkLauncher,
+        "spark.kubernetes.driver.request.cores",
+        sparkConfig.getK8sDriverRequestCores());
+    setConf(
+        sparkLauncher,
+        "spark.kubernetes.executor.request.cores",
+        sparkConfig.getK8sExecutorRequestCores());
+    setConf(
+        sparkLauncher,
+        "spark.kubernetes.container.image.pullPolicy",
+        sparkConfig.getK8sImagePullPolicy());
+    setConf(
+        sparkLauncher,
+        "spark.kubernetes.authenticate.driver.serviceAccountName",
+        sparkConfig.getK8sServiceAccount());
+    if (confMap != null) confMap.forEach((k, v) -> sparkLauncher.setConf(k, v));
+
+    addSparkArg(sparkLauncher, "--jars", sparkConfig.getJars());
+    addSparkArg(sparkLauncher, "--packages", sparkConfig.getPackages());
+    addSparkArg(sparkLauncher, "--exclude-packages", 
sparkConfig.getExcludePackages());
+    addSparkArg(sparkLauncher, "--repositories", 
sparkConfig.getRepositories());
+    addSparkArg(sparkLauncher, "--files", sparkConfig.getFiles());
+    addSparkArg(sparkLauncher, "--archives", sparkConfig.getArchives());
+    addSparkArg(sparkLauncher, "--driver-memory", 
sparkConfig.getDriverMemory());
+    addSparkArg(sparkLauncher, "--driver-java-options", 
sparkConfig.getDriverJavaOptions());
+    addSparkArg(sparkLauncher, "--driver-library-path", 
sparkConfig.getDriverLibraryPath());
+    addSparkArg(sparkLauncher, "--driver-class-path", 
sparkConfig.getDriverClassPath());
+    addSparkArg(sparkLauncher, "--executor-memory", 
sparkConfig.getExecutorMemory());
+    addSparkArg(sparkLauncher, "--proxy-user", sparkConfig.getProxyUser());
+    addSparkArg(sparkLauncher, "--driver-cores", 
sparkConfig.getDriverCores().toString());
+    addSparkArg(sparkLauncher, "--total-executor-cores", 
sparkConfig.getTotalExecutorCores());
+    addSparkArg(sparkLauncher, "--executor-cores", 
sparkConfig.getExecutorCores().toString());
+    addSparkArg(sparkLauncher, "--num-executors", 
sparkConfig.getNumExecutors().toString());
+    addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
+    addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
+    sparkLauncher.setAppResource(sparkConfig.getAppResource());
+    sparkLauncher.setMainClass(mainClass);
+    Arrays.stream(args.split("\\s+"))
+        .filter(StringUtils::isNotBlank)
+        .forEach(arg -> sparkLauncher.addAppArgs(arg));
+    sparkAppHandle =
+        sparkLauncher.startApplication(
+            new SparkAppHandle.Listener() {
+              @Override
+              public void stateChanged(SparkAppHandle sparkAppHandle) {}
+
+              @Override
+              public void infoChanged(SparkAppHandle sparkAppHandle) {}
+            });
+    sparkLauncher.setSparkAppHandle(sparkAppHandle);
+  }
+
+  private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) {
+    if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+      sparkLauncher.addSparkArg(key, value);
+    }
+  }
+
+  private void setConf(SparkLauncher sparkLauncher, String key, String value) {
+    if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+      sparkLauncher.setConf(key, value);
+    }
+  }
+
+  public boolean initJobId() {
+    Pod sparkDriverPod = getSparkDriverPod();
+    if (null == sparkDriverPod) {
+      return false;
+    }
+    String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase();
+    String sparkApplicationId = sparkDriverPod.getMetadata().getLabels().get("spark-app-selector");
+
+    if (Strings.isNotBlank(sparkApplicationId)) {
+      this.applicationId = sparkApplicationId;
+    }
+    if (Strings.isNotBlank(sparkDriverPodPhase)) {
+      this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase);
+    }
+
+    // When the job is not finished, the appId is monitored; otherwise, the status is
+    // monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待)
+    return null != getApplicationId() || (jobState != null && jobState.isFinal());
+  }
+
+  protected Pod getSparkDriverPod() {
+    return client.pods().inNamespace(namespace).withName(driverPodName).get();
+  }
+
+  public String getSparkDriverPodIP() {
+    Pod sparkDriverPod = getSparkDriverPod();
+    if (null != sparkDriverPod) {
+      String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP();
+      if (StringUtils.isNotBlank(sparkDriverPodIP)) {
+        return sparkDriverPodIP;
+      } else {
+        logger.info("spark driver pod IP is null, the application may be 
pending");
+      }
+    } else {
+      logger.info("spark driver pod is not exist");
+    }
+    return "";
+  }
+
+  @Override
+  public SparkAppHandle.State getJobState() {
+    Pod sparkDriverPod = getSparkDriverPod();
+    if (null != sparkDriverPod) {
+      String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase();
+      this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase);
+      logger.info("Job {} state is {}.", getApplicationId(), this.jobState);
+      return this.jobState;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {
+    logger.info("Start to close job {}.", getApplicationId());
+    PodResource<Pod> sparkDriverPodResource =
+        client.pods().inNamespace(namespace).withName(driverPodName);
+    if (null != sparkDriverPodResource.get()) {
+      sparkDriverPodResource.delete();
+    }
+    client.close();
+  }
+
+  @Override
+  public boolean isDisposed() {
+    return this.jobState.isFinal();
+  }
+
+  public SparkAppHandle.State kubernetesPodStateConvertSparkState(String kubernetesState) {
+    if (StringUtils.isBlank(kubernetesState)) {
+      return SparkAppHandle.State.UNKNOWN;
+    }
+    switch (kubernetesState.toUpperCase()) {
+      case "PENDING":
+        return SparkAppHandle.State.CONNECTED;
+      case "RUNNING":
+        return SparkAppHandle.State.RUNNING;
+      case "SUCCEEDED":
+        return SparkAppHandle.State.FINISHED;
+      case "FAILED":
+        return SparkAppHandle.State.FAILED;
+      default:
+        return SparkAppHandle.State.UNKNOWN;
+    }
+  }
+
+  public String generateDriverPodName(String appName) {
+    return appName + "-" + UUID.randomUUID().toString().replace("-", "") + 
"-driver";
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
new file mode 100644
index 000000000..1c3873942
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.utils.{ByteTimeUtils, Utils}
+import org.apache.linkis.engineconn.once.executor.{
+  OnceExecutorExecutionContext,
+  OperableOnceExecutor
+}
+import org.apache.linkis.engineplugin.spark.client.deployment.{
+  KubernetesApplicationClusterDescriptorAdapter,
+  YarnApplicationClusterDescriptorAdapter
+}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+  SPARK_APP_CONF,
+  SPARK_APPLICATION_ARGS,
+  SPARK_APPLICATION_MAIN_CLASS
+}
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.utils.SparkJobProgressUtil
+import org.apache.linkis.manager.common.entity.resource._
+import org.apache.linkis.manager.common.utils.ResourceUtils
+import org.apache.linkis.protocol.engine.JobProgressInfo
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+import scala.concurrent.duration.Duration
+
+import io.fabric8.kubernetes.api.model.Quantity
+
+class SparkOnKubernetesSubmitOnceExecutor(
+    override val id: Long,
+    override protected val sparkEngineConnContext: SparkEngineConnContext
+) extends SparkOnceExecutor[KubernetesApplicationClusterDescriptorAdapter]
+    with OperableOnceExecutor {
+
+  private var oldProgress: Float = 0f
+
+  override def doSubmit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext,
+      options: Map[String, String]
+  ): Unit = {
+    val args = SPARK_APPLICATION_ARGS.getValue(options)
+    val mainClass = SPARK_APPLICATION_MAIN_CLASS.getValue(options)
+    val extConf = SPARK_APP_CONF.getValue(options)
+    val confMap = new util.HashMap[String, String]()
+    if (StringUtils.isNotBlank(extConf)) {
+      for (conf <- extConf.split("\n")) {
+        if (StringUtils.isNotBlank(conf)) {
+          val pair = conf.trim.split("=")
+          if (pair.length == 2) {
+            confMap.put(pair(0), pair(1))
+          } else {
+            logger.warn(s"ignore spark conf: $conf")
+          }
+        }
+      }
+    }
+    logger.info(
+      s"Ready to submit spark application to kubernetes, mainClass: 
$mainClass, args: $args."
+    )
+    clusterDescriptorAdapter.deployCluster(mainClass, args, confMap)
+  }
+
+  override protected def waitToRunning(): Unit = {
+    // Wait until the task return applicationId (等待返回applicationId)
+    Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf)
+    // Synchronize applicationId to EC SparkOnceExecutor to facilitate user operations,
+    // such as obtaining progress and killing jobs(将applicationId同步给EC执行器,方便用户操作,如获取进度,kill任务等)
+    setApplicationId(clusterDescriptorAdapter.getApplicationId)
+    super.waitToRunning()
+  }
+
+  override def getApplicationURL: String = ""
+
+  override def getCurrentNodeResource(): NodeResource = {
+    logger.info("Begin to get actual used resources!")
+    Utils.tryCatch({
+      val sparkConf = sparkEngineConnContext.getExecutionContext.getSparkConfig
+      val sparkNamespace = sparkConf.getK8sNamespace
+
+      val executorNum: Int = sparkConf.getNumExecutors
+      val executorMem: Long =
+        ByteTimeUtils.byteStringAsBytes(sparkConf.getExecutorMemory) * executorNum
+      val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sparkConf.getDriverMemory)
+
+      val executorCoresQuantity = Quantity.parse(sparkConf.getK8sExecutorRequestCores)
+      val executorCores: Long =
+        (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong * executorNum
+      val driverCoresQuantity = Quantity.parse(sparkConf.getK8sDriverRequestCores)
+      val driverCores: Long =
+        (Quantity.getAmountInBytes(driverCoresQuantity).doubleValue() * 1000).toLong
+
+      logger.info(
+        "Current actual used resources is driverMem:" + driverMem + 
",driverCores:" + driverCores + ",executorMem:" + executorMem + 
",executorCores:" + executorCores + ",namespace:" + sparkNamespace
+      )
+      val usedResource = new DriverAndKubernetesResource(
+        new LoadInstanceResource(0, 0, 0),
+        new KubernetesResource(executorMem + driverMem, executorCores + driverCores, sparkNamespace)
+      )
+      val nodeResource = new CommonNodeResource
+      nodeResource.setUsedResource(usedResource)
+      nodeResource.setResourceType(ResourceUtils.getResourceTypeByResource(usedResource))
+      nodeResource
+    })(t => {
+      logger.warn("Get actual used resource exception", t)
+      null
+    })
+  }
+
+  override def getProgress: Float = {
+    val jobIsFinal = clusterDescriptorAdapter != null &&
+      clusterDescriptorAdapter.getJobState != null &&
+      clusterDescriptorAdapter.getJobState.isFinal
+    if (oldProgress >= 1 || jobIsFinal) {
+      1
+    } else {
+      val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
+      if (StringUtils.isNotBlank(sparkDriverPodIP)) {
+        val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP)
+        if (newProgress > oldProgress) {
+          oldProgress = newProgress
+        }
+      }
+      oldProgress
+    }
+  }
+
+  override def getProgressInfo: Array[JobProgressInfo] = {
+    val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
+    if (StringUtils.isNotBlank(sparkDriverPodIP)) {
+      SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkDriverPodIP)
+    } else {
+      Array.empty
+    }
+  }
+
+  override def getMetrics: util.Map[String, Any] = {
+    new util.HashMap[String, Any]()
+  }
+
+  override def getDiagnosis: util.Map[String, Any] = {
+    new util.HashMap[String, Any]()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index bc18e2bad..e8f2cd22d 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -111,6 +111,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
       sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
       sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
+      sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
+      sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
     }
     sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
     sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
index 922826c2a..640476a58 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
@@ -113,7 +113,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with
         Quantity.parse(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(properties))
       (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong
     } else {
-      LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) * 1000L
+      val sparkDefaultExecutorCores: Int = LINKIS_SPARK_EXECUTOR_CORES.getValue(properties)
+      properties.put(SPARK_K8S_EXECUTOR_REQUEST_CORES.key, sparkDefaultExecutorCores.toString)
+      sparkDefaultExecutorCores * 1000L
     }
     val executorMemory = LINKIS_SPARK_EXECUTOR_MEMORY.getValue(properties)
     val executorMemoryWithUnit = if (StringUtils.isNumeric(executorMemory)) {
@@ -126,7 +128,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with
         Quantity.parse(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(properties))
       (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong
     } else {
-      LINKIS_SPARK_DRIVER_CORES.getValue(properties) * 1000L
+      val sparkDefaultDriverCores: Int = LINKIS_SPARK_DRIVER_CORES.getValue(properties)
+      properties.put(SPARK_K8S_DRIVER_REQUEST_CORES.key, sparkDefaultDriverCores.toString)
+      sparkDefaultDriverCores * 1000L
     }
     val driverMemory = LINKIS_SPARK_DRIVER_MEMORY.getValue(properties)
     val driverMemoryWithUnit = if (StringUtils.isNumeric(driverMemory)) {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
index 25e264944..12a87e22f 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
@@ -22,10 +22,16 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn
 import org.apache.linkis.engineconn.once.executor.OnceExecutor
 import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
 import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
-import org.apache.linkis.engineplugin.spark.executor.SparkSubmitOnceExecutor
+import org.apache.linkis.engineplugin.spark.executor.{
+  SparkOnKubernetesSubmitOnceExecutor,
+  SparkSubmitOnceExecutor
+}
+import org.apache.linkis.manager.common.conf.RMConfiguration.DEFAULT_KUBERNETES_TYPE
 import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel
 import org.apache.linkis.manager.label.entity.engine.RunType
 import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+import org.apache.linkis.manager.label.utils.LabelUtil
 
 class SparkOnceExecutorFactory extends OnceExecutorFactory {
 
@@ -34,11 +40,21 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory {
       engineCreationContext: EngineCreationContext,
       engineConn: EngineConn,
       labels: Array[Label[_]]
-  ): OnceExecutor =
+  ): OnceExecutor = {
+    val clusterLabel = LabelUtil.getLabelFromArray[ClusterLabel](labels)
     engineConn.getEngineConnSession match {
       case context: SparkEngineConnContext =>
-        new SparkSubmitOnceExecutor(id, context)
+        if (
+            null != clusterLabel && clusterLabel.getClusterType.equalsIgnoreCase(
+              DEFAULT_KUBERNETES_TYPE.getValue
+            )
+        ) {
+          new SparkOnKubernetesSubmitOnceExecutor(id, context)
+        } else {
+          new SparkSubmitOnceExecutor(id, context)
+        }
     }
+  }
 
   override protected def getRunType: RunType = RunType.JAR
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
index 196414420..6968ffb61 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
@@ -27,11 +27,15 @@ import org.apache.http.client.methods.HttpGet
 import org.apache.http.impl.client.HttpClients
 import org.apache.http.util.EntityUtils
 
+import java.util
+
 object SparkJobProgressUtil extends Logging {
 
-  def getProgress(applicationId: String): Float = {
+  def getProgress(applicationId: String, podIP: String = ""): Float = {
     if (StringUtils.isBlank(applicationId)) return 0f
-    val sparkJobsResult = getSparkJobInfo(applicationId)
+    val sparkJobsResult =
+      if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
+      else getKubernetesSparkJobInfo(applicationId, podIP)
     if (sparkJobsResult.isEmpty) return 0f
     val tuple = sparkJobsResult
       .filter(sparkJobResult => {
@@ -48,8 +52,10 @@ object SparkJobProgressUtil extends Logging {
     tuple._2.toFloat / tuple._1
   }
 
-  def getSparkJobProgressInfo(applicationId: String): Array[JobProgressInfo] = {
-    val sparkJobsResult = getSparkJobInfo(applicationId)
+  def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): Array[JobProgressInfo] = {
+    val sparkJobsResult =
+      if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
+      else getKubernetesSparkJobInfo(applicationId, podIP)
     if (sparkJobsResult.isEmpty) {
       Array.empty
     } else {
@@ -96,6 +102,37 @@ object SparkJobProgressUtil extends Logging {
       )
     }
 
+  def getKubernetesSparkJobInfo(
+      applicationId: String,
+      podIP: String
+  ): Array[java.util.Map[String, Object]] =
+    if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty
+    else {
+      val getSparkJobsStateUrl = s"http://$podIP:4040/api/v1/applications/$applicationId";
+      logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl")
+      val appStateResult =
+        JsonUtils.jackson.readValue(
+          get(getSparkJobsStateUrl),
+          classOf[java.util.Map[String, Object]]
+        )
+      val appAttemptList = appStateResult.get("attempts").asInstanceOf[java.util.List[Object]]
+      if (appAttemptList == null || appAttemptList.size() == 0) return Array.empty
+      val appLastAttempt =
+        appAttemptList.get(appAttemptList.size() - 1).asInstanceOf[util.Map[String, Object]]
+      val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean]
+      if (isLastAttemptCompleted) return Array.empty
+      val getSparkJobsInfoUrl = s"http://$podIP:4040/api/v1/applications/$applicationId/jobs";
+      logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl")
+      val jobs = get(getSparkJobsInfoUrl)
+      if (StringUtils.isBlank(jobs)) {
+        return Array.empty
+      }
+      JsonUtils.jackson.readValue(
+        get(getSparkJobsInfoUrl),
+        classOf[Array[java.util.Map[String, Object]]]
+      )
+    }
+
   def get(url: String): String = {
     val httpGet = new HttpGet(url)
     val client = HttpClients.createDefault


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
