http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 6dd1ac7..827bdb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -192,10 +193,10 @@ public abstract class BaseContainerManagerTest { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); // Default delSrvc + exec = createContainerExecutor(); delSrvc = createDeletionService(); delSrvc.init(conf); - exec = createContainerExecutor(); dirsHandler = new LocalDirsHandlerService(); nodeHealthChecker = new NodeHealthCheckerService( NodeManager.getNodeHealthScriptRunner(conf), dirsHandler); @@ -288,32 +289,43 @@ public abstract class BaseContainerManagerTest { ContainerManagementProtocol containerManager, ContainerId containerID, ContainerState finalState) throws InterruptedException, YarnException, IOException { - 
waitForContainerState(containerManager, containerID, finalState, 20); + waitForContainerState(containerManager, containerID, + Arrays.asList(finalState), 20); } public static void waitForContainerState( ContainerManagementProtocol containerManager, ContainerId containerID, ContainerState finalState, int timeOutMax) throws InterruptedException, YarnException, IOException { + waitForContainerState(containerManager, containerID, + Arrays.asList(finalState), timeOutMax); + } + + public static void waitForContainerState( + ContainerManagementProtocol containerManager, ContainerId containerID, + List<ContainerState> finalStates, int timeOutMax) + throws InterruptedException, YarnException, IOException { List<ContainerId> list = new ArrayList<ContainerId>(); list.add(containerID); GetContainerStatusesRequest request = GetContainerStatusesRequest.newInstance(list); ContainerStatus containerStatus = null; + HashSet<ContainerState> fStates = + new HashSet<>(finalStates); int timeoutSecs = 0; do { Thread.sleep(2000); containerStatus = containerManager.getContainerStatuses(request) .getContainerStatuses().get(0); - LOG.info("Waiting for container to get into state " + finalState + LOG.info("Waiting for container to get into one of states " + fStates + ". Current state is " + containerStatus.getState()); timeoutSecs += 2; - } while (!containerStatus.getState().equals(finalState) + } while (!fStates.contains(containerStatus.getState()) && timeoutSecs < timeOutMax); LOG.info("Container state is " + containerStatus.getState()); - Assert.assertEquals("ContainerState is not correct (timedout)", - finalState, containerStatus.getState()); + Assert.assertTrue("ContainerState is not correct (timedout)", + fStates.contains(containerStatus.getState())); } public static void waitForApplicationState(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 32dddae..31546f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -94,6 +94,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; + import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -101,7 +105,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; import org.junit.Test; @@ -551,6 +554,35 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { throw new YarnException("Reject this container"); } } + @Override + protected ContainerScheduler createContainerScheduler(Context context) { + return new ContainerScheduler(context, dispatcher, metrics){ + @Override + public ContainersMonitor getContainersMonitor() { + return new ContainersMonitorImpl(null, null, null) { + @Override + public float getVmemRatio() { + return 2.0f; + } + + @Override + public long getVmemAllocatedForContainers() { + return 20480; + } + + @Override + public long getPmemAllocatedForContainers() { + return 10240; + } + + @Override + public long getVCoresAllocatedForContainers() { + return 4; + } + }; + } + }; + } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java deleted file 
mode 100644 index 71af76f..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRegression.java +++ /dev/null @@ -1,84 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; - -/** - * Test class that invokes all test cases of {@link TestContainerManager} while - * using the {@link QueuingContainerManagerImpl}. 
The goal is to assert that - * no regression is introduced in the existing cases when no queuing of tasks at - * the NMs is involved. - */ -public class TestContainerManagerRegression extends TestContainerManager { - - public TestContainerManagerRegression() - throws UnsupportedFileSystemException { - super(); - } - - static { - LOG = LogFactory.getLog(TestContainerManagerRegression.class); - } - - @Override - protected ContainerManagerImpl createContainerManager( - DeletionService delSrvc) { - return new QueuingContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { - @Override - public void - setBlockNewContainerRequests(boolean blockNewContainerRequests) { - // do nothing - } - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( - appId, 1); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser( - appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager() - .getCurrentKey().getKeyId())); - return ugi; - } - - @Override - protected void authorizeGetAndStopContainerRequest( - ContainerId containerId, Container container, boolean stopRequest, - NMTokenIdentifier identifier) throws YarnException { - if (container == null || container.getUser().equals("Fail")) { - throw new YarnException("Reject this container"); - } - } - }; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 766a1f9..33f4609 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.atLeastOnce; import java.io.IOException; import java.net.URISyntaxException; @@ -90,6 +91,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; + + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -143,7 +149,7 @@ public class TestContainer { Map<Path, List<String>> localPaths = wc.localizeResources(); // all resources should be localized - assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); + assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState()); assertNotNull(wc.c.getLocalizedResources()); for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources() .entrySet()) { @@ -421,7 +427,7 @@ public class TestContainer { wc = new WrappedContainer(17, 314159265358979L, 4344, "yak"); wc.initContainer(); wc.localizeResources(); - assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); + assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState()); ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId()); wc.killContainer(); assertEquals(ContainerState.KILLING, wc.c.getContainerState()); @@ -452,7 +458,7 @@ public class TestContainer { wc = new WrappedContainer(17, 314159265358979L, 4344, "yak"); wc.initContainer(); wc.localizeResources(); - assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); + assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState()); wc.killContainer(); assertEquals(ContainerState.KILLING, wc.c.getContainerState()); wc.containerSuccessful(); @@ -480,7 +486,7 @@ public class TestContainer { wc = new WrappedContainer(17, 314159265358979L, 4344, "yak"); wc.initContainer(); wc.localizeResources(); - assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); + assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState()); wc.killContainer(); assertEquals(ContainerState.KILLING, wc.c.getContainerState()); wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode()); @@ -507,7 +513,7 @@ public class TestContainer { wc = new WrappedContainer(17, 314159265358979L, 4344, "yak"); wc.initContainer(); wc.localizeResources(); - assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); + 
assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState()); ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId()); launcher.call(); wc.drainDispatcherEvents(); @@ -764,7 +770,7 @@ public class TestContainer { new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE, LocalResourceVisibility.APPLICATION)); - verify(wc.localizerBus).handle(argThat(matchesReq)); + verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq)); } private void verifyOutofBandHeartBeat(WrappedContainer wc) { @@ -890,6 +896,7 @@ public class TestContainer { final EventHandler<AuxServicesEvent> auxBus; final EventHandler<ApplicationEvent> appBus; final EventHandler<LogHandlerEvent> LogBus; + final EventHandler<ContainerSchedulerEvent> schedBus; final ContainersLauncher launcher; final ContainerLaunchContext ctxt; @@ -927,9 +934,16 @@ public class TestContainer { auxBus = mock(EventHandler.class); appBus = mock(EventHandler.class); LogBus = mock(EventHandler.class); + schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) { + @Override + protected void scheduleContainer(Container container) { + container.sendLaunchEvent(); + } + }; dispatcher.register(LocalizationEventType.class, localizerBus); dispatcher.register(ContainersLauncherEventType.class, launcherBus); dispatcher.register(ContainersMonitorEventType.class, monitorBus); + dispatcher.register(ContainerSchedulerEventType.class, schedBus); dispatcher.register(AuxServicesEventType.class, auxBus); dispatcher.register(ApplicationEventType.class, appBus); dispatcher.register(LogHandlerEventType.class, LogBus); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java ---------------------------------------------------------------------- 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java deleted file mode 100644 index 7f06afa..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ /dev/null @@ -1,596 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.junit.Assert; -import org.junit.Test; - -/** - * Class for testing the {@link 
QueuingContainerManagerImpl}. - */ -public class TestQueuingContainerManager extends BaseContainerManagerTest { - public TestQueuingContainerManager() throws UnsupportedFileSystemException { - super(); - } - - static { - LOG = LogFactory.getLog(TestQueuingContainerManager.class); - } - - boolean shouldDeleteWait = false; - - @Override - protected ContainerManagerImpl createContainerManager( - DeletionService delSrvc) { - return new QueuingContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { - @Override - public void - setBlockNewContainerRequests(boolean blockNewContainerRequests) { - // do nothing - } - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() - .getKeyId())); - return ugi; - } - - @Override - protected ContainersMonitor createContainersMonitor( - ContainerExecutor exec) { - return new ContainersMonitorImpl(exec, dispatcher, this.context) { - // Define resources available for containers to be executed. - @Override - public long getPmemAllocatedForContainers() { - return 2048 * 1024 * 1024L; - } - - @Override - public long getVmemAllocatedForContainers() { - float pmemRatio = getConfig().getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - return (long) (pmemRatio * getPmemAllocatedForContainers()); - } - - @Override - public long getVCoresAllocatedForContainers() { - return 4; - } - }; - } - }; - } - - @Override - protected DeletionService createDeletionService() { - return new DeletionService(exec) { - @Override - public void delete(String user, Path subDir, Path... 
baseDirs) { - // Don't do any deletions. - if (shouldDeleteWait) { - try { - Thread.sleep(10000); - LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " + - "subDir - " + subDir + ", " + - "baseDirs - " + Arrays.asList(baseDirs)); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } else { - LOG.info("\n\nPseudo delete : user - " + user + ", " + - "subDir - " + subDir + ", " + - "baseDirs - " + Arrays.asList(baseDirs)); - } - } - }; - } - - @Override - public void setup() throws IOException { - super.setup(); - shouldDeleteWait = false; - } - - /** - * Starting one GUARANTEED and one OPPORTUNISTIC container. - * @throws Exception - */ - @Test - public void testStartMultipleContainers() throws Exception { - shouldDeleteWait = true; - containerManager.start(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - List<StartContainerRequest> list = new ArrayList<>(); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(1024, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.GUARANTEED))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(1024, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - BaseContainerManagerTest.waitForContainerState(containerManager, - createContainerId(0), - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); - BaseContainerManagerTest.waitForContainerState(containerManager, - createContainerId(1), - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); - - // Ensure 
all containers are running. - List<ContainerId> statList = new ArrayList<ContainerId>(); - for (int i = 0; i < 2; i++) { - statList.add(createContainerId(i)); - } - GetContainerStatusesRequest statRequest = - GetContainerStatusesRequest.newInstance(statList); - List<ContainerStatus> containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, - status.getState()); - } - } - - /** - * Submit both a GUARANTEED and an OPPORTUNISTIC container, each of which - * requires more resources than available at the node, and make sure they - * are both queued. - * @throws Exception - */ - @Test - public void testQueueMultipleContainers() throws Exception { - shouldDeleteWait = true; - containerManager.start(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - List<StartContainerRequest> list = new ArrayList<>(); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(3072, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.GUARANTEED))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(3072, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - Thread.sleep(5000); - - // Ensure both containers are queued. 
- List<ContainerId> statList = new ArrayList<ContainerId>(); - for (int i = 0; i < 2; i++) { - statList.add(createContainerId(i)); - } - GetContainerStatusesRequest statRequest = - GetContainerStatusesRequest.newInstance(statList); - List<ContainerStatus> containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, - status.getState()); - } - - // Ensure both containers are properly queued. - Assert.assertEquals(2, containerManager.getContext().getQueuingContext() - .getQueuedContainers().size()); - Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager) - .getNumQueuedGuaranteedContainers()); - Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager) - .getNumQueuedOpportunisticContainers()); - } - - /** - * Starts one OPPORTUNISTIC container that takes up the whole node's - * resources, and submit two more that will be queued. 
- * @throws Exception - */ - @Test - public void testStartAndQueueMultipleContainers() throws Exception { - shouldDeleteWait = true; - containerManager.start(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - List<StartContainerRequest> list = new ArrayList<>(); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(2048, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(1024, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(1024, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - Thread.sleep(5000); - - // Ensure first container is running and others are queued. 
- List<ContainerId> statList = new ArrayList<ContainerId>(); - for (int i = 0; i < 3; i++) { - statList.add(createContainerId(i)); - } - GetContainerStatusesRequest statRequest = GetContainerStatusesRequest - .newInstance(Arrays.asList(createContainerId(0))); - List<ContainerStatus> containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getContainerId().equals(createContainerId(0))) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, - status.getState()); - } else { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, - status.getState()); - } - } - - // Ensure two containers are properly queued. - Assert.assertEquals(2, containerManager.getContext().getQueuingContext() - .getQueuedContainers().size()); - Assert.assertEquals(0, ((QueuingContainerManagerImpl) containerManager) - .getNumQueuedGuaranteedContainers()); - Assert.assertEquals(2, ((QueuingContainerManagerImpl) containerManager) - .getNumQueuedOpportunisticContainers()); - } - - /** - * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources - * requests by each container as such that only one can run in parallel. - * Thus, the OPPORTUNISTIC container that started running, will be - * killed for the GUARANTEED container to start. - * Once the GUARANTEED container finishes its execution, the remaining - * OPPORTUNISTIC container will be executed. 
- * @throws Exception - */ - @Test - public void testKillOpportunisticForGuaranteedContainer() throws Exception { - shouldDeleteWait = true; - containerManager.start(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - List<StartContainerRequest> list = new ArrayList<>(); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(2048, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(2048, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(2048, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.GUARANTEED))); - - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - BaseContainerManagerTest.waitForNMContainerState(containerManager, - createContainerId(0), ContainerState.DONE, 40); - Thread.sleep(5000); - - // Get container statuses. Container 0 should be killed, container 1 - // should be queued and container 2 should be running. 
- List<ContainerId> statList = new ArrayList<ContainerId>(); - for (int i = 0; i < 3; i++) { - statList.add(createContainerId(i)); - } - GetContainerStatusesRequest statRequest = - GetContainerStatusesRequest.newInstance(statList); - List<ContainerStatus> containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getContainerId().equals(createContainerId(0))) { - Assert.assertTrue(status.getDiagnostics() - .contains("Container killed by the ApplicationMaster")); - } else if (status.getContainerId().equals(createContainerId(1))) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, - status.getState()); - } else if (status.getContainerId().equals(createContainerId(2))) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, - status.getState()); - } - System.out.println("\nStatus : [" + status + "]\n"); - } - - // Make sure the remaining OPPORTUNISTIC container starts its execution. - BaseContainerManagerTest.waitForNMContainerState(containerManager, - createContainerId(2), ContainerState.DONE, 40); - Thread.sleep(5000); - statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList( - createContainerId(1))); - ContainerStatus contStatus1 = containerManager.getContainerStatuses( - statRequest).getContainerStatuses().get(0); - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, - contStatus1.getState()); - } - - /** - * Submit three OPPORTUNISTIC containers that can run concurrently, and one - * GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run. 
- * @throws Exception - */ - @Test - public void testKillMultipleOpportunisticContainers() throws Exception { - shouldDeleteWait = true; - containerManager.start(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - List<StartContainerRequest> list = new ArrayList<>(); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(512, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(512, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(512, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(1500, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.GUARANTEED))); - - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - BaseContainerManagerTest.waitForNMContainerState( - containerManager, createContainerId(0), - Arrays.asList(ContainerState.DONE, - ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL), 40); - Thread.sleep(5000); - - // Get container statuses. Container 0 should be killed, container 1 - // should be queued and container 2 should be running. 
- int killedContainers = 0; - List<ContainerId> statList = new ArrayList<ContainerId>(); - for (int i = 0; i < 4; i++) { - statList.add(createContainerId(i)); - } - GetContainerStatusesRequest statRequest = - GetContainerStatusesRequest.newInstance(statList); - List<ContainerStatus> containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getDiagnostics().contains( - "Container killed by the ApplicationMaster")) { - killedContainers++; - } - System.out.println("\nStatus : [" + status + "]\n"); - } - - Assert.assertEquals(2, killedContainers); - } - - /** - * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones. - * Try killing one of the two queued containers. - * @throws Exception - */ - @Test - public void testStopQueuedContainer() throws Exception { - shouldDeleteWait = true; - containerManager.start(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - List<StartContainerRequest> list = new ArrayList<>(); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(2048, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.GUARANTEED))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(512, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - list.add(StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, - context.getNodeId(), - user, BuilderUtils.newResource(512, 1), - context.getContainerTokenSecretManager(), null, - ExecutionType.OPPORTUNISTIC))); - - 
StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - Thread.sleep(2000); - - // Assert there is initially one container running and two queued. - int runningContainersNo = 0; - int queuedContainersNo = 0; - List<ContainerId> statList = new ArrayList<ContainerId>(); - for (int i = 0; i < 3; i++) { - statList.add(createContainerId(i)); - } - GetContainerStatusesRequest statRequest = GetContainerStatusesRequest - .newInstance(statList); - List<ContainerStatus> containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getState() == - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) { - runningContainersNo++; - } else if (status.getState() == - org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) { - queuedContainersNo++; - } - System.out.println("\nStatus : [" + status + "]\n"); - } - - Assert.assertEquals(1, runningContainersNo); - Assert.assertEquals(2, queuedContainersNo); - - // Stop one of the two queued containers. - StopContainersRequest stopRequest = StopContainersRequest. - newInstance(Arrays.asList(createContainerId(1))); - containerManager.stopContainers(stopRequest); - - Thread.sleep(2000); - - // Assert queued container got properly stopped. 
- statList.clear(); - for (int i = 0; i < 3; i++) { - statList.add(createContainerId(i)); - } - statRequest = GetContainerStatusesRequest.newInstance(statList); - containerStatuses = containerManager.getContainerStatuses(statRequest) - .getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getContainerId().equals(createContainerId(0))) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, - status.getState()); - } else if (status.getContainerId().equals(createContainerId(1))) { - Assert.assertTrue(status.getDiagnostics().contains( - "Queued container request removed")); - } else if (status.getContainerId().equals(createContainerId(2))) { - Assert.assertEquals( - org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, - status.getState()); - } - System.out.println("\nStatus : [" + status + "]\n"); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java new file mode 100644 index 0000000..24e388f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -0,0 +1,872 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import 
org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.spy; + +/** + * Tests to verify that the {@link ContainerScheduler} is able to queue and + * make room for containers. 
+ */
+public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
+  public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
+    super();
+  }
+
+  static {
+    // Re-point the LOG field (declared outside this class) at a logger named
+    // after this test class.
+    LOG = LogFactory.getLog(TestContainerSchedulerQueuing.class);
+  }
+
+  // When true, the executor built by createContainerExecutor() sleeps for
+  // 10 seconds before delegating every container launch.
+  private boolean delayContainers = true;
+
+  /**
+   * Builds the container manager used by these tests. The returned
+   * ContainerManagerImpl ignores requests to block new containers, uses a
+   * fixed remote user, and reports fixed node resources through its
+   * containers monitor.
+   */
+  @Override
+  protected ContainerManagerImpl createContainerManager(
+      DeletionService delSrvc) {
+    return new ContainerManagerImpl(context, exec, delSrvc,
+        nodeStatusUpdater, metrics, dirsHandler) {
+      @Override
+      public void
+          setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+        // do nothing
+      }
+
+      /**
+       * Returns a remote user named after attempt 1 of application (0, 0),
+       * carrying an NMTokenIdentifier built from the current NM token key.
+       */
+      @Override
+      protected UserGroupInformation getRemoteUgi() throws YarnException {
+        ApplicationId appId = ApplicationId.newInstance(0, 0);
+        ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, 1);
+        UserGroupInformation ugi =
+            UserGroupInformation.createRemoteUser(appAttemptId.toString());
+        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+            .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+            .getKeyId()));
+        return ugi;
+      }
+
+      @Override
+      protected ContainersMonitor createContainersMonitor(
+          ContainerExecutor exec) {
+        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+          // Define resources available for containers to be executed.
+          // NOTE(review): the value below is presumably 2048 MB expressed in
+          // bytes, matching the 2048 passed to BuilderUtils.newResource by
+          // the tests - confirm against ContainersMonitor.
+          @Override
+          public long getPmemAllocatedForContainers() {
+            return 2048 * 1024 * 1024L;
+          }
+
+          // Virtual memory is the physical allocation scaled by the
+          // configured vmem/pmem ratio.
+          @Override
+          public long getVmemAllocatedForContainers() {
+            float pmemRatio = getConfig().getFloat(
+                YarnConfiguration.NM_VMEM_PMEM_RATIO,
+                YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+            return (long) (pmemRatio * getPmemAllocatedForContainers());
+          }
+
+          @Override
+          public long getVCoresAllocatedForContainers() {
+            return 4;
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Builds a Mockito spy around a DefaultContainerExecutor whose
+   * launchContainer sleeps for 10 seconds first while delayContainers is set.
+   */
+  @Override
+  protected ContainerExecutor createContainerExecutor() {
+    DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+      @Override
+      public int launchContainer(ContainerStartContext ctx) throws IOException {
+        if (delayContainers) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException e) {
+            // Deliberately ignored: an interrupted delay simply means the
+            // launch below proceeds earlier than planned.
+          }
+        }
+        return super.launchContainer(ctx);
+      }
+    };
+    exec.setConf(conf);
+    return spy(exec);
+  }
+
+  /**
+   * Sets the opportunistic-container maximum queue length to 10 before
+   * running the base-class setup.
+   */
+  @Override
+  public void setup() throws IOException {
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+    super.setup();
+  }
+
+  /**
+   * Starting one GUARANTEED and one OPPORTUNISTIC container.
+ * @throws Exception + */ + @Test + public void testStartMultipleContainers() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(0), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(1), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + + // Ensure all containers are running. 
+ List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + } + + /** + * Submit both a GUARANTEED and an OPPORTUNISTIC container, each of which + * requires more resources than available at the node, and make sure they + * are both queued. + * @throws Exception + */ + @Test + public void testQueueMultipleContainers() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(3072, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(3072, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(5000); + + // Ensure both containers are queued. 
+ List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED, + status.getState()); + } + + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + // Ensure both containers are properly queued. + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(1, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(1, + containerScheduler.getNumQueuedOpportunisticContainers()); + } + + /** + * Starts one OPPORTUNISTIC container that takes up the whole node's + * resources, and submit two more that will be queued. 
+ * @throws Exception + */ + @Test + public void testStartAndQueueMultipleContainers() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(5000); + + // Ensure first container is running and others are queued. 
+ List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest + .newInstance(Arrays.asList(createContainerId(0))); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } else { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED, + status.getState()); + } + } + + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + // Ensure two containers are properly queued. + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(2, + containerScheduler.getNumQueuedOpportunisticContainers()); + } + + /** + * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources + * requests by each container as such that only one can run in parallel. + * Thus, the OPPORTUNISTIC container that started running, will be + * killed for the GUARANTEED container to start. + * Once the GUARANTEED container finishes its execution, the remaining + * OPPORTUNISTIC container will be executed. 
+ * @throws Exception + */ + @Test + public void testKillOpportunisticForGuaranteedContainer() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.DONE, 40); + Thread.sleep(5000); + + // Get container statuses. Container 0 should be killed, container 1 + // should be queued and container 2 should be running. 
+ List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertTrue(status.getDiagnostics().contains( + "Container Killed to make room for Guaranteed Container")); + } else if (status.getContainerId().equals(createContainerId(1))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED, + status.getState()); + } else if (status.getContainerId().equals(createContainerId(2))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + // Make sure the remaining OPPORTUNISTIC container starts its execution. + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(2), ContainerState.DONE, 40); + Thread.sleep(5000); + statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList( + createContainerId(1))); + ContainerStatus contStatus1 = containerManager.getContainerStatuses( + statRequest).getContainerStatuses().get(0); + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + contStatus1.getState()); + } + + /** + * 1. Submit a long running GUARANTEED container to hog all NM resources. + * 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued. + * 3. Update the Queue Limit to 2. + * 4. Ensure only 2 containers remain in the Queue, and 4 are de-Queued. 
+ * @throws Exception + */ + @Test + public void testQueueShedding() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setCommands(Arrays.asList("sleep 100")); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(4), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( 
+ containerLaunchContext, + createContainerToken(createContainerId(5), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(6), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + allRequests = StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + // Ensure all containers are properly queued. + int numTries = 30; + while ((containerScheduler.getNumQueuedContainers() < 6) && + (numTries-- > 0)) { + Thread.sleep(100); + } + Assert.assertEquals(6, containerScheduler.getNumQueuedContainers()); + + ContainerQueuingLimit containerQueuingLimit = ContainerQueuingLimit + .newInstance(); + containerQueuingLimit.setMaxQueueLength(2); + containerScheduler.updateQueuingLimit(containerQueuingLimit); + numTries = 30; + while ((containerScheduler.getNumQueuedContainers() > 2) && + (numTries-- > 0)) { + Thread.sleep(100); + } + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + + List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 1; i < 7; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + + int deQueuedContainers = 0; + int numQueuedOppContainers = 0; + for (ContainerStatus status : containerStatuses) { + if (status.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + if (status.getDiagnostics().contains( + "Container De-queued to meet NM queuing 
limits")) { + deQueuedContainers++; + } + if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) { + numQueuedOppContainers++; + } + } + } + Assert.assertEquals(4, deQueuedContainers); + Assert.assertEquals(2, numQueuedOppContainers); + } + + /** + * 1. Submit a long running GUARANTEED container to hog all NM resources. + * 2. Submit 2 OPPORTUNISTIC containers, both of which will be queued. + * 3. Send Stop Container to one of the queued containers. + * 4. Ensure container is removed from the queue. + * @throws Exception + */ + @Test + public void testContainerDeQueuedAfterAMKill() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setCommands(Arrays.asList("sleep 100")); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + allRequests = StartContainersRequest.newInstance(list); + 
containerManager.startContainers(allRequests); + + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + // Ensure both containers are properly queued. + int numTries = 30; + while ((containerScheduler.getNumQueuedContainers() < 2) && + (numTries-- > 0)) { + Thread.sleep(100); + } + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + + containerManager.stopContainers( + StopContainersRequest.newInstance(Arrays.asList(createContainerId(2)))); + + numTries = 30; + while ((containerScheduler.getNumQueuedContainers() > 1) && + (numTries-- > 0)) { + Thread.sleep(100); + } + Assert.assertEquals(1, containerScheduler.getNumQueuedContainers()); + } + + /** + * Submit three OPPORTUNISTIC containers that can run concurrently, and one + * GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run. + * @throws Exception + */ + @Test + public void testKillMultipleOpportunisticContainers() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, 
+ ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1500, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + allRequests = StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState( + containerManager, createContainerId(0), + Arrays.asList(ContainerState.DONE, + ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL), 40); + Thread.sleep(5000); + + // Get container statuses. Two of the three OPPORTUNISTIC containers + // should have been killed to make room for the GUARANTEED one. + int killedContainers = 0; + List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 4; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getDiagnostics().contains( + "Container Killed to make room for Guaranteed Container")) { + killedContainers++; + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + Assert.assertEquals(2, killedContainers); + } + + /** + * Submit four OPPORTUNISTIC containers that can run concurrently, and then + * two GUARANTEED ones that need to kill exactly two of the OPPORTUNISTIC + * containers in order to run. Make sure only 2 are killed. 
+ * @throws Exception + */ + @Test + public void testKillOnlyRequiredOpportunisticContainers() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + // Fill NM with Opportunistic containers + for (int i = 0; i < 4; i++) { + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + } + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + list = new ArrayList<>(); + // Now ask for two Guaranteed containers + for (int i = 4; i < 6; i++) { + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + } + + allRequests = StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.DONE, 40); + Thread.sleep(5000); + + // Get container statuses. Exactly two of the four OPPORTUNISTIC + // containers should have been killed to make room for the GUARANTEED ones. 
+ int killedContainers = 0; + List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 6; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getDiagnostics().contains( + "Container Killed to make room for Guaranteed Container")) { + killedContainers++; + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + Assert.assertEquals(2, killedContainers); + } + + /** + * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones. + * Try killing one of the two queued containers. + * @throws Exception + */ + @Test + public void testStopQueuedContainer() throws Exception { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List<StartContainerRequest> list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest 
allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(2000); + + // Assert there is initially one container running and two queued. + int runningContainersNo = 0; + int queuedContainersNo = 0; + List<ContainerId> statList = new ArrayList<ContainerId>(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest + .newInstance(statList); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) { + runningContainersNo++; + } else if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) { + queuedContainersNo++; + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + Assert.assertEquals(1, runningContainersNo); + Assert.assertEquals(2, queuedContainersNo); + + // Stop one of the two queued containers. + StopContainersRequest stopRequest = StopContainersRequest. + newInstance(Arrays.asList(createContainerId(1))); + containerManager.stopContainers(stopRequest); + + Thread.sleep(2000); + + // Assert queued container got properly stopped. 
+ statList.clear(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + + statRequest = GetContainerStatusesRequest.newInstance(statList); + HashMap<org.apache.hadoop.yarn.api.records.ContainerState, ContainerStatus> + map = new HashMap<>(); + for (int i=0; i < 10; i++) { + containerStatuses = containerManager.getContainerStatuses(statRequest) + .getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + System.out.println("\nStatus : [" + status + "]\n"); + map.put(status.getState(), status); + if (map.containsKey( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) && + map.containsKey( + org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) && + map.containsKey( + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)) { + break; + } + Thread.sleep(1000); + } + } + Assert.assertEquals(createContainerId(0), + map.get(org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) + .getContainerId()); + Assert.assertEquals(createContainerId(1), + map.get(org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE) + .getContainerId()); + Assert.assertEquals(createContainerId(2), + map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) + .getContainerId()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 164488d..686a0d9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -215,4 +215,19 @@ public class MockContainer implements Container { public void commitUpgrade() { } + + @Override + public boolean isMarkedForKilling() { + return false; + } + + @Override + public void sendLaunchEvent() { + + } + + @Override + public void sendKillEvent(int exitStatus, String description) { + + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 50a9c4d..d3db96f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -313,9 +313,11 @@ public class OpportunisticContainerAllocatorAMService appAttempt.getApplicationAttemptId(), container.getNodeId(), appAttempt.getUser(), rmContext, isRemotelyAllocated); 
appAttempt.addRMContainer(container.getId(), rmContainer); + ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( + container.getNodeId()).allocateContainer(rmContainer); rmContainer.handle( new RMContainerEvent(container.getId(), - RMContainerEventType.LAUNCHED)); + RMContainerEventType.ACQUIRED)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index f5d8b5b..0cfa1d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -80,8 +80,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.NEW, RMContainerState.RUNNING, - RMContainerEventType.LAUNCHED) + .addTransition(RMContainerState.NEW, RMContainerState.ACQUIRED, + RMContainerEventType.ACQUIRED, new AcquiredTransition()) .addTransition(RMContainerState.NEW, EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), 
RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/3219b7b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 302743a..ab13000 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1394,32 +1394,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } // Process running containers - if (remoteContainer.getState() == ContainerState.RUNNING) { - // Process only GUARANTEED containers in the RM. - if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { - ++numRemoteRunningContainers; - if (!launchedContainers.contains(containerId)) { - // Just launched container. RM knows about it the first time. - launchedContainers.add(containerId); - newlyLaunchedContainers.add(remoteContainer); - // Unregister from containerAllocationExpirer. 
- containerAllocationExpirer - .unregister(new AllocationExpirationInfo(containerId)); - } - } - } else { - if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { - // A finished container - launchedContainers.remove(containerId); + if (remoteContainer.getState() == ContainerState.RUNNING || + remoteContainer.getState() == ContainerState.SCHEDULED) { + ++numRemoteRunningContainers; + if (!launchedContainers.contains(containerId)) { + // Just launched container. RM knows about it the first time. + launchedContainers.add(containerId); + newlyLaunchedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. containerAllocationExpirer .unregister(new AllocationExpirationInfo(containerId)); } - // Completed containers should also include the OPPORTUNISTIC containers - // so that the AM gets properly notified. + } else { + // A finished container + launchedContainers.remove(containerId); if (completedContainers.add(containerId)) { newlyCompletedContainers.add(remoteContainer); } + // Unregister from containerAllocationExpirer. + containerAllocationExpirer + .unregister(new AllocationExpirationInfo(containerId)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org