This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 51704268a feat: optimize process of closing spark job on k8s (#4957)
51704268a is described below
commit 51704268ab6403baaef0ae3c9d8da9128d168a8b
Author: zlucelia <[email protected]>
AuthorDate: Wed Nov 8 16:13:11 2023 +0800
feat: optimize process of closing spark job on k8s (#4957)
---
.../KubernetesApplicationClusterDescriptorAdapter.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
index ce709b2e7..73892117a 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
private static final Logger logger =
- LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);
+ LoggerFactory.getLogger(KubernetesApplicationClusterDescriptorAdapter.class);
protected SparkConfig sparkConfig;
protected KubernetesClient client;
@@ -66,7 +66,7 @@ public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescri
.setJavaHome(sparkConfig.getJavaHome())
.setSparkHome(sparkConfig.getSparkHome())
.setMaster(sparkConfig.getK8sMasterUrl())
- .setDeployMode(sparkConfig.getDeployMode())
+ .setDeployMode("cluster")
.setAppName(sparkConfig.getAppName())
.setVerbose(true);
this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
@@ -196,12 +196,16 @@ public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescri
@Override
public void close() {
logger.info("Start to close job {}.", getApplicationId());
+ client.close();
+ if (isDisposed()) {
+ logger.info("Job has finished, close action return.");
+ return;
+ }
PodResource<Pod> sparkDriverPodResource =
client.pods().inNamespace(namespace).withName(driverPodName);
if (null != sparkDriverPodResource.get()) {
sparkDriverPodResource.delete();
}
- client.close();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]