This is an automated email from the ASF dual-hosted git repository.
rarexixi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 4169e703c Spark k8s operator task Added status acquisition (#4889)
4169e703c is described below
commit 4169e703c427842d5e932b9375423d74017465ad
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Sep 1 19:09:53 2023 +0800
Spark k8s operator task: add status acquisition (#4889)
* Spark k8s operator task: add status acquisition
* Spark: obtain the status of the k8s operator task via k8s
list-watch
* Spark: obtain the status of the k8s operator task via k8s
list-watch
---
...KubernetesOperatorClusterDescriptorAdapter.java | 58 ++++++++++++++--------
1 file changed, 36 insertions(+), 22 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
index fa6236600..3ea27b394 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -38,6 +38,9 @@ import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition
import
io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.slf4j.Logger;
@@ -152,32 +155,44 @@ public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescripto
}
public boolean initJobId() {
- SparkApplicationStatus sparkApplicationStatus =
getKubernetesOperatorState();
-
- if (Objects.nonNull(sparkApplicationStatus)) {
- this.applicationId = sparkApplicationStatus.getSparkApplicationId();
- this.jobState =
- kubernetesOperatorStateConvertSparkState(
- sparkApplicationStatus.getApplicationState().getState());
+ try {
+ getKubernetesOperatorState();
+ } catch (Exception e) {
+ try {
+ // Prevent watch interruption due to network interruption.Restart
Watcher.
+ Thread.sleep(5000);
+ getKubernetesOperatorState();
+ } catch (InterruptedException interruptedException) {
+ logger.error("Use k8s watch obtain the status failed");
+ }
}
-
// When the job is not finished, the appId is monitored; otherwise, the
status is
// monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待)
return null != getApplicationId() || (jobState != null &&
jobState.isFinal());
}
- private SparkApplicationStatus getKubernetesOperatorState() {
- List<SparkApplication> sparkApplicationList =
- getSparkApplicationClient(client).list().getItems();
- if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
- for (SparkApplication sparkApplication : sparkApplicationList) {
- if
(sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
- &&
sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName()))
{
- return sparkApplication.getStatus();
- }
- }
- }
- return null;
+ private void getKubernetesOperatorState() {
+ getSparkApplicationClient(client)
+ .inNamespace(this.sparkConfig.getK8sNamespace())
+ .withName(this.sparkConfig.getAppName())
+ .watch(
+ new Watcher<SparkApplication>() {
+ @Override
+ public void eventReceived(Action action, SparkApplication
sparkApplication) {
+ // todo get status
+ applicationId =
sparkApplication.getStatus().getSparkApplicationId();
+ jobState =
+ kubernetesOperatorStateConvertSparkState(
+
sparkApplication.getStatus().getApplicationState().getState());
+ }
+
+ @Override
+ public void onClose(WatcherException e) {
+ // Invoked when the watcher closes due to an Exception.Restart
Watcher.
+ logger.error("Use k8s watch obtain the status failed", e);
+ getKubernetesOperatorState();
+ }
+ });
}
public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String
kubernetesState) {
@@ -216,8 +231,7 @@ public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescripto
client.close();
}
- public static NonNamespaceOperation<
- SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+ public static MixedOperation<SparkApplication, SparkApplicationList,
Resource<SparkApplication>>
getSparkApplicationClient(KubernetesClient client) {
return client.customResources(SparkApplication.class,
SparkApplicationList.class);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]