YARN-3950. Add unique YARN_SHELL_ID environment variable to DistributedShell. Contributed by Robert Kanter.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7a0f587 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7a0f587 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7a0f587 Branch: refs/heads/YARN-2928 Commit: c7a0f587bf110aede55cbd62bc99ed534150294c Parents: f623636 Author: Jason Lowe <jl...@apache.org> Authored: Wed Jul 29 15:16:40 2015 +0000 Committer: Zhijie Shen <zjs...@apache.org> Committed: Mon Aug 3 17:02:09 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../distributedshell/ApplicationMaster.java | 31 +++++++++++++++----- .../distributedshell/TestDSAppMaster.java | 11 ++++++- 3 files changed, 36 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7a0f587/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9f95d3b..d59cd9b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -479,6 +479,9 @@ Release 2.8.0 - UNRELEASED YARN-3026. Move application-specific container allocation logic from LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe) + YARN-3950. Add unique SHELL_ID environment variable to DistributedShell + (Robert Kanter via jlowe) + OPTIMIZATIONS YARN-3339. 
TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7a0f587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 13bf500..aac0370 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -186,6 +186,8 @@ public class ApplicationMaster { DS_APP_ATTEMPT, DS_CONTAINER } + private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; + // Configuration private Configuration conf; @@ -292,6 +294,8 @@ public class ApplicationMaster { private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; + private int yarnShellIdCounter = 1; + @VisibleForTesting protected final Set<ContainerId> launchedContainers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); @@ -880,8 +884,11 @@ public class ApplicationMaster { + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { + String yarnShellId = Integer.toString(yarnShellIdCounter); + 
yarnShellIdCounter++; LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId() + + ", yarnShellId=" + yarnShellId + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() @@ -892,7 +899,8 @@ public class ApplicationMaster { // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); - Thread launchThread = createLaunchContainerThread(allocatedContainer); + Thread launchThread = createLaunchContainerThread(allocatedContainer, + yarnShellId); // launch and start the container on a separate thread to keep // the main thread unblocked @@ -1010,7 +1018,8 @@ public class ApplicationMaster { private class LaunchContainerRunnable implements Runnable { // Allocated container - Container container; + private Container container; + private String shellId; NMCallbackHandler containerListener; @@ -1018,10 +1027,11 @@ public class ApplicationMaster { * @param lcontainer Allocated container * @param containerListener Callback handler of the container */ - public LaunchContainerRunnable( - Container lcontainer, NMCallbackHandler containerListener) { + public LaunchContainerRunnable(Container lcontainer, + NMCallbackHandler containerListener, String shellId) { this.container = lcontainer; this.containerListener = containerListener; + this.shellId = shellId; } @Override @@ -1032,7 +1042,7 @@ public class ApplicationMaster { */ public void run() { LOG.info("Setting up container launch container for containerid=" - + container.getId()); + + container.getId() + " with shellid=" + shellId); // Set the local resources Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); @@ -1121,8 +1131,11 @@ public class ApplicationMaster { // download anyfiles in the distributed file-system. 
The tokens are // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. + Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); + myShellEnv.put(YARN_SHELL_ID, shellId); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( - localResources, shellEnv, commands, null, allTokens.duplicate(), null); + localResources, myShellEnv, commands, null, allTokens.duplicate(), + null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } @@ -1388,9 +1401,11 @@ public class ApplicationMaster { } @VisibleForTesting - Thread createLaunchContainerThread(Container allocatedContainer) { + Thread createLaunchContainerThread(Container allocatedContainer, + String shellId) { LaunchContainerRunnable runnableLaunchContainer = - new LaunchContainerRunnable(allocatedContainer, containerListener); + new LaunchContainerRunnable(allocatedContainer, containerListener, + shellId); return new Thread(runnableLaunchContainer); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7a0f587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 0fed14d..2789d04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -41,6 +41,7 @@ import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -51,11 +52,14 @@ public class TestDSAppMaster { static class TestAppMaster extends ApplicationMaster { private int threadsLaunched = 0; + public List<String> yarnShellIds = new ArrayList<String>(); @Override - protected Thread createLaunchContainerThread(Container allocatedContainer) { + protected Thread createLaunchContainerThread(Container allocatedContainer, + String shellId) { threadsLaunched++; launchedContainers.add(allocatedContainer.getId()); + yarnShellIds.add(shellId); return new Thread(); } @@ -101,6 +105,8 @@ public class TestDSAppMaster { Mockito.verifyZeroInteractions(mockClient); Assert.assertEquals("Incorrect number of threads launched", 1, master.threadsLaunched); + Assert.assertEquals("Incorrect YARN Shell IDs", + Arrays.asList("1"), master.yarnShellIds); // now send 3 extra containers containers.clear(); @@ -117,6 +123,9 @@ public class TestDSAppMaster { Assert.assertEquals("Incorrect number of threads launched", 4, master.threadsLaunched); + Assert.assertEquals("Incorrect YARN Shell IDs", + Arrays.asList("1", "2", "3", "4"), master.yarnShellIds); + // make sure we handle completion events correctly List<ContainerStatus> status = new ArrayList<>(); status.add(generateContainerStatus(id1, ContainerExitStatus.SUCCESS));