YARN-8511. When AM releases a container, RM removes allocation tags before it is released by NM. (Weiwei Yang via wangda)
Change-Id: I6f9f409f2ef685b405cbff547dea9623bf3322d9 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/752dcce5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/752dcce5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/752dcce5 Branch: refs/heads/HDFS-13572 Commit: 752dcce5f4cf0f6ebcb40a61f622f1a885c4bda7 Parents: 88b2794 Author: Wangda Tan <wan...@apache.org> Authored: Mon Jul 16 10:54:41 2018 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Mon Jul 16 10:54:41 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 6 ++ .../yarn/sls/scheduler/RMNodeWrapper.java | 6 ++ .../rmcontainer/RMContainerImpl.java | 5 - .../server/resourcemanager/rmnode/RMNode.java | 6 ++ .../resourcemanager/rmnode/RMNodeImpl.java | 5 + .../scheduler/SchedulerNode.java | 15 +++ .../yarn/server/resourcemanager/MockNodes.java | 5 + .../rmcontainer/TestRMContainerImpl.java | 16 ++- .../scheduler/TestAbstractYarnScheduler.java | 104 +++++++++++++++++++ 9 files changed, 162 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 0c99139..69946c8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -219,6 +220,11 @@ public class NodeInfo { } @Override + public RMContext getRMContext() { + return null; + } + + @Override public Resource getPhysicalResource() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 78645e9..a96b790 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -207,6 +208,11 @@ public class RMNodeWrapper implements RMNode { } @Override + public RMContext getRMContext() { + return node.getRMContext(); + } + + @Override public Resource getPhysicalResource() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index b5c8e7c..efac666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -701,11 +701,6 @@ public class RMContainerImpl implements RMContainer { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Notify AllocationTagsManager - container.rmContext.getAllocationTagsManager().removeContainer( - container.getNodeId(), container.getContainerId(), - container.getAllocationTags()); - RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; container.finishTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 872f2a6..68a780e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; /** * Node managers information on available resources @@ -189,4 +190,9 @@ public interface RMNode { * @return a map of each allocation tag and its count. */ Map<String, Long> getAllocationTagsWithCount(); + + /** + * @return the RM context associated with this RM node. + */ + RMContext getRMContext(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b942afa..dfd93e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1541,4 +1541,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { return context.getAllocationTagsManager() .getAllocationTagsWithCount(getNodeID()); } + + @Override + public RMContext getRMContext() { + return this.context; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index d5bfc57..59771fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -74,6 +75,7 @@ public abstract class SchedulerNode { private final RMNode rmNode; private final String nodeName; + private final RMContext rmContext; private volatile Set<String> labels = null; @@ -83,6 +85,7 @@ public abstract class SchedulerNode { public SchedulerNode(RMNode node, boolean usePortForNodeName, Set<String> labels) { this.rmNode = node; + this.rmContext = node.getRMContext(); this.unallocatedResource = Resources.clone(node.getTotalCapability()); this.totalResource = Resources.clone(node.getTotalCapability()); if (usePortForNodeName) { @@ -242,6 +245,18 @@ public abstract class SchedulerNode { launchedContainers.remove(containerId); Container container = info.container.getContainer(); + + // We remove allocation tags when a container is actually + // released on NM. This is to avoid running into situation + // when AM releases a container and NM has some delay to + // actually release it, then the tag can still be visible + // at RM so that RM can respect it during scheduling new containers. + if (rmContext != null && rmContext.getAllocationTagsManager() != null) { + rmContext.getAllocationTagsManager() + .removeContainer(container.getNodeId(), + container.getId(), container.getAllocationTags()); + } + updateResourceForReleasedContainer(container); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 84105d9..9041132 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -286,6 +286,11 @@ public class MockNodes { } @Override + public RMContext getRMContext() { + return null; + } + + @Override public Resource getPhysicalResource() { return this.physicalResource; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 7a930cd..1115e8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -60,10 +60,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -401,6 +405,7 @@ public class TestRMContainerImpl { Container container = BuilderUtils.newContainer(containerId, nodeId, "host:3465", resource, priority, null); + container.setAllocationTags(ImmutableSet.of("mapper")); ConcurrentMap<ApplicationId, RMApp> rmApps = spy(new ConcurrentHashMap<ApplicationId, RMApp>()); RMApp rmApp = mock(RMApp.class); @@ -423,11 +428,14 @@ public class TestRMContainerImpl { true); when(rmContext.getYarnConfiguration()).thenReturn(conf); + RMNode rmNode = new RMNodeImpl(nodeId, rmContext, + "localhost", 0, 0, null, Resource.newInstance(10240, 10), null); + SchedulerNode schedulerNode = new FiCaSchedulerNode(rmNode, false); + /* First container: ALLOCATED -> KILLED */ RMContainerImpl rmContainer = new RMContainerImpl(container, SchedulerRequestKey.extractFrom(container), appAttemptId, nodeId, "user", rmContext); - rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, tagsManager.getNodeCardinalityByOp(nodeId, @@ -437,6 +445,7 @@ public class TestRMContainerImpl { rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); + schedulerNode.allocateContainer(rmContainer); Assert.assertEquals(1, tagsManager.getNodeCardinalityByOp(nodeId, @@ -446,6 +455,7 @@ public class TestRMContainerImpl { rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus .newInstance(containerId, ContainerState.COMPLETE, "", 0), RMContainerEventType.KILL)); + schedulerNode.releaseContainer(container.getId(), true); Assert.assertEquals(0, tagsManager.getNodeCardinalityByOp(nodeId, @@ -465,6 +475,7 @@ public class TestRMContainerImpl { rmContainer.setAllocationTags(ImmutableSet.of("mapper")); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); + schedulerNode.allocateContainer(rmContainer); Assert.assertEquals(1, tagsManager.getNodeCardinalityByOp(nodeId, @@ -477,6 +488,7 @@ public class TestRMContainerImpl { rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus .newInstance(containerId, ContainerState.COMPLETE, "", 0), RMContainerEventType.FINISHED)); + schedulerNode.releaseContainer(container.getId(), true); Assert.assertEquals(0, tagsManager.getNodeCardinalityByOp(nodeId, @@ -496,6 +508,7 @@ public class TestRMContainerImpl { rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); + schedulerNode.allocateContainer(rmContainer); Assert.assertEquals(1, tagsManager.getNodeCardinalityByOp(nodeId, @@ -511,6 +524,7 @@ public class TestRMContainerImpl { rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus .newInstance(containerId, ContainerState.COMPLETE, "", 0), RMContainerEventType.FINISHED)); + schedulerNode.releaseContainer(container.getId(), true); Assert.assertEquals(0, tagsManager.getNodeCardinalityByOp(nodeId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/752dcce5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index c0f8d39..ba409b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -27,9 +27,16 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -416,6 +423,103 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { } } + @Test(timeout = 30000l) + public void testContainerReleaseWithAllocationTags() throws Exception { + // Currently only can be tested against capacity scheduler. + if (getSchedulerType().equals(SchedulerType.CAPACITY)) { + final String testTag1 = "some-tag"; + final String testTag2 = "some-other-tag"; + YarnConfiguration conf = getConf(); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler"); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", + 10240, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = + rm1.submitApp(200, "name", "user", new HashMap<>(), false, "default", + -1, null, "Test", false, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // allocate 1 container with tag1 + SchedulingRequest sr = SchedulingRequest + .newInstance(1l, Priority.newInstance(1), + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + Sets.newHashSet(testTag1), + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)), + null); + + // allocate 3 containers with tag2 + SchedulingRequest sr1 = SchedulingRequest + .newInstance(2l, Priority.newInstance(1), + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + Sets.newHashSet(testTag2), + ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)), + null); + + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Lists.newArrayList(sr, sr1)).build(); + am1.allocate(ar); + nm1.nodeHeartbeat(true); + + List<Container> allocated = new ArrayList<>(); + while (allocated.size() < 4) { + AllocateResponse rsp = am1 + .allocate(new ArrayList<>(), new ArrayList<>()); + allocated.addAll(rsp.getAllocatedContainers()); + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + } + + Assert.assertEquals(4, allocated.size()); + + Set<Container> containers = allocated.stream() + .filter(container -> container.getAllocationRequestId() == 1l) + .collect(Collectors.toSet()); + Assert.assertNotNull(containers); + Assert.assertEquals(1, containers.size()); + ContainerId cid = containers.iterator().next().getId(); + + // mock container start + rm1.getRMContext().getScheduler() + .getSchedulerNode(nm1.getNodeId()).containerStarted(cid); + + // verifies the allocation is made with correct number of tags + Map<String, Long> nodeTags = rm1.getRMContext() + .getAllocationTagsManager() + .getAllocationTagsWithCount(nm1.getNodeId()); + Assert.assertNotNull(nodeTags.get(testTag1)); + Assert.assertEquals(1, nodeTags.get(testTag1).intValue()); + + // release a container + am1.allocate(new ArrayList<>(), Lists.newArrayList(cid)); + + // before NM confirms, the tag should still exist + nodeTags = rm1.getRMContext().getAllocationTagsManager() + .getAllocationTagsWithCount(nm1.getNodeId()); + Assert.assertNotNull(nodeTags); + Assert.assertNotNull(nodeTags.get(testTag1)); + Assert.assertEquals(1, nodeTags.get(testTag1).intValue()); + + // NM reports back that container is released + // RM should cleanup the tag + ContainerStatus cs = ContainerStatus.newInstance(cid, + ContainerState.COMPLETE, "", 0); + nm1.nodeHeartbeat(Lists.newArrayList(cs), true); + + // Wait on condition + // 1) tag1 doesn't exist anymore + // 2) num of tag2 is still 3 + GenericTestUtils.waitFor(() -> { + Map<String, Long> tags = rm1.getRMContext() + .getAllocationTagsManager() + .getAllocationTagsWithCount(nm1.getNodeId()); + return tags.get(testTag1) == null && + tags.get(testTag2).intValue() == 3; + }, 500, 3000); + } + } + @Test(timeout=60000) public void testContainerReleasedByNode() throws Exception { System.out.println("Starting testContainerReleasedByNode"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org