This is an automated email from the ASF dual-hosted git repository. pingsutw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push: new 71ccbf5 SUBMARINE-1021. Experiment Watcher 71ccbf5 is described below commit 71ccbf50701da9eb0c5ef8f5d1fae372a44a1154 Author: noidname01 <tim983...@gmail.com> AuthorDate: Sun Oct 24 16:38:13 2021 +0800 SUBMARINE-1021. Experiment Watcher ### What is this PR for? Use the Kubernetes Java client to build watchers for TFJobs and PyTorchJobs that log the status whenever an experiment's status changes. [Watcher examples](https://github.com/kubernetes-client/java/blob/master/examples/examples-release-12/src/main/java/io/kubernetes/client/examples/WatchExample.java) In follow-up PRs we will create a WebSocket connection between the server and the workbench and modify the workbench's frontend logic. ### What type of PR is it? [Feature] ### Todos None ### What is the Jira issue? https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-1021 ### How should this be tested? The watchers start when the k8s submitter is initialized and then keep watching the experiments. A log entry appears whenever the status of an experiment changes. ### Screenshots (if appropriate) https://user-images.githubusercontent.com/55401762/136215425-5dcf4c15-3810-42b5-9511-ceb00b781405.mp4 ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need new documentation?
No Author: noidname01 <tim983...@gmail.com> Signed-off-by: Kevin <pings...@apache.org> Closes #767 from noidname01/SUBMARINE-1021 and squashes the following commits: 9daa4af2 [noidname01] add error handling 00b341d8 [noidname01] delete debugging 423434ca [noidname01] add error log 69acfb58 [noidname01] delete comment 1b50a75e [noidname01] experiment watcher complete 9c6ace57 [noidname01] informer(WIP) 9f86dd53 [noidname01] add informer(WIP) --- .../server/submitter/k8s/K8sSubmitter.java | 111 ++++++++++++++++++++- 1 file changed, 106 insertions(+), 5 deletions(-) diff --git a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java index 0a23e58..cac7398 100644 --- a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java +++ b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sSubmitter.java @@ -26,9 +26,14 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; +import com.squareup.okhttp.OkHttpClient; import io.kubernetes.client.ApiClient; import io.kubernetes.client.ApiException; import io.kubernetes.client.Configuration; @@ -46,8 +51,10 @@ import io.kubernetes.client.models.V1Pod; import io.kubernetes.client.models.V1PodList; import io.kubernetes.client.models.V1Service; import io.kubernetes.client.models.V1Status; +import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.ClientBuilder; import io.kubernetes.client.util.KubeConfig; + import 
org.apache.submarine.commons.utils.SubmarineConfiguration; import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException; import org.apache.submarine.server.api.Submitter; @@ -68,6 +75,8 @@ import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRoute import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRouteSpec; import org.apache.submarine.server.submitter.k8s.model.ingressroute.SpecRoute; import org.apache.submarine.server.submitter.k8s.model.middlewares.Middlewares; +import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob; +import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob; import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser; import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser; import org.apache.submarine.server.submitter.k8s.parser.ServeSpecParser; @@ -79,7 +88,6 @@ import org.apache.submarine.server.submitter.k8s.util.OwnerReferenceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * JobSubmitter for Kubernetes Cluster. 
*/ @@ -100,12 +108,13 @@ public class K8sSubmitter implements Submitter { private AppsV1Api appsV1Api; + private ApiClient client = null; + public K8sSubmitter() { } @Override public void initialize(SubmarineConfiguration conf) { - ApiClient client = null; try { String path = System.getenv(KUBECONFIG_ENV); KubeConfig config = KubeConfig.loadKubeConfig(new FileReader(path)); @@ -119,6 +128,10 @@ public class K8sSubmitter implements Submitter { throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed."); } } finally { + // let watcher can wait until the next change + OkHttpClient httpClient = client.getHttpClient(); + httpClient.setReadTimeout(0, TimeUnit.SECONDS); + client.setHttpClient(httpClient); Configuration.setDefaultApiClient(client); } @@ -133,7 +146,12 @@ public class K8sSubmitter implements Submitter { appsV1Api = new AppsV1Api(); } - client.setDebugging(true); + try { + watchExperiment(); + } catch (Exception e){ + LOG.error("Experiment watch failed. " + e.getMessage(), e); + } + } @Override @@ -164,7 +182,7 @@ public class K8sSubmitter implements Submitter { try { MLJob mlJob = ExperimentSpecParser.parseJob(spec); mlJob.getMetadata().setNamespace(getServerNamespace()); - + Object object = api.getNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(), mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName()); experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT); @@ -451,7 +469,7 @@ public class K8sSubmitter implements Submitter { if (latestEvent.getReason().equalsIgnoreCase("Pulling")) { notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue()); notebook.setReason(latestEvent.getReason()); - } + } } } catch (ApiException e) { throw new SubmarineRuntimeException(e.getCode(), e.getMessage()); @@ -565,6 +583,89 @@ public class K8sSubmitter implements Submitter { } } + public void watchExperiment() throws ApiException{ + + Watch<MLJob> watchTF = Watch.createWatch( + client, + 
api.listNamespacedCustomObjectCall( + TFJob.CRD_TF_GROUP_V1, + TFJob.CRD_TF_VERSION_V1, + getServerNamespace(), + TFJob.CRD_TF_PLURAL_V1, + "true", + null, + null, + null, + null, + Boolean.TRUE, + null, + null + ), + new TypeToken<Watch.Response<MLJob>>() {}.getType() + ); + + Watch<MLJob> watchPytorch = Watch.createWatch( + client, + api.listNamespacedCustomObjectCall( + PyTorchJob.CRD_PYTORCH_GROUP_V1, + PyTorchJob.CRD_PYTORCH_VERSION_V1, + getServerNamespace(), + PyTorchJob.CRD_PYTORCH_PLURAL_V1, + "true", + null, + null, + null, + null, + Boolean.TRUE, + null, + null + ), + new TypeToken<Watch.Response<MLJob>>() {}.getType() + ); + + ExecutorService experimentThread = Executors.newFixedThreadPool(2); + + experimentThread.execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Start watching on TFJobs..."); + for (Watch.Response<MLJob> experiment : watchTF) { + LOG.info("{}", experiment.object.getStatus()); + } + } finally { + LOG.info("WATCH TFJob END"); + try { + watchTF.close(); + } catch (Exception e){ + LOG.error("{}", e.getMessage()); + } + throw new RuntimeException(); + } + } + }); + + experimentThread.execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Start watching on PytorchJobs..."); + for (Watch.Response<MLJob> experiment : watchPytorch) { + LOG.info("{}", experiment.object.getStatus()); + } + } finally { + LOG.info("WATCH PytorchJob END"); + try { + watchPytorch.close(); + } catch (Exception e){ + LOG.error("{}", e.getMessage()); + } + throw new RuntimeException(); + } + } + }); + } + public void createPersistentVolumeClaim(String pvcName, String namespace, String scName, String storage) throws ApiException { V1PersistentVolumeClaim pvc = VolumeSpecParser.parsePersistentVolumeClaim(pvcName, scName, storage); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@submarine.apache.org For additional commands, e-mail: dev-h...@submarine.apache.org