MAPREDUCE-6824. TaskAttemptImpl#createCommonContainerLaunchContext is longer 
than 150 lines. Contributed by Chris Trezzo.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/845529b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/845529b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/845529b3

Branch: refs/heads/YARN-2915
Commit: 845529b3ab338e759665a687eb525fb2cccde7bf
Parents: a4b5aa8
Author: Akira Ajisaka <aajis...@apache.org>
Authored: Mon Apr 3 13:06:24 2017 +0900
Committer: Akira Ajisaka <aajis...@apache.org>
Committed: Mon Apr 3 13:06:54 2017 +0900

----------------------------------------------------------------------
 .../v2/app/job/impl/TaskAttemptImpl.java        | 285 ++++++++++---------
 1 file changed, 153 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/845529b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 4305824..9ea1b9a 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -755,7 +755,7 @@ public abstract class TaskAttemptImpl implements
         new HashMap<String, LocalResource>();
     
     // Application environment
-    Map<String, String> environment = new HashMap<String, String>();
+    Map<String, String> environment;
 
     // Service data
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
@@ -763,157 +763,178 @@ public abstract class TaskAttemptImpl implements
     // Tokens
     ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
     try {
-      FileSystem remoteFS = FileSystem.get(conf);
-
-      // //////////// Set up JobJar to be localized properly on the remote NM.
-      String jobJar = conf.get(MRJobConfig.JAR);
-      if (jobJar != null) {
-        final Path jobJarPath = new Path(jobJar);
-        final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
-        Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
-            jobJarFs.getWorkingDirectory());
-        LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
-            LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
-        String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
-            JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
-        rc.setPattern(pattern);
-        localResources.put(MRJobConfig.JOB_JAR, rc);
-        LOG.info("The job-jar file on the remote FS is "
-            + remoteJobJar.toUri().toASCIIString());
-      } else {
-        // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
-        // mapreduce jar itself which is already on the classpath.
-        LOG.info("Job jar is not present. "
-            + "Not adding any jar to the list of resources.");
-      }
-      // //////////// End of JobJar setup
-
-      // //////////// Set up JobConf to be localized properly on the remote NM.
-      Path path =
-          MRApps.getStagingAreaDir(conf, UserGroupInformation
-              .getCurrentUser().getShortUserName());
-      Path remoteJobSubmitDir =
-          new Path(path, oldJobId.toString());
-      Path remoteJobConfPath = 
-          new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
-      localResources.put(
-          MRJobConfig.JOB_CONF_FILE,
-          createLocalResource(remoteFS, remoteJobConfPath,
-              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-      LOG.info("The job-conf file on the remote FS is "
-          + remoteJobConfPath.toUri().toASCIIString());
-      // //////////// End of JobConf setup
 
-      // Setup DistributedCache
-      MRApps.setupDistributedCache(conf, localResources);
+      configureJobJar(conf, localResources);
 
-      // Setup up task credentials buffer
-      LOG.info("Adding #" + credentials.numberOfTokens()
-          + " tokens and #" + credentials.numberOfSecretKeys()
-          + " secret keys for NM use for launching container");
-      Credentials taskCredentials = new Credentials(credentials);
+      configureJobConf(conf, localResources, oldJobId);
 
-      // LocalStorageToken is needed irrespective of whether security is enabled
-      // or not.
-      TokenCache.setJobToken(jobToken, taskCredentials);
+      // Setup DistributedCache
+      MRApps.setupDistributedCache(conf, localResources);
 
-      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
-      LOG.info("Size of containertokens_dob is "
-          + taskCredentials.numberOfTokens());
-      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
       taskCredentialsBuffer =
-          ByteBuffer.wrap(containerTokens_dob.getData(), 0,
-              containerTokens_dob.getLength());
-
-      // Add shuffle secret key
-      // The secret key is converted to a JobToken to preserve backwards
-      // compatibility with an older ShuffleHandler running on an NM.
-      LOG.info("Putting shuffle token in serviceData");
-      byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials);
-      if (shuffleSecret == null) {
-        LOG.warn("Cannot locate shuffle secret in credentials."
-            + " Using job token as shuffle secret.");
-        shuffleSecret = jobToken.getPassword();
-      }
-      Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>(
-          jobToken.getIdentifier(), shuffleSecret, jobToken.getKind(),
-          jobToken.getService());
-      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-          ShuffleHandler.serializeServiceData(shuffleToken));
-
-      // add external shuffle-providers - if any
-      Collection<String> shuffleProviders = conf.getStringCollection(
-          MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES);
-      if (! shuffleProviders.isEmpty()) {
-        Collection<String> auxNames = conf.getStringCollection(
-            YarnConfiguration.NM_AUX_SERVICES);
-
-        for (final String shuffleProvider : shuffleProviders) {
-          if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) {
-            continue; // skip built-in shuffle-provider that was already inserted with shuffle secret key
-          }
-          if (auxNames.contains(shuffleProvider)) {
-                LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData");
-                // This only serves for INIT_APP notifications
-                // The shuffle service needs to be able to work with the host:port information provided by the AM
-                // (i.e. shuffle services which require custom location / other configuration are not supported)
-                serviceData.put(shuffleProvider, ByteBuffer.allocate(0));
-          }
-          else {
-            throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider +
-            " was NOT found in the list of aux-services that are available in this NM." +
-            " You may need to specify this ShuffleProvider as an aux-service in your yarn-site.xml");
-          }
-        }
-      }
+          configureTokens(jobToken, credentials, serviceData);
 
-      MRApps.addToEnvironment(
-          environment,  
-          Environment.CLASSPATH.name(), 
-          getInitialClasspath(conf), conf);
+      addExternalShuffleProviders(conf, serviceData);
+
+      environment = configureEnv(conf);
 
-      if (initialAppClasspath != null) {
-        MRApps.addToEnvironment(
-            environment,  
-            Environment.APP_CLASSPATH.name(), 
-            initialAppClasspath, conf);
-      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }
 
-    // Shell
-    environment.put(
-        Environment.SHELL.name(), 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
-            MRJobConfig.DEFAULT_SHELL)
-            );
-
-    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
-    MRApps.addToEnvironment(
-        environment, 
-        Environment.LD_LIBRARY_PATH.name(), 
-        MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
-
-    // Add the env variables passed by the admin
-    MRApps.setEnvFromInputString(
-        environment, 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
-            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf
-        );
-
     // Construct the actual Container
     // The null fields are per-container and will be constructed for each
     // container separately.
     ContainerLaunchContext container =
         ContainerLaunchContext.newInstance(localResources, environment, null,
-          serviceData, taskCredentialsBuffer, applicationACLs);
+            serviceData, taskCredentialsBuffer, applicationACLs);
 
     return container;
   }
 
+  private static Map<String, String> configureEnv(Configuration conf)
+      throws IOException {
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+        getInitialClasspath(conf), conf);
+
+    if (initialAppClasspath != null) {
+      MRApps.addToEnvironment(environment, Environment.APP_CLASSPATH.name(),
+          initialAppClasspath, conf);
+    }
+
+    // Shell
+    environment.put(Environment.SHELL.name(), conf
+        .get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
+
+    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+    MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
+        MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
+
+    // Add the env variables passed by the admin
+    MRApps.setEnvFromInputString(environment,
+        conf.get(MRJobConfig.MAPRED_ADMIN_USER_ENV,
+            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV),
+        conf);
+    return environment;
+  }
+
+  private static void configureJobJar(Configuration conf,
+      Map<String, LocalResource> localResources) throws IOException {
+    // Set up JobJar to be localized properly on the remote NM.
+    String jobJar = conf.get(MRJobConfig.JAR);
+    if (jobJar != null) {
+      final Path jobJarPath = new Path(jobJar);
+      final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
+      Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
+          jobJarFs.getWorkingDirectory());
+      LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
+          LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
+      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+          JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
+      LOG.info("The job-jar file on the remote FS is "
+          + remoteJobJar.toUri().toASCIIString());
+    } else {
+      // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+      // mapreduce jar itself which is already on the classpath.
+      LOG.info("Job jar is not present. "
+          + "Not adding any jar to the list of resources.");
+    }
+  }
+
+  private static void configureJobConf(Configuration conf,
+      Map<String, LocalResource> localResources,
+      final org.apache.hadoop.mapred.JobID oldJobId) throws IOException {
+    // Set up JobConf to be localized properly on the remote NM.
+    Path path = MRApps.getStagingAreaDir(conf,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
+    Path remoteJobConfPath =
+        new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+    FileSystem remoteFS = FileSystem.get(conf);
+    localResources.put(MRJobConfig.JOB_CONF_FILE,
+        createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE,
+            LocalResourceVisibility.APPLICATION));
+    LOG.info("The job-conf file on the remote FS is "
+        + remoteJobConfPath.toUri().toASCIIString());
+  }
+
+  private static ByteBuffer configureTokens(Token<JobTokenIdentifier> jobToken,
+      Credentials credentials,
+      Map<String, ByteBuffer> serviceData) throws IOException {
+    // Setup up task credentials buffer
+    LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+        + credentials.numberOfSecretKeys()
+        + " secret keys for NM use for launching container");
+    Credentials taskCredentials = new Credentials(credentials);
+
+    // LocalStorageToken is needed irrespective of whether security is enabled
+    // or not.
+    TokenCache.setJobToken(jobToken, taskCredentials);
+
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    LOG.info(
+        "Size of containertokens_dob is " + taskCredentials.numberOfTokens());
+    taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+    ByteBuffer taskCredentialsBuffer =
+        ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+            containerTokens_dob.getLength());
+
+    // Add shuffle secret key
+    // The secret key is converted to a JobToken to preserve backwards
+    // compatibility with an older ShuffleHandler running on an NM.
+    LOG.info("Putting shuffle token in serviceData");
+    byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials);
+    if (shuffleSecret == null) {
+      LOG.warn("Cannot locate shuffle secret in credentials."
+          + " Using job token as shuffle secret.");
+      shuffleSecret = jobToken.getPassword();
+    }
+    Token<JobTokenIdentifier> shuffleToken =
+        new Token<JobTokenIdentifier>(jobToken.getIdentifier(), shuffleSecret,
+            jobToken.getKind(), jobToken.getService());
+    serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+        ShuffleHandler.serializeServiceData(shuffleToken));
+    return taskCredentialsBuffer;
+  }
+
+  private static void addExternalShuffleProviders(Configuration conf,
+      Map<String, ByteBuffer> serviceData) {
+    // add external shuffle-providers - if any
+    Collection<String> shuffleProviders = conf.getStringCollection(
+        MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES);
+    if (!shuffleProviders.isEmpty()) {
+      Collection<String> auxNames =
+          conf.getStringCollection(YarnConfiguration.NM_AUX_SERVICES);
+
+      for (final String shuffleProvider : shuffleProviders) {
+        if (shuffleProvider
+            .equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) {
+          continue; // skip built-in shuffle-provider that was already inserted
+                    // with shuffle secret key
+        }
+        if (auxNames.contains(shuffleProvider)) {
+          LOG.info("Adding ShuffleProvider Service: " + shuffleProvider
+              + " to serviceData");
+          // This only serves for INIT_APP notifications
+          // The shuffle service needs to be able to work with the host:port
+          // information provided by the AM
+          // (i.e. shuffle services which require custom location / other
+          // configuration are not supported)
+          serviceData.put(shuffleProvider, ByteBuffer.allocate(0));
+        } else {
+          throw new YarnRuntimeException("ShuffleProvider Service: "
+              + shuffleProvider
+              + " was NOT found in the list of aux-services that are "
+              + "available in this NM. You may need to specify this "
+              + "ShuffleProvider as an aux-service in your yarn-site.xml");
+        }
+      }
+    }
+  }
+
   static ContainerLaunchContext createContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
       Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,


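The net effect of the patch is a straightforward extract-method refactoring: the roughly 150-line body of createCommonContainerLaunchContext shrinks to a short sequence of calls to focused helpers (configureJobJar, configureJobConf, configureTokens, addExternalShuffleProviders, configureEnv), each of which can be read and tested on its own. For readers less familiar with the pattern, the following is a minimal, self-contained sketch of that shape; the class and helper names in it (ExtractMethodSketch, LaunchSpec, buildSpec, and the simplified string-map "conf") are hypothetical stand-ins for illustration, not the actual Hadoop types touched by the patch.

    import java.util.HashMap;
    import java.util.Map;

    public class ExtractMethodSketch {

      // Analogue of ContainerLaunchContext: just the pieces the helpers assemble.
      static final class LaunchSpec {
        final Map<String, String> resources;
        final Map<String, String> environment;

        LaunchSpec(Map<String, String> resources, Map<String, String> environment) {
          this.resources = resources;
          this.environment = environment;
        }
      }

      // Before the refactoring this shape would be one long method doing
      // everything inline; after it, each concern lives in its own helper.
      static LaunchSpec buildSpec(Map<String, String> conf) {
        Map<String, String> resources = new HashMap<>();
        configureJobJar(conf, resources);                      // cf. configureJobJar in the patch
        configureJobConf(conf, resources);                     // cf. configureJobConf in the patch
        Map<String, String> environment = configureEnv(conf);  // cf. configureEnv in the patch
        return new LaunchSpec(resources, environment);
      }

      static void configureJobJar(Map<String, String> conf, Map<String, String> resources) {
        String jobJar = conf.get("job.jar");
        if (jobJar != null) {
          resources.put("job.jar", jobJar);
        }
        // else: nothing to localize, mirroring the "Job jar is not present" branch
      }

      static void configureJobConf(Map<String, String> conf, Map<String, String> resources) {
        String stagingDir = conf.getOrDefault("staging.dir", "/tmp/staging");
        resources.put("job.xml", stagingDir + "/job.xml");
      }

      static Map<String, String> configureEnv(Map<String, String> conf) {
        Map<String, String> env = new HashMap<>();
        env.put("SHELL", conf.getOrDefault("admin.user.shell", "/bin/bash"));
        return env;
      }

      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("job.jar", "hdfs:///user/alice/wordcount.jar");
        LaunchSpec spec = buildSpec(conf);
        System.out.println("resources=" + spec.resources + ", env=" + spec.environment);
      }
    }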