This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 3dd9b0c68 Add configuration spark.kubernetes.file.upload.path (#4829)
3dd9b0c68 is described below
commit 3dd9b0c6897c3178d6ffa573b2cbba2882a5d99a
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu Jul 27 17:49:55 2023 +0800
Add configuration spark.kubernetes.file.upload.path (#4829)
* Set the default value for SPARK_K8S_CONFIG_FILE
* Add configuration spark.kubernetes.file.upload.path
* Delete useless parameters
---
.../spark/client/context/SparkConfig.java | 12 ++++++++
...KubernetesOperatorClusterDescriptorAdapter.java | 12 ++++++++
.../deployment/crds/SparkApplicationSpec.java | 33 ++++++++++++++++++++++
.../spark/config/SparkConfiguration.scala | 3 ++
.../spark/factory/SparkEngineConnFactory.scala | 1 +
5 files changed, 61 insertions(+)
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 4e4654f92..24d3ddcb2 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -47,6 +47,7 @@ public class SparkConfig {
private String k8sSparkVersion;
private String k8sNamespace;
+ private String k8sFileUploadPath;
private String deployMode = "client"; // ("client") // todo cluster
private String appResource; // ("")
private String appName; // ("")
@@ -73,6 +74,14 @@ public class SparkConfig {
private String keytab; // ("--keytab", "")
private String queue; // ("--queue", "")
+ public String getK8sFileUploadPath() {
+ return k8sFileUploadPath;
+ }
+
+ public void setK8sFileUploadPath(String k8sFileUploadPath) {
+ this.k8sFileUploadPath = k8sFileUploadPath;
+ }
+
public String getK8sImagePullPolicy() {
return k8sImagePullPolicy;
}
@@ -421,6 +430,9 @@ public class SparkConfig {
+ ", k8sSparkVersion='"
+ k8sSparkVersion
+ '\''
+ + ", k8sFileUploadPath='"
+ + k8sFileUploadPath
+ + '\''
+ ", k8sNamespace='"
+ k8sNamespace
+ '\''
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
index 2f0318254..fa6236600 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -21,11 +21,13 @@ import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
import org.apache.linkis.engineplugin.spark.client.deployment.crds.*;
import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.launcher.SparkAppHandle;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -79,6 +81,7 @@ public class KubernetesOperatorClusterDescriptorAdapter
extends ClusterDescripto
NonNamespaceOperation<SparkApplication, SparkApplicationList,
Resource<SparkApplication>>
sparkApplicationClient = getSparkApplicationClient(client);
+
SparkApplication sparkApplication =
getSparkApplication(sparkConfig.getAppName(),
sparkConfig.getK8sNamespace());
@@ -88,12 +91,19 @@ public class KubernetesOperatorClusterDescriptorAdapter
extends ClusterDescripto
.memory(sparkConfig.getDriverMemory())
.serviceAccount(sparkConfig.getK8sServiceAccount())
.build();
+
SparkPodSpec executor =
SparkPodSpec.Builder()
.cores(sparkConfig.getExecutorCores())
.instances(sparkConfig.getNumExecutors())
.memory(sparkConfig.getExecutorMemory())
.build();
+
+ Map<String, String> sparkConfMap = new HashMap<>();
+ sparkConfMap.put(
+ SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(),
+ sparkConfig.getK8sFileUploadPath());
+
SparkApplicationSpec sparkApplicationSpec =
SparkApplicationSpec.Builder()
.type(sparkConfig.getK8sLanguageType())
@@ -107,10 +117,12 @@ public class KubernetesOperatorClusterDescriptorAdapter
extends ClusterDescripto
.restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy()))
.driver(driver)
.executor(executor)
+ .sparkConf(sparkConfMap)
.build();
logger.info("Spark k8s operator task parameters: {}",
sparkApplicationSpec);
sparkApplication.setSpec(sparkApplicationSpec);
+
SparkApplication created =
sparkApplicationClient.createOrReplace(sparkApplication);
logger.info("Preparing to submit the Spark k8s operator Task: {}",
created);
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
index 80c03f2df..be705ce40 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
@@ -17,7 +17,9 @@
package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import io.fabric8.kubernetes.api.model.KubernetesResource;
@@ -45,6 +47,16 @@ public class SparkApplicationSpec implements
KubernetesResource {
private SparkPodSpec executor;
+ private Map<String, String> sparkConf;
+
+ public Map<String, String> getSparkConf() {
+ return sparkConf;
+ }
+
+ public void setSparkConf(Map<String, String> sparkConf) {
+ this.sparkConf = sparkConf;
+ }
+
public String getType() {
return type;
}
@@ -165,6 +177,8 @@ public class SparkApplicationSpec implements
KubernetesResource {
+ driver
+ ", executor="
+ executor
+ + ", sparkConf="
+ + sparkConf
+ '}';
}
@@ -185,6 +199,8 @@ public class SparkApplicationSpec implements
KubernetesResource {
private SparkPodSpec driver;
private SparkPodSpec executor;
+ private Map<String, String> sparkConf;
+
private SparkApplicationSpecBuilder() {}
public SparkApplicationSpecBuilder type(String type) {
@@ -242,6 +258,22 @@ public class SparkApplicationSpec implements
KubernetesResource {
return this;
}
+ public SparkApplicationSpecBuilder sparkConf(Map<String, String> sparkConf) {
+ if (sparkConf == null || sparkConf.size() == 0) {
+ return this;
+ }
+
+ if (this.sparkConf == null) {
+ this.sparkConf = new HashMap<>();
+ }
+
+ for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
+ this.sparkConf.put(entry.getKey(), entry.getValue());
+ }
+
+ return this;
+ }
+
public SparkApplicationSpec build() {
SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec();
sparkApplicationSpec.type = this.type;
@@ -255,6 +287,7 @@ public class SparkApplicationSpec implements
KubernetesResource {
sparkApplicationSpec.executor = this.executor;
sparkApplicationSpec.image = this.image;
sparkApplicationSpec.restartPolicy = this.restartPolicy;
+ sparkApplicationSpec.sparkConf = this.sparkConf;
return sparkApplicationSpec;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index b81b21e76..ecc37597d 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -62,6 +62,9 @@ object SparkConfiguration extends Logging {
val SPARK_K8S_SPARK_VERSION =
CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace",
"default")
+ val SPARK_KUBERNETES_FILE_UPLOAD_PATH =
+ CommonVars[String]("spark.kubernetes.file.upload.path", "local:///opt/spark/tmp")
+
val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version",
"python")
val SPARK_PYTHON_TEST_MODE_ENABLE =
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index fef3f0699..5bf90c6bf 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -104,6 +104,7 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options))
sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options))
sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options))
+ sparkConfig.setK8sFileUploadPath(SPARK_KUBERNETES_FILE_UPLOAD_PATH.getValue(options))
sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options))
sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]