http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java new file mode 100644 index 0000000..d765af8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Comparator; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + * <p> + * This class has the following functionality: + * + * <p> + * ResourceUsageMultiNodeLookupPolicy holds sorted nodes list based on the + * resource usage of nodes at given time. + * </p> + */ +public class ResourceUsageMultiNodeLookupPolicy<N extends SchedulerNode> + implements MultiNodeLookupPolicy<N> { + + protected Map<String, Set<N>> nodesPerPartition = new ConcurrentHashMap<>(); + protected Comparator<N> comparator; + + public ResourceUsageMultiNodeLookupPolicy() { + this.comparator = new Comparator<N>() { + @Override + public int compare(N o1, N o2) { + int allocatedDiff = o1.getAllocatedResource() + .compareTo(o2.getAllocatedResource()); + if (allocatedDiff == 0) { + return o1.getNodeID().compareTo(o2.getNodeID()); + } + return allocatedDiff; + } + }; + } + + @Override + public Iterator<N> getPreferredNodeIterator(Collection<N> nodes, + String partition) { + return getNodesPerPartition(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection<N> nodes, + String partition) { + Set<N> nodeList = new ConcurrentSkipListSet<N>(comparator); + nodeList.addAll(nodes); + nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList)); + } + + @Override + public Set<N> getNodesPerPartition(String partition) { + return nodesPerPartition.getOrDefault(partition, Collections.emptySet()); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index eef86a4..09d3327 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -295,6 +296,8 @@ public class ReservationSystemTestUtil { }); mockRmContext.setNodeLabelManager(nlm); + mockRmContext + .setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); return mockRmContext; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index b7b0eb7..df8309b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -118,7 +119,7 @@ public class TestAppSchedulingInfo { doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage(), new HashMap<>(), null); + new ResourceUsage(), new HashMap<>(), mock(RMContext.class)); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java index 5cea3a2..60e25ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import com.google.common.collect.Sets; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.junit.Assert; import java.util.Set; @@ -76,4 +77,16 @@ public class CapacitySchedulerTestBase { .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) .getMemorySize() > 0); } + + protected void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, + int timesec) throws InterruptedException { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timesec * 1000) { + if (scheduler.getNumClusterNodes() < nodecount) { + Thread.sleep(100); + } else { + break; + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 8d948b5..e77d8e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -106,8 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyConta import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement - .UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -172,7 +170,6 @@ import org.mockito.Mockito; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -4871,18 +4868,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { return cs; } - private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, - int timesec) throws InterruptedException { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timesec * 1000) { - if (scheduler.getNumClusterNodes() < nodecount) { - Thread.sleep(100); - } else { - break; - } - } - } - @Test (timeout = 60000) public void testClearRequestsBeforeApplyTheProposal() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java new file mode 100644 index 0000000..c90af94 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -0,0 +1,166 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for Multi Node scheduling related tests. + */ +public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase { + + private static final Log LOG = LogFactory + .getLog(TestCapacitySchedulerMultiNodes.class); + private CapacitySchedulerConfiguration conf; + private static final String POLICY_CLASS_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy"; + + @Before + public void setUp() { + CapacitySchedulerConfiguration config = + new CapacitySchedulerConfiguration(); + config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf = new CapacitySchedulerConfiguration(config); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + "resource-based"); + String policyName = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + conf.set(policyName, POLICY_CLASS_NAME); + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + conf.setInt("yarn.scheduler.minimum-allocation-mb", 512); + conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1); + } + + @Test + public void testMultiNodeSorterForScheduling() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + rm.registerNode("127.0.0.1:1234", 10 * GB); + rm.registerNode("127.0.0.1:1235", 10 * GB); + rm.registerNode("127.0.0.1:1236", 10 * GB); + rm.registerNode("127.0.0.1:1237", 10 * GB); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter<SchedulerNode> sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + rm.stop(); + } + + @Test + public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 10); + MockNM nm2 = rm.registerNode("127.0.0.2:1235", 10 * GB, 10); + MockNM nm3 = rm.registerNode("127.0.0.3:1236", 10 * GB, 10); + MockNM nm4 = rm.registerNode("127.0.0.4:1237", 10 * GB, 10); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + + MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter<SchedulerNode> sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + + Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + + RMApp app1 = rm.submitApp(2048, "app-1", "user1", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + SchedulerNodeReport reportNm1 = + rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + + // check node report + Assert.assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(8 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Ideally thread will invoke this, but thread operates every 1sec. + // Hence forcefully recompute nodes. + sorter.reSortClusterNodes(); + + RMApp app2 = rm.submitApp(1024, "app-2", "user2", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + SchedulerNodeReport reportNm2 = + rm.getResourceScheduler().getNodeReport(nm2.getNodeId()); + + // check node report + Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(9 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + // Ideally thread will invoke this, but thread operates every 1sec. + // Hence forcefully recompute nodes. + sorter.reSortClusterNodes(); + + // Node1 and Node2 are now having used resources. Hence ensure these 2 comes + // latter in the list. + nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + List<NodeId> currentNodes = new ArrayList<>(); + currentNodes.add(nm3.getNodeId()); + currentNodes.add(nm4.getNodeId()); + currentNodes.add(nm2.getNodeId()); + currentNodes.add(nm1.getNodeId()); + Iterator<SchedulerNode> it = nodes.iterator(); + SchedulerNode current; + int i = 0; + while (it.hasNext()) { + current = it.next(); + Assert.assertEquals(current.getNodeID(), currentNodes.get(i++)); + } + rm.stop(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index b4ebd15..e239191 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -817,4 +817,74 @@ public class TestCapacitySchedulerNodeLabelUpdate { } return memorySize; } + + private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition, + long expectedNodeCount, long timeout) throws InterruptedException { + long start = System.currentTimeMillis(); + long size = 0; + while (System.currentTimeMillis() - start < timeout) { + CapacityScheduler scheduler = (CapacityScheduler) rm + .getResourceScheduler(); + size = scheduler.getNodeTracker().getNodesPerPartition(partition).size(); + if (size == expectedNodeCount) { + return size; + } + Thread.sleep(100); + } + return size; + } + + @Test + public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker() + throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + rm.registerNode("h2:1234", 8000); + rm.registerNode("h3:1234", 8000); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Ensure that cluster node tracker is updated with correct set of node + // after Node registration. + Assert.assertEquals(2, + cs.getNodeTracker().getNodesPerPartition("x").size()); + Assert.assertEquals(1, cs.getNodeTracker().getNodesPerPartition("").size()); + + rm.unRegisterNode(nm1); + rm.registerNode("h4:1234", 8000); + + // Ensure that cluster node tracker is updated with correct set of node + // after new Node registration and old node label change. + Assert.assertEquals(1, + cs.getNodeTracker().getNodesPerPartition("x").size()); + Assert.assertEquals(2, cs.getNodeTracker().getNodesPerPartition("").size()); + + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet(""))); + + // Last node with label x is replaced by CLI or REST. + Assert.assertEquals(0, + waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org