This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 89a653d75 feat: support submit pyspark once job on k8s and add 
clusterlabel to combinedlabel (#4906)
89a653d75 is described below

commit 89a653d75359b912bffade16426b0ab4b850d01a
Author: zlucelia <[email protected]>
AuthorDate: Thu Sep 21 21:45:40 2023 +0800

    feat: support submit pyspark once job on k8s and add clusterlabel to 
combinedlabel (#4906)
    
    * feat: support submit pyspark once job on k8s
    
    * feat: modify variable name
    
    * feat: add method to build k8s client from kubeConfig
    
    * feat: add Spark UI port configuration for spark on k8s once job
    
    * feat: rename userCreatorEngineTypeLabel
    
    * feat: merge podIP and port into url
    
    * fix: replace 'empty' with 'blank'
---
 .../manager/label/conf/LabelManagerConf.java       |  3 ++
 .../linkis/manager/rm/domain/RMLabelContainer.java | 49 +++++++++++++++----
 .../kubernetes/KubernetesResourceRequester.java    | 57 ++++++++++++++++------
 .../manager/rm/message/RMMessageService.java       |  4 +-
 .../manager/rm/service/RequestResourceService.java |  6 +--
 .../rm/service/impl/DefaultResourceManager.java    | 32 +++++-------
 .../rm/service/impl/ResourceLogService.java        | 11 ++---
 .../spark/client/context/SparkConfig.java          | 24 +++++++++
 ...ernetesApplicationClusterDescriptorAdapter.java |  6 ++-
 .../spark/config/SparkConfiguration.scala          |  3 ++
 .../SparkOnKubernetesSubmitOnceExecutor.scala      | 13 ++---
 .../spark/factory/SparkEngineConnFactory.scala     |  2 +
 .../spark/factory/SparkOnceExecutorFactory.scala   |  3 ++
 .../spark/utils/SparkJobProgressUtil.scala         | 26 ++++++----
 14 files changed, 161 insertions(+), 78 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
index f43625491..9aa5ff797 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java
@@ -23,4 +23,7 @@ public class LabelManagerConf {
 
   public static final String LONG_LIVED_LABEL =
       CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", 
"tenant").getValue();
+
+  public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
+      CommonVars.apply("linkis.combined.without.yarn.default", 
true).getValue();
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
index 5bda33919..9d3140267 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java
@@ -18,10 +18,13 @@
 package org.apache.linkis.manager.rm.domain;
 
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
+import org.apache.linkis.manager.common.conf.RMConfiguration;
 import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
+import org.apache.linkis.manager.label.conf.LabelManagerConf;
 import org.apache.linkis.manager.label.entity.CombinedLabel;
 import org.apache.linkis.manager.label.entity.Label;
 import org.apache.linkis.manager.label.entity.ResourceLabel;
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
 import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
 import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
 import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
@@ -49,7 +52,8 @@ public class RMLabelContainer {
   private EngineTypeLabel engineTypeLabel;
   private UserCreatorLabel userCreatorLabel;
   private EngineInstanceLabel engineInstanceLabel;
-  private CombinedLabel combinedUserCreatorEngineTypeLabel;
+  private ClusterLabel clusterLabel;
+  private CombinedLabel combinedResourceLabel;
   private Label currentLabel;
 
   public RMLabelContainer(List<Label<?>> labels) {
@@ -57,14 +61,16 @@ public class RMLabelContainer {
     this.lockedLabels = Lists.newArrayList();
     try {
       if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
-        this.combinedUserCreatorEngineTypeLabel =
-            (CombinedLabel)
-                combinedLabelBuilder.build(
-                    "", Lists.newArrayList(getUserCreatorLabel(), 
getEngineTypeLabel()));
-        this.labels.add(combinedUserCreatorEngineTypeLabel);
+        List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(), 
getEngineTypeLabel());
+        ClusterLabel clusterLabel = getClusterLabel();
+        if (shouldCombinedClusterLabel(clusterLabel)) {
+          combinedLabel.add(clusterLabel);
+        }
+        this.combinedResourceLabel = (CombinedLabel) 
combinedLabelBuilder.build("", combinedLabel);
+        this.labels.add(combinedResourceLabel);
       }
     } catch (Exception e) {
-      logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
+      logger.warn("failed to get combinedResourceLabel", e);
     }
     this.labels = LabelUtils.distinctLabel(this.labels, labels);
   }
@@ -156,8 +162,31 @@ public class RMLabelContainer {
     return null;
   }
 
-  public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
-    return combinedUserCreatorEngineTypeLabel;
+  public ClusterLabel getClusterLabel() {
+    if (clusterLabel == null) {
+      for (Label label : labels) {
+        if (label instanceof ClusterLabel) {
+          return (ClusterLabel) label;
+        }
+      }
+    } else {
+      return clusterLabel;
+    }
+    logger.warn("ClusterLabel not found");
+    return null;
+  }
+
+  private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
+    return !(clusterLabel == null
+        || (LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT
+            && clusterLabel
+                .getClusterName()
+                .equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
+            && 
clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue())));
+  }
+
+  public CombinedLabel getCombinedResourceLabel() {
+    return combinedResourceLabel;
   }
 
   public Label getCurrentLabel() {
@@ -195,6 +224,8 @@ public class RMLabelContainer {
         + userCreatorLabel
         + ", engineInstanceLabel="
         + engineInstanceLabel
+        + ", clusterLabel="
+        + clusterLabel
         + ", currentLabel="
         + currentLabel
         + '}';
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
index 3bc7004e3..c886b622a 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
@@ -24,8 +24,11 @@ import 
org.apache.linkis.manager.rm.external.domain.ExternalResourceProvider;
 import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
+import java.io.File;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -33,8 +36,10 @@ import io.fabric8.kubernetes.api.model.Node;
 import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.ResourceQuota;
 import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
+import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +50,7 @@ public class KubernetesResourceRequester implements 
ExternalResourceRequester {
   @Override
   public NodeResource requestResourceInfo(
       ExternalResourceIdentifier identifier, ExternalResourceProvider 
provider) {
-    String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
+    String k8sMasterUrl = getK8sMasterUrl(provider);
     DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
     if (client == null) {
       constructKubernetesClient(provider);
@@ -154,8 +159,7 @@ public class KubernetesResourceRequester implements 
ExternalResourceRequester {
   @Override
   public Boolean reloadExternalResourceAddress(ExternalResourceProvider 
provider) {
     if (null != provider) {
-      DefaultKubernetesClient client =
-          clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
+      DefaultKubernetesClient client = 
clientMap.get(getK8sMasterUrl(provider));
       if (client != null) {
         client.close();
       }
@@ -164,19 +168,42 @@ public class KubernetesResourceRequester implements 
ExternalResourceRequester {
     return true;
   }
 
+  private String getK8sMasterUrl(ExternalResourceProvider provider) {
+    Map<String, Object> configMap = provider.getConfigMap();
+    String k8sMasterUrl = (String) configMap.get("k8sMasterUrl");
+    if (StringUtils.isBlank(k8sMasterUrl)) {
+      throw new IllegalArgumentException("k8sMasterUrl is empty, please check 
the configuration.");
+    }
+    return k8sMasterUrl;
+  }
+
   private void constructKubernetesClient(ExternalResourceProvider provider) {
-    String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
-    String k8sClientCertData = (String) 
provider.getConfigMap().get("k8sClientCertData");
-    String k8sClientKeyData = (String) 
provider.getConfigMap().get("k8sClientKeyData");
-    String k8sCaCertData = (String) 
provider.getConfigMap().get("k8sCaCertData");
-    DefaultKubernetesClient client =
-        new DefaultKubernetesClient(
-            new ConfigBuilder()
-                .withMasterUrl(k8sMasterUrl)
-                .withClientCertData(k8sClientCertData)
-                .withClientKeyData(k8sClientKeyData)
-                .withCaCertData(k8sCaCertData)
-                .build());
+    DefaultKubernetesClient client;
+    Map<String, Object> configMap = provider.getConfigMap();
+    String k8sMasterUrl = getK8sMasterUrl(provider);
+    try {
+      String k8sConfig = (String) configMap.get("k8sConfig");
+      if (StringUtils.isNotBlank(k8sConfig)) {
+        Config kubeConfig =
+            Config.fromKubeconfig(
+                null, FileUtils.readFileToString(new File(k8sConfig), 
"UTF-8"), null);
+        client = new DefaultKubernetesClient(kubeConfig);
+      } else {
+        String k8sClientCertData = (String) configMap.get("k8sClientCertData");
+        String k8sClientKeyData = (String) configMap.get("k8sClientKeyData");
+        String k8sCaCertData = (String) configMap.get("k8sCaCertData");
+        client =
+            new DefaultKubernetesClient(
+                new ConfigBuilder()
+                    .withMasterUrl(k8sMasterUrl)
+                    .withClientCertData(k8sClientCertData)
+                    .withClientKeyData(k8sClientKeyData)
+                    .withCaCertData(k8sCaCertData)
+                    .build());
+      }
+    } catch (Exception e) {
+      throw new KubernetesClientException("Fail to build k8s client. ", e);
+    }
     clientMap.put(k8sMasterUrl, client);
   }
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
index ff58f933b..88985bf52 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/message/RMMessageService.java
@@ -53,9 +53,7 @@ public class RMMessageService {
     } catch (Exception e) {
       RMLabelContainer nodeLabels = new RMLabelContainer(labels);
       String value =
-          Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
-              .map(Object::toString)
-              .orElse("");
+          
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
       logger.warn(
           String.format(
               "usedResource failed, request from:%s, request engine: %s, ",
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
index 4c1237cc4..c88b39e52 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
@@ -59,9 +59,7 @@ public abstract class RequestResourceService {
     NodeResource labelResource =
         
labelResourceService.getLabelResource(labelContainer.getCurrentLabel());
     Resource requestResource = resource.getMinResource();
-    if (labelContainer
-        .getCombinedUserCreatorEngineTypeLabel()
-        .equals(labelContainer.getCurrentLabel())) {
+    if 
(labelContainer.getCombinedResourceLabel().equals(labelContainer.getCurrentLabel()))
 {
       if (labelResource == null) {
         labelResource = new CommonNodeResource();
         labelResource.setResourceType(resource.getResourceType());
@@ -92,7 +90,7 @@ public abstract class RequestResourceService {
       labelResourceService.setLabelResource(
           labelContainer.getCurrentLabel(),
           labelResource,
-          
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+          labelContainer.getCombinedResourceLabel().getStringValue());
       logger.debug(
           labelContainer.getCurrentLabel()
               + " to request ["
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index 7ecf3f48d..672963e30 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -247,8 +247,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
 
     List<PersistenceLock> persistenceLocks = new ArrayList<>();
     EMInstanceLabel emInstanceLabel = labelContainer.getEMInstanceLabel();
-    CombinedLabel userCreatorEngineTypeLabel =
-        labelContainer.getCombinedUserCreatorEngineTypeLabel();
+    CombinedLabel combinedLabel = labelContainer.getCombinedResourceLabel();
 
     try {
       // check ecm resource if not enough return
@@ -266,14 +265,12 @@ public class DefaultResourceManager extends 
ResourceManager implements Initializ
 
       // lock userCreatorEngineTypeLabel
       persistenceLocks.add(
-          tryLockOneLabel(
-              userCreatorEngineTypeLabel, wait, 
labelContainer.getUserCreatorLabel().getUser()));
+          tryLockOneLabel(combinedLabel, wait, 
labelContainer.getUserCreatorLabel().getUser()));
       try {
-        labelContainer.setCurrentLabel(userCreatorEngineTypeLabel);
+        labelContainer.setCurrentLabel(combinedLabel);
         if (!requestResourceService.canRequest(labelContainer, resource)) {
           return new NotEnoughResource(
-              String.format(
-                  "Labels:%s not enough resource", 
userCreatorEngineTypeLabel.getStringValue()));
+              String.format("Labels:%s not enough resource", 
combinedLabel.getStringValue()));
         }
       } catch (RMWarnException exception) {
         return new NotEnoughResource(exception.getMessage());
@@ -294,9 +291,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
           labelResource.setLockedResource(
               
labelResource.getLockedResource().add(resource.getLockedResource()));
           labelResourceService.setLabelResource(
-              label,
-              labelResource,
-              
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+              label, labelResource, 
labelContainer.getCombinedResourceLabel().getStringValue());
           logger.info(
               String.format(
                   "ResourceChanged:%s --> %s", label.getStringValue(), 
labelResource.toString()));
@@ -330,9 +325,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
 
     // add ec resource
     labelResourceService.setEngineConnLabelResource(
-        engineInstanceLabel,
-        resource,
-        
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+        engineInstanceLabel, resource, 
labelContainer.getCombinedResourceLabel().getStringValue());
     // record engine locked resource
     labelContainer.getLabels().add(engineInstanceLabel);
     resourceLogService.recordUserResourceAction(
@@ -441,7 +434,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
       labelResourceService.setLabelResource(
           engineInstanceLabel,
           lockedResource,
-          
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+          labelContainer.getCombinedResourceLabel().getStringValue());
       resourceLogService.success(
           ChangeType.ENGINE_INIT, lockedResource.getLockedResource(), 
engineInstanceLabel, null);
     } catch (Exception exception) {
@@ -481,7 +474,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
                 labelResourceService.setLabelResource(
                     label,
                     labelResource,
-                    
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
+                    
labelContainer.getCombinedResourceLabel().getStringValue());
                 labelResourceSet.add(
                     new LabelResourceMapping(label, addedResource, 
ResourceOperationType.USED));
                 resourceCheck(label, labelResource);
@@ -493,7 +486,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
 
         if (label
             .getClass()
-            
.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass()))
 {
+            
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
           resourceLogService.recordUserResourceAction(
               labelContainer,
               persistenceResource.getTicketId(),
@@ -704,9 +697,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
                         labelResourceService.setLabelResource(
                             label,
                             labelResource,
-                            labelContainer
-                                .getCombinedUserCreatorEngineTypeLabel()
-                                .getStringValue());
+                            
labelContainer.getCombinedResourceLabel().getStringValue());
                         resourceCheck(label, labelResource);
                       }
                     },
@@ -725,8 +716,7 @@ public class DefaultResourceManager extends ResourceManager 
implements Initializ
 
                 if (label
                     .getClass()
-                    .isAssignableFrom(
-                        
labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
+                    
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
                   resourceLogService.recordUserResourceAction(
                       labelContainer,
                       persistenceResource.getTicketId(),
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
index dd563f5a9..c5890132c 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/ResourceLogService.java
@@ -150,11 +150,10 @@ public class ResourceLogService {
     if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue()) {
       LinkisUtils.tryAndWarn(
           () -> {
-            CombinedLabel userCreatorEngineType =
-                labelContainer.getCombinedUserCreatorEngineTypeLabel();
+            CombinedLabel combinedLabel = 
labelContainer.getCombinedResourceLabel();
             EngineInstanceLabel engineInstanceLabel = 
labelContainer.getEngineInstanceLabel();
             EMInstanceLabel eMInstanceLabel = 
labelContainer.getEMInstanceLabel();
-            if (userCreatorEngineType == null) {
+            if (combinedLabel == null) {
               return;
             }
             ECResourceInfoRecord ecResourceInfoRecord =
@@ -171,11 +170,7 @@ public class ResourceLogService {
                       : "";
               ecResourceInfoRecord =
                   new ECResourceInfoRecord(
-                      userCreatorEngineType.getStringValue(),
-                      user,
-                      ticketId,
-                      resource,
-                      logDirSuffix);
+                      combinedLabel.getStringValue(), user, ticketId, 
resource, logDirSuffix);
               
ecResourceRecordMapper.insertECResourceInfoRecord(ecResourceInfoRecord);
             }
             if (engineInstanceLabel != null) {
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 37a0e2c98..1768b77d0 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -53,6 +53,7 @@ public class SparkConfig {
 
   private String k8sDriverRequestCores;
   private String k8sExecutorRequestCores;
+  private String k8sSparkUIPort;
   private String deployMode = "client"; // ("client") // todo cluster
   private String appResource; // ("")
   private String appName; // ("")
@@ -78,6 +79,7 @@ public class SparkConfig {
   private String principal; // ("--principal", "")
   private String keytab; // ("--keytab", "")
   private String queue; // ("--queue", "")
+  private String pyFiles; // ("--py-files", "")
 
   public String getK8sFileUploadPath() {
     return k8sFileUploadPath;
@@ -195,6 +197,14 @@ public class SparkConfig {
     this.k8sExecutorRequestCores = k8sExecutorRequestCores;
   }
 
+  public String getK8sSparkUIPort() {
+    return k8sSparkUIPort;
+  }
+
+  public void setK8sSparkUIPort(String k8sSparkUIPort) {
+    this.k8sSparkUIPort = k8sSparkUIPort;
+  }
+
   public String getJavaHome() {
     return javaHome;
   }
@@ -419,6 +429,14 @@ public class SparkConfig {
     this.queue = queue;
   }
 
+  public String getPyFiles() {
+    return pyFiles;
+  }
+
+  public void setPyFiles(String pyFiles) {
+    this.pyFiles = pyFiles;
+  }
+
   @Override
   public String toString() {
     return "SparkConfig{"
@@ -467,6 +485,9 @@ public class SparkConfig {
         + ", k8sExecutorRequestCores='"
         + k8sExecutorRequestCores
         + '\''
+        + ", k8sSparkUIPort='"
+        + k8sSparkUIPort
+        + '\''
         + ", deployMode='"
         + deployMode
         + '\''
@@ -534,6 +555,9 @@ public class SparkConfig {
         + ", queue='"
         + queue
         + '\''
+        + ", pyFiles='"
+        + pyFiles
+        + '\''
         + '}';
   }
 }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
index 0ee0380fb..ce709b2e7 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
@@ -72,6 +72,7 @@ public class KubernetesApplicationClusterDescriptorAdapter 
extends ClusterDescri
     this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
     this.namespace = sparkConfig.getK8sNamespace();
     setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName());
+    setConf(sparkLauncher, "spark.ui.port", sparkConfig.getK8sSparkUIPort());
     setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace);
     setConf(sparkLauncher, "spark.kubernetes.container.image", 
sparkConfig.getK8sImage());
     setConf(sparkLauncher, "spark.kubernetes.driver.pod.name", 
this.driverPodName);
@@ -111,6 +112,7 @@ public class KubernetesApplicationClusterDescriptorAdapter 
extends ClusterDescri
     addSparkArg(sparkLauncher, "--num-executors", 
sparkConfig.getNumExecutors().toString());
     addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
     addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
+    addSparkArg(sparkLauncher, "--py-files", sparkConfig.getPyFiles());
     sparkLauncher.setAppResource(sparkConfig.getAppResource());
     sparkLauncher.setMainClass(mainClass);
     Arrays.stream(args.split("\\s+"))
@@ -164,12 +166,12 @@ public class 
KubernetesApplicationClusterDescriptorAdapter extends ClusterDescri
     return client.pods().inNamespace(namespace).withName(driverPodName).get();
   }
 
-  public String getSparkDriverPodIP() {
+  public String getSparkUIUrl() {
     Pod sparkDriverPod = getSparkDriverPod();
     if (null != sparkDriverPod) {
       String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP();
       if (StringUtils.isNotBlank(sparkDriverPodIP)) {
-        return sparkDriverPodIP;
+        return sparkDriverPodIP + ":" + this.sparkConfig.getK8sSparkUIPort();
       } else {
         logger.info("spark driver pod IP is null, the application may be 
pending");
       }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index b42fc0934..bb079b7b5 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -68,6 +68,7 @@ object SparkConfiguration extends Logging {
   val SPARK_K8S_RESTART_POLICY = 
CommonVars[String]("linkis.spark.k8s.restartPolicy", "Never")
   val SPARK_K8S_SPARK_VERSION = 
CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
   val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", 
"default")
+  val SPARK_K8S_UI_PORT = CommonVars[String]("linkis.spark.k8s.ui.port", 
"4040")
 
   val SPARK_K8S_EXECUTOR_REQUEST_CORES =
     CommonVars[String]("linkis.spark.k8s.executor.request.cores", "1")
@@ -80,6 +81,8 @@ object SparkConfiguration extends Logging {
 
   val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", 
"python")
 
+  val SPARK_PYTHON_FILES = CommonVars[String]("spark.submit.pyFiles", "")
+
   val SPARK_PYTHON_TEST_MODE_ENABLE =
     CommonVars[Boolean]("linkis.spark.python.test.mode.enable", false)
 
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
index 1c3873942..e8b7dfb48 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
@@ -132,9 +132,10 @@ class SparkOnKubernetesSubmitOnceExecutor(
     if (oldProgress >= 1 || jobIsFinal) {
       1
     } else {
-      val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
-      if (StringUtils.isNotBlank(sparkDriverPodIP)) {
-        val newProgress = 
SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP)
+      val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl
+      if (StringUtils.isNotBlank(sparkUIUrl)) {
+        val newProgress =
+          SparkJobProgressUtil.getProgress(this.getApplicationId, sparkUIUrl)
         if (newProgress > oldProgress) {
           oldProgress = newProgress
         }
@@ -144,9 +145,9 @@ class SparkOnKubernetesSubmitOnceExecutor(
   }
 
   override def getProgressInfo: Array[JobProgressInfo] = {
-    val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
-    if (StringUtils.isNotBlank(sparkDriverPodIP)) {
-      SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, 
sparkDriverPodIP)
+    val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl
+    if (StringUtils.isNotBlank(sparkUIUrl)) {
+      SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, 
sparkUIUrl)
     } else {
       Array.empty
     }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index fbd38bcc6..a237b544b 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -113,6 +113,7 @@ class SparkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
       
sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
       
sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
       
sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
+      sparkConfig.setK8sSparkUIPort(SPARK_K8S_UI_PORT.getValue(options))
     }
 
     if (master.startsWith("yarn")) {
@@ -134,6 +135,7 @@ class SparkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
     sparkConfig.setExecutorCores(LINKIS_SPARK_EXECUTOR_CORES.getValue(options))
     
sparkConfig.setNumExecutors(LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(options))
     sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
+    sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options))
 
     logger.info(s"spark_info: ${sparkConfig}")
     sparkConfig
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
index 12a87e22f..5802a1c85 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
@@ -56,5 +56,8 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory {
     }
   }
 
+  override protected def getSupportRunTypes: Array[String] =
+    Array(RunType.JAR.toString, RunType.PYSPARK.toString)
+
   override protected def getRunType: RunType = RunType.JAR
 }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
index 6968ffb61..94614f902 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
@@ -31,11 +31,12 @@ import java.util
 
 object SparkJobProgressUtil extends Logging {
 
-  def getProgress(applicationId: String, podIP: String = ""): Float = {
+  def getProgress(applicationId: String, sparkUIUrl: String = ""): Float = {
     if (StringUtils.isBlank(applicationId)) return 0f
     val sparkJobsResult =
-      if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
-      else getKubernetesSparkJobInfo(applicationId, podIP)
+      if (StringUtils.isBlank(sparkUIUrl))
+        getSparkJobInfo(applicationId)
+      else getKubernetesSparkJobInfo(applicationId, sparkUIUrl)
     if (sparkJobsResult.isEmpty) return 0f
     val tuple = sparkJobsResult
       .filter(sparkJobResult => {
@@ -52,10 +53,14 @@ object SparkJobProgressUtil extends Logging {
     tuple._2.toFloat / tuple._1
   }
 
-  def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): 
Array[JobProgressInfo] = {
+  def getSparkJobProgressInfo(
+      applicationId: String,
+      sparkUIUrl: String = ""
+  ): Array[JobProgressInfo] = {
     val sparkJobsResult =
-      if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
-      else getKubernetesSparkJobInfo(applicationId, podIP)
+      if (StringUtils.isBlank(sparkUIUrl))
+        getSparkJobInfo(applicationId)
+      else getKubernetesSparkJobInfo(applicationId, sparkUIUrl)
     if (sparkJobsResult.isEmpty) {
       Array.empty
     } else {
@@ -104,11 +109,11 @@ object SparkJobProgressUtil extends Logging {
 
   def getKubernetesSparkJobInfo(
       applicationId: String,
-      podIP: String
+      sparkUIUrl: String
   ): Array[java.util.Map[String, Object]] =
-    if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) 
Array.empty
+    if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(sparkUIUrl)) 
Array.empty
     else {
-      val getSparkJobsStateUrl = 
s"http://$podIP:4040/api/v1/applications/$applicationId";
+      val getSparkJobsStateUrl = 
s"http://$sparkUIUrl/api/v1/applications/$applicationId";
       logger.info(s"get spark job state from kubernetes spark ui, url: 
$getSparkJobsStateUrl")
       val appStateResult =
         JsonUtils.jackson.readValue(
@@ -121,7 +126,8 @@ object SparkJobProgressUtil extends Logging {
         appAttemptList.get(appAttemptList.size() - 
1).asInstanceOf[util.Map[String, Object]]
       val isLastAttemptCompleted = 
appLastAttempt.get("completed").asInstanceOf[Boolean]
       if (isLastAttemptCompleted) return Array.empty
-      val getSparkJobsInfoUrl = 
s"http://$podIP:4040/api/v1/applications/$applicationId/jobs";
+      val getSparkJobsInfoUrl =
+        s"http://$sparkUIUrl/api/v1/applications/$applicationId/jobs";
       logger.info(s"get spark job info from kubernetes spark ui: 
$getSparkJobsInfoUrl")
       val jobs = get(getSparkJobsInfoUrl)
       if (StringUtils.isBlank(jobs)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to