YARN-9001. [Submarine] Use AppAdminClient instead of ServiceClient to submit 
jobs. (Zac Zhou via wangda)

Change-Id: Ic3d6c1e439df9cdf74448b345b925343224efe51


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fcbd205c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fcbd205c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fcbd205c

Branch: refs/heads/HDFS-13891
Commit: fcbd205cc35e7411ac33860c78b9e1e70697bb4a
Parents: 9da6054
Author: Wangda Tan <wan...@apache.org>
Authored: Tue Nov 13 13:13:27 2018 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Tue Nov 13 13:13:27 2018 -0800

----------------------------------------------------------------------
 .../yarn/service/client/ServiceClient.java      |  1 +
 .../submarine/runtimes/common/JobMonitor.java   |  6 +++
 .../yarnservice/YarnServiceJobMonitor.java      | 27 +++++++++----
 .../yarnservice/YarnServiceJobSubmitter.java    | 42 ++++++++++++++++++--
 .../runtimes/yarnservice/YarnServiceUtils.java  | 15 +++----
 .../yarnservice/TestYarnServiceRunJobCli.java   | 12 +++---
 .../submarine/common/MockClientContext.java     |  1 -
 7 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index 0bc5a2c..713d890 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -1552,6 +1552,7 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
       LOG.info("Service {} does not have an application ID", serviceName);
       return appSpec;
     }
+    appSpec.setId(currentAppId.toString());
     ApplicationReport appReport = 
yarnClient.getApplicationReport(currentAppId);
     appSpec.setState(convertState(appReport.getYarnApplicationState()));
     ApplicationTimeout lifetime =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
index c81393b..35e21fc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/JobMonitor.java
@@ -48,6 +48,11 @@ public abstract class JobMonitor {
       throws IOException, YarnException;
 
   /**
+   * Cleanup AppAdminClient, etc.
+   */
+  public void cleanup() throws IOException {}
+
+  /**
    * Continue wait and print status if job goes to ready or final state.
    * @param jobName
    * @throws IOException
@@ -80,5 +85,6 @@ public abstract class JobMonitor {
         throw new IOException(e);
       }
     }
+    cleanup();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
index fab018a..ee68ddb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobMonitor.java
@@ -14,9 +14,10 @@
 
 package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
 
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
 import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
 import org.apache.hadoop.yarn.submarine.common.api.builder.JobStatusBuilder;
@@ -25,21 +26,33 @@ import 
org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
 import java.io.IOException;
 
 public class YarnServiceJobMonitor extends JobMonitor {
-  private ServiceClient serviceClient = null;
+  private volatile AppAdminClient serviceClient = null;
 
   public YarnServiceJobMonitor(ClientContext clientContext) {
     super(clientContext);
   }
 
   @Override
-  public synchronized JobStatus getTrainingJobStatus(String jobName)
+  public JobStatus getTrainingJobStatus(String jobName)
       throws IOException, YarnException {
     if (this.serviceClient == null) {
-      this.serviceClient = YarnServiceUtils.createServiceClient(
-          clientContext.getYarnConfig());
+      synchronized(this) {
+        if (this.serviceClient == null) {
+          this.serviceClient = YarnServiceUtils.createServiceClient(
+              clientContext.getYarnConfig());
+        }
+      }
     }
+    String appStatus=serviceClient.getStatusString(jobName);
+    Service serviceSpec= ServiceApiUtil.jsonSerDeser.fromJson(appStatus);
+    JobStatus jobStatus = JobStatusBuilder.fromServiceSpec(serviceSpec);
+    return jobStatus;
+  }
 
-    Service serviceSpec = this.serviceClient.getStatus(jobName);
-    return JobStatusBuilder.fromServiceSpec(serviceSpec);
+  @Override
+  public void cleanup() throws IOException{
+    if (this.serviceClient != null) {
+      this.serviceClient.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
index bcd4698..d28f89f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
@@ -19,6 +19,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
@@ -27,7 +28,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
 import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
 import org.apache.hadoop.yarn.submarine.common.ClientContext;
@@ -53,6 +54,8 @@ import java.util.Set;
 import java.util.StringTokenizer;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static 
org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
 
 /**
  * Submit a job to cluster
@@ -527,6 +530,20 @@ public class YarnServiceJobSubmitter implements 
JobSubmitter {
     return serviceSpec;
   }
 
+  private String generateServiceSpecFile(Service service) throws IOException {
+    File serviceSpecFile = File.createTempFile(service.getName(), ".json");
+    String buffer = jsonSerDeser.toJson(service);
+    Writer w = new OutputStreamWriter(new FileOutputStream(serviceSpecFile),
+        "UTF-8");
+    PrintWriter pw = new PrintWriter(w);
+    try {
+      pw.append(buffer);
+    } finally {
+      pw.close();
+    }
+    return serviceSpecFile.getAbsolutePath();
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -534,13 +551,30 @@ public class YarnServiceJobSubmitter implements 
JobSubmitter {
   public ApplicationId submitJob(RunJobParameters parameters)
       throws IOException, YarnException {
     createServiceByParameters(parameters);
-    ServiceClient serviceClient = YarnServiceUtils.createServiceClient(
+    String serviceSpecFile = generateServiceSpecFile(serviceSpec);
+
+    AppAdminClient appAdminClient = YarnServiceUtils.createServiceClient(
         clientContext.getYarnConfig());
-    ApplicationId appid = serviceClient.actionCreate(serviceSpec);
-    serviceClient.stop();
+    int code = appAdminClient.actionLaunch(serviceSpecFile,
+        serviceSpec.getName(), null, null);
+    if(code != EXIT_SUCCESS) {
+      throw new YarnException("Fail to launch application with exit code:" +
+          code);
+    }
+
+    String appStatus=appAdminClient.getStatusString(serviceSpec.getName());
+    Service app=ServiceApiUtil.jsonSerDeser.fromJson(appStatus);
+    if(app.getId() == null) {
+      throw new YarnException("Can't get application id for Service " +
+          serviceSpec.getName());
+    }
+    ApplicationId appid = ApplicationId.fromString(app.getId());
+    appAdminClient.stop();
     return appid;
   }
 
+
+
   @VisibleForTesting
   public Service getServiceSpec() {
     return serviceSpec;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
index ce3a1eb..c599fc9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceUtils.java
@@ -16,8 +16,8 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.submarine.common.Envs;
 import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
 import org.slf4j.Logger;
@@ -26,27 +26,28 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.client.api.AppAdminClient.DEFAULT_TYPE;
+
 public class YarnServiceUtils {
   private static final Logger LOG =
       LoggerFactory.getLogger(YarnServiceUtils.class);
 
   // This will be true only in UT.
-  private static ServiceClient stubServiceClient = null;
+  private static AppAdminClient stubServiceClient = null;
 
-  public static ServiceClient createServiceClient(
+  public static AppAdminClient createServiceClient(
       Configuration yarnConfiguration) {
     if (stubServiceClient != null) {
       return stubServiceClient;
     }
 
-    ServiceClient serviceClient = new ServiceClient();
-    serviceClient.init(yarnConfiguration);
-    serviceClient.start();
+    AppAdminClient serviceClient = AppAdminClient.createAppAdminClient(
+        DEFAULT_TYPE, yarnConfiguration);
     return serviceClient;
   }
 
   @VisibleForTesting
-  public static void setStubServiceClient(ServiceClient stubServiceClient) {
+  public static void setStubServiceClient(AppAdminClient stubServiceClient) {
     YarnServiceUtils.stubServiceClient = stubServiceClient;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
index 89d39a0..4391030 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
@@ -19,12 +19,11 @@
 package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
 import org.apache.hadoop.yarn.submarine.common.MockClientContext;
 import org.apache.hadoop.yarn.submarine.common.api.TaskType;
@@ -45,6 +44,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Map;
 
+import static 
org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -53,9 +53,11 @@ public class TestYarnServiceRunJobCli {
   @Before
   public void before() throws IOException, YarnException {
     SubmarineLogs.verboseOff();
-    ServiceClient serviceClient = mock(ServiceClient.class);
-    when(serviceClient.actionCreate(any(Service.class))).thenReturn(
-        ApplicationId.newInstance(1234L, 1));
+    AppAdminClient serviceClient = mock(AppAdminClient.class);
+    when(serviceClient.actionLaunch(any(String.class), any(String.class),
+        any(Long.class), any(String.class))).thenReturn(EXIT_SUCCESS);
+    when(serviceClient.getStatusString(any(String.class))).thenReturn(
+        "{\"id\": \"application_1234_1\"}");
     YarnServiceUtils.setStubServiceClient(serviceClient);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcbd205c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
index 5c06ddc..b59c01e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
@@ -22,7 +22,6 @@ import 
org.apache.hadoop.yarn.submarine.common.fs.MockRemoteDirectoryManager;
 import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import java.io.IOException;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to