http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java new file mode 100644 index 0000000..07d1eef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestProportionalCapacityPreemptionPolicyMockFramework
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+  @Test
+  public void testBuilder() throws Exception {
+    /**
+     * Test of the test framework itself: make sure we build the expected
+     * mock schedulable objects
+     */
+    String labelsConfig =
+        "=200,true;" + // default partition
+        "red=100,false;" + // partition=red
+        "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+        "n2=blue;" + // n2 has partition=blue
+        "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+        "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+        "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+        "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+        "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+        "a1\t" // app2 in a1
+        + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+        // 50 * ignore-exclusivity (allocated)
+        + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+        // 50 in n2 (allocated)
+        "a2\t" // app3 in a2
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+        "b\t" // app4 in b
+        + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("root"), "", 100);
+    checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "red", 100);
+    checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+    // a
+    checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("a"), "", 100);
+    checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+    // a1
+    checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+    checkPendingResource(cs.getQueue("a1"), "", 100);
+    checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a1"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+    // a2
+    checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+    checkPendingResource(cs.getQueue("a2"), "", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+    // b
+    checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("b"), "", 0);
+    checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("b"), "red", 100);
+    checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+    // Check ignored partitioned containers in queue
+    Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+        .getIgnoreExclusivityRMContainers().get("blue").size());
+
+    // Check applications
+    Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+    // Check #containers
+    FiCaSchedulerApp app1 = getApp("a1", 1);
+    FiCaSchedulerApp app2 = getApp("a1", 2);
+    FiCaSchedulerApp app3 = getApp("a2", 3);
+    FiCaSchedulerApp app4 = getApp("b", 4);
+
+    Assert.assertEquals(50, app1.getLiveContainers().size());
+    checkContainerNodesInApp(app1, 50, "n3");
+
+    Assert.assertEquals(50, app2.getLiveContainers().size());
+    Assert.assertEquals(150, app2.getReservedContainers().size());
+    checkContainerNodesInApp(app2, 200, "n2");
+
+    Assert.assertEquals(50, app3.getLiveContainers().size());
+    checkContainerNodesInApp(app3, 50, "n3");
+
+    Assert.assertEquals(100, app4.getLiveContainers().size());
+    checkContainerNodesInApp(app4, 100, "n1");
+  }
+
+  @Test
+  public void testBuilderWithReservedResource() throws Exception {
+    String labelsConfig =
+        "=200,true;" + // default partition
+        "red=100,false;" + // partition=red
+        "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+        "n2=blue;" + // n2 has partition=blue
+        "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending,(reserved)
+        "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+        "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+        "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+        "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+        "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+        "a1\t" // app2 in a1
+        + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+        // 50 * ignore-exclusivity (allocated)
+        + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+        // 50 in n2 (allocated)
+        "a2\t" // app3 in a2
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+        "b\t" // app4 in b
+        + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkReservedResource(cs.getQueue("root"), "", 100);
+    checkReservedResource(cs.getQueue("root"), "red", 90);
+
+    // a
+    checkReservedResource(cs.getQueue("a"), "", 50);
+    checkReservedResource(cs.getQueue("a"), "red", 40);
+
+    // a1
+    checkReservedResource(cs.getQueue("a1"), "", 40);
+    checkReservedResource(cs.getQueue("a1"), "red", 20);
+
+    // b
+    checkReservedResource(cs.getQueue("b"), "", 0);
+    checkReservedResource(cs.getQueue("b"), "red", 0);
+  }
+
+  @Test
+  public void testBuilderWithSpecifiedNodeResources() throws Exception {
+    String labelsConfig =
+        "=200,true;" + // default partition
+        "red=100,false;" + // partition=red
+        "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red res=100;" + // n1 has partition=red
+        "n2=blue;" + // n2 has partition=blue
+        "n3= res=30"; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending,(reserved)
+        "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+        "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+        "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+        "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+        "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+        "a1\t" // app2 in a1
+        + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+        // 50 * ignore-exclusivity (allocated)
+        + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+        // 50 in n2 (allocated)
+        "a2\t" // app3 in a2
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+        "b\t" // app4 in b
+        + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check host resources
+    Assert.assertEquals(3, this.cs.getAllNodes().size());
+    SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
+    Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+    Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
+    Assert.assertNull(node1.getReservedContainer());
+
+    SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
+    Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+    Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
+    Assert.assertNotNull(node2.getReservedContainer());
+
+    SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
+    Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+    Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
+    Assert.assertNull(node3.getReservedContainer());
+  }
+}
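[Editor's aside: the three tests above drive everything through the mock framework's string DSL, which is dense on first read. The sketch below is a hypothetical minimal scenario, not part of this patch; the queue name q1, node name n1 and all numbers are made up, while buildEnv, checkAbsCapacities, checkPendingResource and cs are the helpers and fields visible in the diff above.]

  // Hypothetical minimal scenario for the mock framework used above.
  // Bracketed columns are guaranteed/max/used/pending (plus reserved when a
  // fifth value is present), in the same resource units as the tests above.
  String labelsConfig = "=100,true"; // default partition only
  String nodesConfig = "n1=";        // a single node, no partition
  String queuesConfig =
      // guaranteed,max,used,pending
      "root(=[100 100 50 10]);" + // root
      "-q1(=[100 100 50 10])";    // one leaf queue under root
  String appsConfig =
      //queueName\t(priority,resource,host,expression,#repeat,reserved)
      "q1\t(1,1,n1,,50,false);";  // app1: 50 allocated 1-unit containers on n1
  buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
  // q1 is guaranteed the whole cluster (1f), capped at the whole cluster (1f),
  // currently uses half of it (0.5f), and still has 10 units pending.
  checkAbsCapacities(cs.getQueue("q1"), "", 1f, 1f, 0.5f);
  checkPendingResource(cs.getQueue("q1"), "", 10);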
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 88216f8..54166c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.junit.After; @@ -144,7 +145,7 @@ public class TestSchedulerApplicationAttempt { private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id, Resource resource) { ContainerId containerId = ContainerId.newContainerId(appAttId, id); - RMContainer rmContainer = mock(RMContainer.class); + RMContainer rmContainer = mock(RMContainerImpl.class); Container container = mock(Container.class); when(container.getResource()).thenReturn(resource); when(container.getNodeId()).thenReturn(nodeId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java new file mode 100644 index 0000000..bd9f615 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CapacitySchedulerPreemptionTestBase {
+
+  final int GB = 1024;
+
+  Configuration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+    // Set preemption related configurations
+    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        0);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        60000L);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(this.conf);
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
+  SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    ResourceManager.RMActiveServices activeServices = rm.getRMActiveService();
+    SchedulingMonitor mon = null;
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof
SchedulingMonitor) { + mon = (SchedulingMonitor) service; + break; + } + } + + if (mon != null) { + return mon.getSchedulingEditPolicy(); + } + return null; + } + + public void waitNumberOfLiveContainersFromApp(FiCaSchedulerApp app, + int expected) throws InterruptedException { + int waitNum = 0; + + while (waitNum < 10) { + System.out.println(app.getLiveContainers().size()); + if (app.getLiveContainers().size() == expected) { + return; + } + Thread.sleep(100); + waitNum++; + } + + Assert.fail(); + } + + public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app, + int expected) throws InterruptedException { + int waitNum = 0; + + while (waitNum < 10) { + System.out.println(app.getReservedContainers().size()); + if (app.getReservedContainers().size() == expected) { + return; + } + Thread.sleep(100); + waitNum++; + } + + Assert.fail(); + } + + public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, + ApplicationAttemptId appId, int expected) throws InterruptedException { + int waitNum = 0; + + while (waitNum < 500) { + int total = 0; + for (RMContainer c : node.getCopiedListOfRunningContainers()) { + if (c.getApplicationAttemptId().equals(appId)) { + total++; + } + } + if (total == expected) { + return; + } + Thread.sleep(10); + waitNum++; + } + + Assert.fail(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 6c9faf7..925e7f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -3371,7 +3371,7 @@ public class TestCapacityScheduler { resourceManager .getResourceScheduler() .getSchedulerNode(resourceEvent.getNodeId()) - .setTotalResource(resourceEvent.getResourceOption().getResource()); + .updateTotalResource(resourceEvent.getResourceOption().getResource()); } } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java new file mode 100644 index 
0000000..e2d21c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java @@ -0,0 +1,639 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCapacitySchedulerLazyPreemption + extends CapacitySchedulerPreemptionTestBase { + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + 
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        true);
+  }
+
+  @Test (timeout = 60000)
+  public void testSimplePreemption() throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *               Root
+     *              /  |  \
+     *             a   b   c
+     *            10  20  70
+     * </pre>
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 is submitted to queue-a first; it asks for 7 * 1G containers,
+     * so there are no more resources available.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 1G container (for its
+     * AM).
+     *
+     * Now the cluster is full.
+     *
+     * 4) app2 asks for another 1G container; the system will preempt one
+     * container from app1, and app2 will receive the preempted resources.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue-a, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resources left in
+    // the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 is
+    // marked to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(1, killableContainers.size());
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if a container is preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 6 containers, and app2 has 2 containers
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersNodeLocalityDelay()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with locality specified, so it
+     * needs to wait for missed opportunities before getting scheduled.
+     * Check that the system waits for missed opportunities before finishing
+     * the killable container
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue-a, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resources left in
+    // the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container with unknown host and unknown rack
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "unknownhost",
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 is
+    // marked to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if a container is preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 container (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, one container will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 6 containers, and app2 has 2 containers (new container
+    // allocated)
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersHardNodeLocality()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with hard locality specified, but
+     * the requested host does not exist.
+     * Confirm the system doesn't preempt any container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue-a, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resources left in
+    // the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container for h3 with hard locality,
+    // h3 doesn't exist in the cluster
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1, true), ResourceRequest
+        .newInstance(Priority.newInstance(1), "h3",
+            Resources.createResource(1 * GB), 1, false), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1, false)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 is
+    // marked to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if a container is preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 container (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, nothing will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 7 containers, and app2 has 1 container (no container allocated)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *               Root
+     *              /  |  \
+     *             a   b   c
+     *            10  20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Killable containers' resources will be excluded from PCPP (no
+     * duplicated containers added to the killable list)
+     * 2) When more resources need to be preempted, new containers will be
+     * selected and killable containers will be considered
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // launch an app to queue-a, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, and no available resources left in
+    // the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 is
+    // marked to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+    // Check killable containers and to-be-preempted containers in edit policy
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Run edit schedule again, confirm status doesn't change
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Save the current killable containers
+    Set<ContainerId> previousKillableContainers = new HashSet<>(
+        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+            .keySet());
+
+    // Update request resource of c from 1 to 2, so we need to preempt
+    // one more container
+    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+    // Call editPolicy.editSchedule() once, we should have 1 container in the
+    // to-preempt map and 1 container in the killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+    // Call editPolicy.editSchedule() once more, we should have 2 containers
+    // in the killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Check that the previous killable containers are included in the new
+    // killable containers
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+    Assert.assertTrue(
+        Sets.difference(previousKillableContainers, killableContainers.keySet())
+            .isEmpty());
+  }
+
+  /*
+   * Ignore this test now because it could be a premature optimization
+   */
+  @Ignore
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *               Root
+     *              /  |  \
+     *             a   b   c
+     *            10  20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Containers will be marked killable
+     * 2) Cancel resource request
+     * 3) Killable containers will be cancelled from the policy and the
+     * scheduler
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // launch an app to queue-a, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, and no available resources left in
+    // the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if 3 containers from app1 are
+    // marked to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+    // Change request from 3G to 2G, so we can preempt one fewer container
+    // (3->2)
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+    // Call editSchedule once more to make sure nothing further happens
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersUserLimit()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *               Root
+     *              /  |  \
+     *             a   b   c
+     *            10  20  70
+     * </pre>
+     *
+     * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate
+     * more than 1 container in queue-c.
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 is submitted to queue-a first; it asks for 7 * 1G containers,
+     * so there are no more resources available.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 1G container (for its
+     * AM).
+     *
+     * Now the cluster is full.
+     *
+     * 4) app2 asks for another 1G container. Because of queue-c's user limit,
+     * no container should be preempted from app1.
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
+    MockRM rm1 = new MockRM(csConf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue-a, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resources left in
+    // the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for the AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemorySize());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that no container from app1 is
+    // marked to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    // No preemption happens
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+    Assert.assertEquals(0, killableContainers.size());
+
+    // Call CS.handle once to see if a container is preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 container (nothing preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  private Map<ContainerId, RMContainer> waitKillableContainersSize(
+      PreemptionManager pm, String queueName, String partition,
+      int expectedSize) throws InterruptedException {
+    Map<ContainerId, RMContainer> killableContainers =
+        pm.getKillableContainersMap(queueName, partition);
+
+    int wait = 0;
+    // Wait for at most 5 sec (it should be super fast actually)
+    while (expectedSize != killableContainers.size() && wait < 500) {
+      killableContainers = pm.getKillableContainersMap(queueName, partition);
+      Thread.sleep(10);
+      wait++;
+    }
+
+    Assert.assertEquals(expectedSize, killableContainers.size());
+    return killableContainers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index 001899d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,683 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.MockAM; -import org.apache.hadoop.yarn.server.resourcemanager.MockNM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestCapacitySchedulerPreemption { - private static final Log LOG = LogFactory.getLog( - TestCapacitySchedulerPreemption.class); - - private final int GB = 1024; - - private Configuration conf; - - RMNodeLabelsManager mgr; - - Clock clock; - - @Before - public void setUp() throws Exception { - conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); - conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class); - conf = TestUtils.getConfigurationWithMultipleQueues(this.conf); - - // Set preemption related configurations - conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, - 0); - conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, - true); - conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, - 1.0f); - conf.setFloat( - CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, - 1.0f); - mgr = new NullRMNodeLabelsManager(); - mgr.init(this.conf); - clock = mock(Clock.class); - when(clock.getTime()).thenReturn(0L); - } - - private SchedulingEditPolicy 
getSchedulingEditPolicy(MockRM rm) { - RMActiveServices activeServices = rm.getRMActiveService(); - SchedulingMonitor mon = null; - for (Service service : activeServices.getServices()) { - if (service instanceof SchedulingMonitor) { - mon = (SchedulingMonitor) service; - break; - } - } - - if (mon != null) { - return mon.getSchedulingEditPolicy(); - } - return null; - } - - @Test (timeout = 60000) - public void testSimplePreemption() throws Exception { - /** - * Test case: Submit two application (app1/app2) to different queues, queue - * structure: - * - * <pre> - * Root - * / | \ - * a b c - * 10 20 70 - * </pre> - * - * 1) Two nodes in the cluster, each of them has 4G. - * - * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no - * more resource available. - * - * 3) app2 submit to queue-c, ask for one 1G container (for AM) - * - * Now the cluster is fulfilled. - * - * 4) app2 asks for another 1G container, system will preempt one container - * from app1, and app2 will receive the preempted container - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - - MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); - MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>()); - - // Do allocation 3 times for node1/node2 - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // NM1/NM2 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getAvailableResource().getMemory()); - Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) - .getAvailableResource().getMemory()); - - // AM asks for a 1 * GB container - am2.allocate(Arrays.asList(ResourceRequest - .newInstance(Priority.newInstance(1), ResourceRequest.ANY, - Resources.createResource(1 * GB), 1)), null); - - // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if one container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); - Assert.assertEquals(1, killableContainers.size()); - Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() - .getApplicationAttemptId(), am1.getApplicationAttemptId()); - - // Call CS.handle once to see if container preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - FiCaSchedulerApp schedulerApp2 = 
cs.getApplicationAttempt( - am2.getApplicationAttemptId()); - - // App1 has 6 containers, and app2 has 2 containers - Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); - - rm1.close(); - } - - @Test (timeout = 60000) - public void testPreemptionConsidersNodeLocalityDelay() - throws Exception { - /** - * Test case: same as testSimplePreemption steps 1-3. - * - * Step 4: app2 asks for 1G container with locality specified, so it needs - * to wait for missed-opportunity before get scheduled. - * Check if system waits missed-opportunity before finish killable container - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); - MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); - - // Do allocation 3 times for node1/node2 - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // NM1/NM2 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getAvailableResource().getMemory()); - Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) - .getAvailableResource().getMemory()); - - // AM asks for a 1 * GB container with unknown host and unknown rack - am2.allocate(Arrays.asList(ResourceRequest - .newInstance(Priority.newInstance(1), ResourceRequest.ANY, - Resources.createResource(1 * GB), 1), ResourceRequest - .newInstance(Priority.newInstance(1), "unknownhost", - Resources.createResource(1 * GB), 1), ResourceRequest - .newInstance(Priority.newInstance(1), "/default-rack", - Resources.createResource(1 * GB), 1)), null); - - // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if one container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); - Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() - .getApplicationAttemptId(), am1.getApplicationAttemptId()); - - // Call CS.handle once to see if container preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( - am2.getApplicationAttemptId()); - - // App1 has 7 containers, and app2 has 1 containers (no container preempted) - Assert.assertEquals(7, 
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, one container will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersHardNodeLocality()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for a 1G container with hard locality specified, and
-     * the requested host doesn't exist in the cluster.
-     * Confirm that the system doesn't preempt any container.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // Launch an app in queue-a; its AM container should be launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now (6 allocated + 1 AM), and no
-    // available resource left for the cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and ask for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 have available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getAvailableResource().getMemory());
-
-    // AM asks for a 1 * GB container on h3 with hard locality;
-    // h3 doesn't exist in the cluster
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1, true), ResourceRequest
-        .newInstance(Priority.newInstance(1), "h3",
-            Resources.createResource(1 * GB), 1, false), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1, false)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call editSchedule twice, and check that one container from app1 is
-    // marked "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if a container is preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 container (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, nothing will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 7 containers, and app2 has 1 container (no container allocated)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Resources of already-killable containers will be excluded by PCPP
-     *    (no duplicate containers are added to the killable list)
-     * 2) When more resources need to be preempted, new containers will be
-     *    selected and the existing killable containers will be taken into
-     *    account
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // Launch an app in queue-a; its AM container should be launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now (6 allocated + 1 AM), and no
-    // available resource left for the cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and ask for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call editSchedule twice, and check that one container from app1 is
-    // marked "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
-    // Check killable containers and to-be-preempted containers in edit policy
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Run editSchedule again, confirm the status doesn't change
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Save the current killable containers
-    Set<ContainerId> previousKillableContainers = new HashSet<>(
-        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
-            .keySet());
-
-    // Update the requested resource of c from 1 to 2, so we need to preempt
-    // one more container
-    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
-    // Call editPolicy.editSchedule() once; we should have 1 container in the
-    // to-preempt map and 1 container in the killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
-    // Call editPolicy.editSchedule() once more; we should have 2 containers
-    // in the killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Check that the previous killable containers are included in the new
-    // killable containers
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-    Assert.assertTrue(
-        Sets.difference(previousKillableContainers, killableContainers.keySet())
-            .isEmpty());
-  }
-
-  /*
-   * Ignore this test for now because it could be a premature optimization
-   */
-  @Ignore
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Containers will be marked killable
-     * 2) The resource request is then reduced
-     * 3) Superfluous killable containers will be cancelled from both the
-     *    policy and the scheduler
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // Launch an app in queue-a; its AM container should be launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now (6 allocated + 1 AM), and no
-    // available resource left for the cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and ask for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call editSchedule twice, and check that 3 containers from app1 are
-    // marked "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
-    // Change the request from 3G to 2G; now we need one less killable
-    // container (3 -> 2)
-    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
-    // Call editSchedule once more to make sure nothing else happens
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersUserLimit()
-      throws Exception {
-    /**
-     * Test case: Submit two applications (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more
-     * than one 1G container in queue-c.
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 is submitted to queue-a first and asks for 6 * 1G containers, so
-     * (together with its AM) there is no more resource available.
-     *
-     * 3) app2 is submitted to queue-c and asks for one 1G container (for its AM).
-     *
-     * Now the cluster is fully used.
-     *
-     * 4) app2 asks for another 1G container; because of queue-c's user limit,
-     * no container from app1 should be preempted.
-     */
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
-    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
-    MockRM rm1 = new MockRM(csConf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // Launch an app in queue-a; its AM container should be launched on nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now (6 allocated + 1 AM), and no
-    // available resource left for the cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and ask for a 1G container for its AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 have available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getAvailableResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getAvailableResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call editSchedule twice, and check that no container from app1 is
-    // marked "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    // No preemption happens
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
-    Assert.assertEquals(0, killableContainers.size());
-
-    // Call CS.handle once to see if a container is preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 container (nothing preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  private Map<ContainerId, RMContainer> waitKillableContainersSize(
-      PreemptionManager pm, String queueName, String partition,
-      int expectedSize) throws InterruptedException {
-    Map<ContainerId, RMContainer> killableContainers =
-        pm.getKillableContainersMap(queueName, partition);
-
-    int wait = 0;
-    // Wait for at most 5 sec (it should be super fast actually)
-    while (expectedSize != killableContainers.size() && wait < 500) {
-      killableContainers = pm.getKillableContainersMap(queueName, partition);
-      Thread.sleep(10);
-      wait++;
-    }
-
-    Assert.assertEquals(expectedSize, killableContainers.size());
-    return killableContainers;
-  }
-}
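
A side note on the waitKillableContainersSize helper above: the hand-rolled
sleep loop could equally be written with Hadoop's GenericTestUtils.waitFor.
A minimal sketch, not part of this commit, assuming the Guava Supplier-based
waitFor(Supplier<Boolean>, int, int) overload available on this branch:

  // Hypothetical refactoring of the polling helper; same semantics:
  // poll every 10 ms, for at most 5 seconds, until the killable-container
  // map for the given queue/partition reaches the expected size.
  private Map<ContainerId, RMContainer> waitKillableContainersSize(
      final PreemptionManager pm, final String queueName,
      final String partition, final int expectedSize) throws Exception {
    org.apache.hadoop.test.GenericTestUtils.waitFor(
        new com.google.common.base.Supplier<Boolean>() {
          @Override
          public Boolean get() {
            // Re-read the map on every poll, as the policy thread updates it
            return pm.getKillableContainersMap(queueName, partition)
                .size() == expectedSize;
          }
        }, 10, 5000);
    return pm.getKillableContainersMap(queueName, partition);
  }

waitFor throws a TimeoutException with a diagnostic message on failure, which
makes a size mismatch easier to debug than the bare assertion in the loop-based
version.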