YARN-3950. Add unique SHELL_ID environment variable to DistributedShell. 
Contributed by Robert Kanter


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7a0f587
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7a0f587
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7a0f587

Branch: refs/heads/YARN-2928
Commit: c7a0f587bf110aede55cbd62bc99ed534150294c
Parents: f623636
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jul 29 15:16:40 2015 +0000
Committer: Zhijie Shen <zjs...@apache.org>
Committed: Mon Aug 3 17:02:09 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../distributedshell/ApplicationMaster.java     | 31 +++++++++++++++-----
 .../distributedshell/TestDSAppMaster.java       | 11 ++++++-
 3 files changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7a0f587/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9f95d3b..d59cd9b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -479,6 +479,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3026. Move application-specific container allocation logic from
     LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe)
 
+    YARN-3950. Add unique SHELL_ID environment variable to DistributedShell
+    (Robert Kanter via jlowe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7a0f587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 13bf500..aac0370 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -186,6 +186,8 @@ public class ApplicationMaster {
     DS_APP_ATTEMPT, DS_CONTAINER
   }
 
+  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";
+
   // Configuration
   private Configuration conf;
 
@@ -292,6 +294,8 @@ public class ApplicationMaster {
   private final String linux_bash_command = "bash";
   private final String windows_command = "cmd /c";
 
+  private int yarnShellIdCounter = 1;
+
   @VisibleForTesting
   protected final Set<ContainerId> launchedContainers =
       Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
@@ -880,8 +884,11 @@ public class ApplicationMaster {
           + allocatedContainers.size());
       numAllocatedContainers.addAndGet(allocatedContainers.size());
       for (Container allocatedContainer : allocatedContainers) {
+        String yarnShellId = Integer.toString(yarnShellIdCounter);
+        yarnShellIdCounter++;
         LOG.info("Launching shell command on a new container."
             + ", containerId=" + allocatedContainer.getId()
+            + ", yarnShellId=" + yarnShellId
             + ", containerNode=" + allocatedContainer.getNodeId().getHost()
             + ":" + allocatedContainer.getNodeId().getPort()
             + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
@@ -892,7 +899,8 @@ public class ApplicationMaster {
         // + ", containerToken"
         // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
-        Thread launchThread = createLaunchContainerThread(allocatedContainer);
+        Thread launchThread = createLaunchContainerThread(allocatedContainer,
+            yarnShellId);
 
         // launch and start the container on a separate thread to keep
         // the main thread unblocked
@@ -1010,7 +1018,8 @@ public class ApplicationMaster {
   private class LaunchContainerRunnable implements Runnable {
 
     // Allocated container
-    Container container;
+    private Container container;
+    private String shellId;
 
     NMCallbackHandler containerListener;
 
@@ -1018,10 +1027,11 @@ public class ApplicationMaster {
      * @param lcontainer Allocated container
      * @param containerListener Callback handler of the container
      */
-    public LaunchContainerRunnable(
-        Container lcontainer, NMCallbackHandler containerListener) {
+    public LaunchContainerRunnable(Container lcontainer,
+        NMCallbackHandler containerListener, String shellId) {
       this.container = lcontainer;
       this.containerListener = containerListener;
+      this.shellId = shellId;
     }
 
     @Override
@@ -1032,7 +1042,7 @@ public class ApplicationMaster {
      */
     public void run() {
       LOG.info("Setting up container launch container for containerid="
-          + container.getId());
+          + container.getId() + " with shellid=" + shellId);
 
       // Set the local resources
       Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@@ -1121,8 +1131,11 @@ public class ApplicationMaster {
       // download anyfiles in the distributed file-system. The tokens are
       // otherwise also useful in cases, for e.g., when one is running a
       // "hadoop dfs" command inside the distributed shell.
+      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
+      myShellEnv.put(YARN_SHELL_ID, shellId);
       ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
-        localResources, shellEnv, commands, null, allTokens.duplicate(), null);
+        localResources, myShellEnv, commands, null, allTokens.duplicate(),
+          null);
       containerListener.addContainer(container.getId(), container);
       nmClientAsync.startContainerAsync(container, ctx);
     }
@@ -1388,9 +1401,11 @@ public class ApplicationMaster {
   }
 
   @VisibleForTesting
-  Thread createLaunchContainerThread(Container allocatedContainer) {
+  Thread createLaunchContainerThread(Container allocatedContainer,
+      String shellId) {
     LaunchContainerRunnable runnableLaunchContainer =
-        new LaunchContainerRunnable(allocatedContainer, containerListener);
+        new LaunchContainerRunnable(allocatedContainer, containerListener,
+            shellId);
     return new Thread(runnableLaunchContainer);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7a0f587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index 0fed14d..2789d04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -41,6 +41,7 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -51,11 +52,14 @@ public class TestDSAppMaster {
 
   static class TestAppMaster extends ApplicationMaster {
     private int threadsLaunched = 0;
+    public List<String> yarnShellIds = new ArrayList<String>();
 
     @Override
-    protected Thread createLaunchContainerThread(Container allocatedContainer) {
+    protected Thread createLaunchContainerThread(Container allocatedContainer,
+        String shellId) {
       threadsLaunched++;
       launchedContainers.add(allocatedContainer.getId());
+      yarnShellIds.add(shellId);
       return new Thread();
     }
 
@@ -101,6 +105,8 @@ public class TestDSAppMaster {
     Mockito.verifyZeroInteractions(mockClient);
     Assert.assertEquals("Incorrect number of threads launched", 1,
         master.threadsLaunched);
+    Assert.assertEquals("Incorrect YARN Shell IDs",
+        Arrays.asList("1"), master.yarnShellIds);
 
     // now send 3 extra containers
     containers.clear();
@@ -117,6 +123,9 @@ public class TestDSAppMaster {
     Assert.assertEquals("Incorrect number of threads launched", 4,
         master.threadsLaunched);
 
+    Assert.assertEquals("Incorrect YARN Shell IDs",
+        Arrays.asList("1", "2", "3", "4"), master.yarnShellIds);
+
     // make sure we handle completion events correctly
     List<ContainerStatus> status = new ArrayList<>();
     status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));

Reply via email to