This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 5ca67262a feat: support spark submit jar on k8s (#4867)
5ca67262a is described below
commit 5ca67262ad93ab09c3338393155f3f5fa8717078
Author: zlucelia <[email protected]>
AuthorDate: Fri Sep 8 14:38:54 2023 +0800
feat: support spark submit jar on k8s (#4867)
* feat: support spark submit jar on k8s
* feat: add spark cores setting priority
* feat: use UUID to generate driverPodName
* feat: optimize code of executor creation
---
.../spark/client/context/SparkConfig.java | 25 +++
.../ClusterDescriptorAdapterFactory.java | 9 +-
...ernetesApplicationClusterDescriptorAdapter.java | 231 +++++++++++++++++++++
.../SparkOnKubernetesSubmitOnceExecutor.scala | 163 +++++++++++++++
.../spark/factory/SparkEngineConnFactory.scala | 2 +
.../factory/SparkEngineConnResourceFactory.scala | 8 +-
.../spark/factory/SparkOnceExecutorFactory.scala | 22 +-
.../spark/utils/SparkJobProgressUtil.scala | 45 +++-
8 files changed, 494 insertions(+), 11 deletions(-)
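
For orientation: in "k8s-native" mode the new adapter drives SparkLauncher so that a jar submission is roughly equivalent to the spark-submit call sketched below. All concrete values (API server URL, namespace, image, jar path) are placeholders for illustration, not taken from the commit:

  spark-submit \
    --master k8s://https://kubernetes.example.com:6443 \
    --deploy-mode cluster \
    --name my-once-job \
    --class org.example.Main \
    --conf spark.kubernetes.namespace=linkis \
    --conf spark.kubernetes.container.image=apache/spark:3.2.1 \
    --conf spark.kubernetes.driver.pod.name=my-once-job-<uuid>-driver \
    --conf spark.kubernetes.driver.request.cores=1 \
    --conf spark.kubernetes.executor.request.cores=1 \
    local:///opt/app/my-once-job.jar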
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 3d0fc0ff3..37a0e2c98 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -50,6 +50,9 @@ public class SparkConfig {
private String k8sNamespace;
private String k8sFileUploadPath;
+
+ private String k8sDriverRequestCores;
+ private String k8sExecutorRequestCores;
private String deployMode = "client"; // ("client") // todo cluster
private String appResource; // ("")
private String appName; // ("")
@@ -176,6 +179,22 @@ public class SparkConfig {
this.k8sImage = k8sImage;
}
+ public String getK8sDriverRequestCores() {
+ return k8sDriverRequestCores;
+ }
+
+ public void setK8sDriverRequestCores(String k8sDriverRequestCores) {
+ this.k8sDriverRequestCores = k8sDriverRequestCores;
+ }
+
+ public String getK8sExecutorRequestCores() {
+ return k8sExecutorRequestCores;
+ }
+
+ public void setK8sExecutorRequestCores(String k8sExecutorRequestCores) {
+ this.k8sExecutorRequestCores = k8sExecutorRequestCores;
+ }
+
public String getJavaHome() {
return javaHome;
}
@@ -442,6 +461,12 @@ public class SparkConfig {
+ ", k8sNamespace='"
+ k8sNamespace
+ '\''
+ + ", k8sDriverRequestCores='"
+ + k8sDriverRequestCores
+ + '\''
+ + ", k8sExecutorRequestCores='"
+ + k8sExecutorRequestCores
+ + '\''
+ ", deployMode='"
+ deployMode
+ '\''
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
index 91d3eafb6..bc67a33e9 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
@@ -29,8 +29,13 @@ public class ClusterDescriptorAdapterFactory {
ClusterDescriptorAdapter clusterDescriptorAdapter =
new YarnApplicationClusterDescriptorAdapter(executionContext);
- if (StringUtils.isNotBlank(master) && master.equalsIgnoreCase("k8s-operator")) {
- clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
+ if (StringUtils.isNotBlank(master)) {
+ if (master.equalsIgnoreCase("k8s-operator")) {
+ clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
+ } else if (master.equalsIgnoreCase("k8s-native")) {
+ clusterDescriptorAdapter =
+ new KubernetesApplicationClusterDescriptorAdapter(executionContext);
+ }
+ }
}
return clusterDescriptorAdapter;
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
new file mode 100644
index 000000000..0ee0380fb
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
+import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.util.Strings;
+import org.apache.spark.launcher.CustomSparkSubmitLauncher;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
+ private static final Logger logger =
+ LoggerFactory.getLogger(KubernetesApplicationClusterDescriptorAdapter.class);
+
+ protected SparkConfig sparkConfig;
+ protected KubernetesClient client;
+ protected String driverPodName;
+ protected String namespace;
+
+ public KubernetesApplicationClusterDescriptorAdapter(ExecutionContext executionContext) {
+ super(executionContext);
+ this.sparkConfig = executionContext.getSparkConfig();
+ this.client =
+ KubernetesHelper.getKubernetesClient(
+ this.sparkConfig.getK8sConfigFile(),
+ this.sparkConfig.getK8sMasterUrl(),
+ this.sparkConfig.getK8sUsername(),
+ this.sparkConfig.getK8sPassword());
+ }
+
+ public void deployCluster(String mainClass, String args, Map<String, String> confMap)
+ throws IOException {
+ SparkConfig sparkConfig = executionContext.getSparkConfig();
+ sparkLauncher = new CustomSparkSubmitLauncher();
+ sparkLauncher
+ .setJavaHome(sparkConfig.getJavaHome())
+ .setSparkHome(sparkConfig.getSparkHome())
+ .setMaster(sparkConfig.getK8sMasterUrl())
+ .setDeployMode(sparkConfig.getDeployMode())
+ .setAppName(sparkConfig.getAppName())
+ .setVerbose(true);
+ this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
+ this.namespace = sparkConfig.getK8sNamespace();
+ setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName());
+ setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace);
+ setConf(sparkLauncher, "spark.kubernetes.container.image",
sparkConfig.getK8sImage());
+ setConf(sparkLauncher, "spark.kubernetes.driver.pod.name",
this.driverPodName);
+ setConf(
+ sparkLauncher,
+ "spark.kubernetes.driver.request.cores",
+ sparkConfig.getK8sDriverRequestCores());
+ setConf(
+ sparkLauncher,
+ "spark.kubernetes.executor.request.cores",
+ sparkConfig.getK8sExecutorRequestCores());
+ setConf(
+ sparkLauncher,
+ "spark.kubernetes.container.image.pullPolicy",
+ sparkConfig.getK8sImagePullPolicy());
+ setConf(
+ sparkLauncher,
+ "spark.kubernetes.authenticate.driver.serviceAccountName",
+ sparkConfig.getK8sServiceAccount());
+ if (confMap != null) confMap.forEach((k, v) -> sparkLauncher.setConf(k, v));
+
+ addSparkArg(sparkLauncher, "--jars", sparkConfig.getJars());
+ addSparkArg(sparkLauncher, "--packages", sparkConfig.getPackages());
+ addSparkArg(sparkLauncher, "--exclude-packages",
sparkConfig.getExcludePackages());
+ addSparkArg(sparkLauncher, "--repositories",
sparkConfig.getRepositories());
+ addSparkArg(sparkLauncher, "--files", sparkConfig.getFiles());
+ addSparkArg(sparkLauncher, "--archives", sparkConfig.getArchives());
+ addSparkArg(sparkLauncher, "--driver-memory",
sparkConfig.getDriverMemory());
+ addSparkArg(sparkLauncher, "--driver-java-options",
sparkConfig.getDriverJavaOptions());
+ addSparkArg(sparkLauncher, "--driver-library-path",
sparkConfig.getDriverLibraryPath());
+ addSparkArg(sparkLauncher, "--driver-class-path",
sparkConfig.getDriverClassPath());
+ addSparkArg(sparkLauncher, "--executor-memory",
sparkConfig.getExecutorMemory());
+ addSparkArg(sparkLauncher, "--proxy-user", sparkConfig.getProxyUser());
+ addSparkArg(sparkLauncher, "--driver-cores",
sparkConfig.getDriverCores().toString());
+ addSparkArg(sparkLauncher, "--total-executor-cores",
sparkConfig.getTotalExecutorCores());
+ addSparkArg(sparkLauncher, "--executor-cores",
sparkConfig.getExecutorCores().toString());
+ addSparkArg(sparkLauncher, "--num-executors",
sparkConfig.getNumExecutors().toString());
+ addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
+ addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
+ sparkLauncher.setAppResource(sparkConfig.getAppResource());
+ sparkLauncher.setMainClass(mainClass);
+ Arrays.stream(args.split("\\s+"))
+ .filter(StringUtils::isNotBlank)
+ .forEach(arg -> sparkLauncher.addAppArgs(arg));
+ sparkAppHandle =
+ sparkLauncher.startApplication(
+ new SparkAppHandle.Listener() {
+ @Override
+ public void stateChanged(SparkAppHandle sparkAppHandle) {}
+
+ @Override
+ public void infoChanged(SparkAppHandle sparkAppHandle) {}
+ });
+ sparkLauncher.setSparkAppHandle(sparkAppHandle);
+ }
+
+ private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) {
+ if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+ sparkLauncher.addSparkArg(key, value);
+ }
+ }
+
+ private void setConf(SparkLauncher sparkLauncher, String key, String value) {
+ if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+ sparkLauncher.setConf(key, value);
+ }
+ }
+
+ public boolean initJobId() {
+ Pod sparkDriverPod = getSparkDriverPod();
+ if (null == sparkDriverPod) {
+ return false;
+ }
+ String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase();
+ String sparkApplicationId = sparkDriverPod.getMetadata().getLabels().get("spark-app-selector");
+
+ if (Strings.isNotBlank(sparkApplicationId)) {
+ this.applicationId = sparkApplicationId;
+ }
+ if (Strings.isNotBlank(sparkDriverPodPhase)) {
+ this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase);
+ }
+
+ // When the job is not finished, monitor the appId; otherwise, monitor the state.
+ // This mainly prevents a job that finishes too early from leaving us waiting forever.
+ return null != getApplicationId() || (jobState != null && jobState.isFinal());
+ }
+
+ protected Pod getSparkDriverPod() {
+ return client.pods().inNamespace(namespace).withName(driverPodName).get();
+ }
+
+ public String getSparkDriverPodIP() {
+ Pod sparkDriverPod = getSparkDriverPod();
+ if (null != sparkDriverPod) {
+ String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP();
+ if (StringUtils.isNotBlank(sparkDriverPodIP)) {
+ return sparkDriverPodIP;
+ } else {
+ logger.info("spark driver pod IP is null, the application may be
pending");
+ }
+ } else {
+ logger.info("spark driver pod is not exist");
+ }
+ return "";
+ }
+
+ @Override
+ public SparkAppHandle.State getJobState() {
+ Pod sparkDriverPod = getSparkDriverPod();
+ if (null != sparkDriverPod) {
+ String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase();
+ this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase);
+ logger.info("Job {} state is {}.", getApplicationId(), this.jobState);
+ return this.jobState;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ logger.info("Start to close job {}.", getApplicationId());
+ PodResource<Pod> sparkDriverPodResource =
+ client.pods().inNamespace(namespace).withName(driverPodName);
+ if (null != sparkDriverPodResource.get()) {
+ sparkDriverPodResource.delete();
+ }
+ client.close();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return this.jobState.isFinal();
+ }
+
+ public SparkAppHandle.State kubernetesPodStateConvertSparkState(String kubernetesState) {
+ if (StringUtils.isBlank(kubernetesState)) {
+ return SparkAppHandle.State.UNKNOWN;
+ }
+ switch (kubernetesState.toUpperCase()) {
+ case "PENDING":
+ return SparkAppHandle.State.CONNECTED;
+ case "RUNNING":
+ return SparkAppHandle.State.RUNNING;
+ case "SUCCEEDED":
+ return SparkAppHandle.State.FINISHED;
+ case "FAILED":
+ return SparkAppHandle.State.FAILED;
+ default:
+ return SparkAppHandle.State.UNKNOWN;
+ }
+ }
+
+ public String generateDriverPodName(String appName) {
+ return appName + "-" + UUID.randomUUID().toString().replace("-", "") +
"-driver";
+ }
+}
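
A side note on the two helpers above: generateDriverPodName embeds a dash-free UUID so that concurrent submissions of the same appName get distinct driver pods, and kubernetesPodStateConvertSparkState maps the pod phase reported by Kubernetes onto SparkAppHandle states. A minimal Java sketch, not part of the commit (the executionContext instance is assumed to exist):

  // Hypothetical usage sketch; executionContext is an assumed, pre-built ExecutionContext
  KubernetesApplicationClusterDescriptorAdapter adapter =
      new KubernetesApplicationClusterDescriptorAdapter(executionContext);
  String podName = adapter.generateDriverPodName("my-once-job");
  // e.g. "my-once-job-3f2504e04f8911d39a0c0305e82c3301-driver"
  adapter.kubernetesPodStateConvertSparkState("Pending");   // -> CONNECTED
  adapter.kubernetesPodStateConvertSparkState("Running");   // -> RUNNING
  adapter.kubernetesPodStateConvertSparkState("Succeeded"); // -> FINISHED
  adapter.kubernetesPodStateConvertSparkState("Failed");    // -> FAILED
  adapter.kubernetesPodStateConvertSparkState("other");     // -> UNKNOWN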
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
new file mode 100644
index 000000000..1c3873942
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.utils.{ByteTimeUtils, Utils}
+import org.apache.linkis.engineconn.once.executor.{
+ OnceExecutorExecutionContext,
+ OperableOnceExecutor
+}
+import org.apache.linkis.engineplugin.spark.client.deployment.{
+ KubernetesApplicationClusterDescriptorAdapter,
+ YarnApplicationClusterDescriptorAdapter
+}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+ SPARK_APP_CONF,
+ SPARK_APPLICATION_ARGS,
+ SPARK_APPLICATION_MAIN_CLASS
+}
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.utils.SparkJobProgressUtil
+import org.apache.linkis.manager.common.entity.resource._
+import org.apache.linkis.manager.common.utils.ResourceUtils
+import org.apache.linkis.protocol.engine.JobProgressInfo
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+import scala.concurrent.duration.Duration
+
+import io.fabric8.kubernetes.api.model.Quantity
+
+class SparkOnKubernetesSubmitOnceExecutor(
+ override val id: Long,
+ override protected val sparkEngineConnContext: SparkEngineConnContext
+) extends SparkOnceExecutor[KubernetesApplicationClusterDescriptorAdapter]
+ with OperableOnceExecutor {
+
+ private var oldProgress: Float = 0f
+
+ override def doSubmit(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext,
+ options: Map[String, String]
+ ): Unit = {
+ val args = SPARK_APPLICATION_ARGS.getValue(options)
+ val mainClass = SPARK_APPLICATION_MAIN_CLASS.getValue(options)
+ val extConf = SPARK_APP_CONF.getValue(options)
+ val confMap = new util.HashMap[String, String]()
+ if (StringUtils.isNotBlank(extConf)) {
+ for (conf <- extConf.split("\n")) {
+ if (StringUtils.isNotBlank(conf)) {
+ val pair = conf.trim.split("=")
+ if (pair.length == 2) {
+ confMap.put(pair(0), pair(1))
+ } else {
+ logger.warn(s"ignore spark conf: $conf")
+ }
+ }
+ }
+ }
+ logger.info(
+ s"Ready to submit spark application to kubernetes, mainClass:
$mainClass, args: $args."
+ )
+ clusterDescriptorAdapter.deployCluster(mainClass, args, confMap)
+ }
+
+ override protected def waitToRunning(): Unit = {
+ // Wait until the job returns the applicationId
+ Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf)
+ // Synchronize the applicationId to the EC SparkOnceExecutor to facilitate user
+ // operations, such as obtaining progress and killing the job
+ setApplicationId(clusterDescriptorAdapter.getApplicationId)
+ super.waitToRunning()
+ }
+
+ override def getApplicationURL: String = ""
+
+ override def getCurrentNodeResource(): NodeResource = {
+ logger.info("Begin to get actual used resources!")
+ Utils.tryCatch({
+ val sparkConf = sparkEngineConnContext.getExecutionContext.getSparkConfig
+ val sparkNamespace = sparkConf.getK8sNamespace
+
+ val executorNum: Int = sparkConf.getNumExecutors
+ val executorMem: Long =
+ ByteTimeUtils.byteStringAsBytes(sparkConf.getExecutorMemory) * executorNum
+ val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sparkConf.getDriverMemory)
+
+ val executorCoresQuantity = Quantity.parse(sparkConf.getK8sExecutorRequestCores)
+ val executorCores: Long =
+ (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong * executorNum
+ val driverCoresQuantity = Quantity.parse(sparkConf.getK8sDriverRequestCores)
+ val driverCores: Long =
+ (Quantity.getAmountInBytes(driverCoresQuantity).doubleValue() * 1000).toLong
+
+ logger.info(
+ "Current actual used resources is driverMem:" + driverMem + ",driverCores:" + driverCores + ",executorMem:" + executorMem + ",executorCores:" + executorCores + ",namespace:" + sparkNamespace
+ )
+ val usedResource = new DriverAndKubernetesResource(
+ new LoadInstanceResource(0, 0, 0),
+ new KubernetesResource(executorMem + driverMem, executorCores + driverCores, sparkNamespace)
+ )
+ val nodeResource = new CommonNodeResource
+ nodeResource.setUsedResource(usedResource)
+ nodeResource.setResourceType(ResourceUtils.getResourceTypeByResource(usedResource))
+ nodeResource
+ })(t => {
+ logger.warn("Get actual used resource exception", t)
+ null
+ })
+ }
+
+ override def getProgress: Float = {
+ val jobIsFinal = clusterDescriptorAdapter != null &&
+ clusterDescriptorAdapter.getJobState != null &&
+ clusterDescriptorAdapter.getJobState.isFinal
+ if (oldProgress >= 1 || jobIsFinal) {
+ 1
+ } else {
+ val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
+ if (StringUtils.isNotBlank(sparkDriverPodIP)) {
+ val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP)
+ if (newProgress > oldProgress) {
+ oldProgress = newProgress
+ }
+ }
+ oldProgress
+ }
+ }
+
+ override def getProgressInfo: Array[JobProgressInfo] = {
+ val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
+ if (StringUtils.isNotBlank(sparkDriverPodIP)) {
+ SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkDriverPodIP)
+ } else {
+ Array.empty
+ }
+ }
+
+ override def getMetrics: util.Map[String, Any] = {
+ new util.HashMap[String, Any]()
+ }
+
+ override def getDiagnosis: util.Map[String, Any] = {
+ new util.HashMap[String, Any]()
+ }
+
+}
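
The resource accounting in getCurrentNodeResource above converts Kubernetes CPU quantities into millicores: the parsed amount times 1000, further multiplied by the executor count on the executor side. A small, self-contained Java sketch of that arithmetic using the same fabric8 Quantity API as the diff (values are illustrative):

  import io.fabric8.kubernetes.api.model.Quantity;

  public class MillicoresSketch {
    public static void main(String[] args) {
      // "500m" parses to 0.5 CPU -> 500 millicores
      long driverCores =
          (long) (Quantity.getAmountInBytes(Quantity.parse("500m")).doubleValue() * 1000);
      // "2" parses to 2 CPUs -> 2000 millicores; with 3 executors -> 6000
      long executorCores =
          (long) (Quantity.getAmountInBytes(Quantity.parse("2")).doubleValue() * 1000) * 3;
      System.out.println(driverCores + " " + executorCores); // prints: 500 6000
    }
  }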
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index bc18e2bad..e8f2cd22d 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -111,6 +111,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
+ sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
+ sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
}
sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
@@ -113,7 +113,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with
Quantity.parse(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(properties))
(Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong
} else {
- LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) * 1000L
+ val sparkDefaultExecutorCores: Int = LINKIS_SPARK_EXECUTOR_CORES.getValue(properties)
+ properties.put(SPARK_K8S_EXECUTOR_REQUEST_CORES.key, sparkDefaultExecutorCores.toString)
+ sparkDefaultExecutorCores * 1000L
}
val executorMemory = LINKIS_SPARK_EXECUTOR_MEMORY.getValue(properties)
val executorMemoryWithUnit = if (StringUtils.isNumeric(executorMemory)) {
@@ -126,7 +128,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with
Quantity.parse(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(properties))
(Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong
} else {
- LINKIS_SPARK_DRIVER_CORES.getValue(properties) * 1000L
+ val sparkDefaultDriverCores: Int = LINKIS_SPARK_DRIVER_CORES.getValue(properties)
+ properties.put(SPARK_K8S_DRIVER_REQUEST_CORES.key, sparkDefaultDriverCores.toString)
+ sparkDefaultDriverCores * 1000L
}
val driverMemory = LINKIS_SPARK_DRIVER_MEMORY.getValue(properties)
val driverMemoryWithUnit = if (StringUtils.isNumeric(driverMemory)) {
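
The two hunks above implement the "spark cores setting priority" item from the commit message: an explicitly configured k8s request-cores value wins; when it is absent, the Linkis default core count is used and written back into the properties so the launcher and the resource report later read the same value. A hedged Java sketch of the executor-side fallback (the property key string here is an assumption for illustration; the real keys come from Linkis SparkConfiguration):

  import io.fabric8.kubernetes.api.model.Quantity;
  import java.util.Map;

  public class CoresPrioritySketch {
    // Assumed key string, for illustration only
    static final String K8S_EXECUTOR_REQUEST_CORES = "spark.kubernetes.executor.request.cores";

    static long executorMillicores(Map<String, String> properties, int linkisDefaultCores) {
      String requested = properties.get(K8S_EXECUTOR_REQUEST_CORES);
      if (requested != null && !requested.trim().isEmpty()) {
        // An explicit k8s request-cores setting takes priority
        return (long) (Quantity.getAmountInBytes(Quantity.parse(requested)).doubleValue() * 1000);
      }
      // Fall back to the Linkis default and write it back for later readers
      properties.put(K8S_EXECUTOR_REQUEST_CORES, String.valueOf(linkisDefaultCores));
      return linkisDefaultCores * 1000L;
    }
  }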
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
index 25e264944..12a87e22f 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
@@ -22,10 +22,16 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.once.executor.OnceExecutor
import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
-import org.apache.linkis.engineplugin.spark.executor.SparkSubmitOnceExecutor
+import org.apache.linkis.engineplugin.spark.executor.{
+ SparkOnKubernetesSubmitOnceExecutor,
+ SparkSubmitOnceExecutor
+}
+import org.apache.linkis.manager.common.conf.RMConfiguration.DEFAULT_KUBERNETES_TYPE
import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel
import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+import org.apache.linkis.manager.label.utils.LabelUtil
class SparkOnceExecutorFactory extends OnceExecutorFactory {
@@ -34,11 +40,21 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory {
engineCreationContext: EngineCreationContext,
engineConn: EngineConn,
labels: Array[Label[_]]
- ): OnceExecutor =
+ ): OnceExecutor = {
+ val clusterLabel = LabelUtil.getLabelFromArray[ClusterLabel](labels)
engineConn.getEngineConnSession match {
case context: SparkEngineConnContext =>
- new SparkSubmitOnceExecutor(id, context)
+ if (
+ null != clusterLabel && clusterLabel.getClusterType.equalsIgnoreCase(
+ DEFAULT_KUBERNETES_TYPE.getValue
+ )
+ ) {
+ new SparkOnKubernetesSubmitOnceExecutor(id, context)
+ } else {
+ new SparkSubmitOnceExecutor(id, context)
+ }
}
+ }
override protected def getRunType: RunType = RunType.JAR
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
index 196414420..6968ffb61 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
@@ -27,11 +27,15 @@ import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
+import java.util
+
object SparkJobProgressUtil extends Logging {
- def getProgress(applicationId: String): Float = {
+ def getProgress(applicationId: String, podIP: String = ""): Float = {
if (StringUtils.isBlank(applicationId)) return 0f
- val sparkJobsResult = getSparkJobInfo(applicationId)
+ val sparkJobsResult =
+ if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
+ else getKubernetesSparkJobInfo(applicationId, podIP)
if (sparkJobsResult.isEmpty) return 0f
val tuple = sparkJobsResult
.filter(sparkJobResult => {
@@ -48,8 +52,10 @@ object SparkJobProgressUtil extends Logging {
tuple._2.toFloat / tuple._1
}
- def getSparkJobProgressInfo(applicationId: String): Array[JobProgressInfo] = {
- val sparkJobsResult = getSparkJobInfo(applicationId)
+ def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): Array[JobProgressInfo] = {
+ val sparkJobsResult =
+ if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
+ else getKubernetesSparkJobInfo(applicationId, podIP)
if (sparkJobsResult.isEmpty) {
Array.empty
} else {
@@ -96,6 +102,37 @@ object SparkJobProgressUtil extends Logging {
)
}
+ def getKubernetesSparkJobInfo(
+ applicationId: String,
+ podIP: String
+ ): Array[java.util.Map[String, Object]] =
+ if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty
+ else {
+ val getSparkJobsStateUrl = s"http://$podIP:4040/api/v1/applications/$applicationId"
+ logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl")
+ val appStateResult =
+ JsonUtils.jackson.readValue(
+ get(getSparkJobsStateUrl),
+ classOf[java.util.Map[String, Object]]
+ )
+ val appAttemptList = appStateResult.get("attempts").asInstanceOf[java.util.List[Object]]
+ if (appAttemptList == null || appAttemptList.size() == 0) return Array.empty
+ val appLastAttempt =
+ appAttemptList.get(appAttemptList.size() - 1).asInstanceOf[util.Map[String, Object]]
+ val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean]
+ if (isLastAttemptCompleted) return Array.empty
+ val getSparkJobsInfoUrl = s"http://$podIP:4040/api/v1/applications/$applicationId/jobs"
+ logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl")
+ val jobs = get(getSparkJobsInfoUrl)
+ if (StringUtils.isBlank(jobs)) {
+ return Array.empty
+ }
+ JsonUtils.jackson.readValue(
+ jobs,
+ classOf[Array[java.util.Map[String, Object]]]
+ )
+ }
+
def get(url: String): String = {
val httpGet = new HttpGet(url)
val client = HttpClients.createDefault
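
For reference, the Kubernetes path of SparkJobProgressUtil polls the driver pod's built-in Spark UI REST API on port 4040: /api/v1/applications/{appId} is checked for an uncompleted last attempt, then /api/v1/applications/{appId}/jobs supplies the task counts from which getProgress derives, roughly, completed tasks over total tasks. A trimmed, illustrative jobs payload (values invented):

  [ {
    "jobId" : 0,
    "name" : "collect at Example.scala:10",
    "status" : "RUNNING",
    "numTasks" : 200,
    "numActiveTasks" : 16,
    "numCompletedTasks" : 120,
    "numSkippedTasks" : 0,
    "numFailedTasks" : 0
  } ]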
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]