http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index a3b88c0..01d5e6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -170,7 +170,7 @@ public class TestCapacitySchedulerAutoQueueCreation 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1), - Collections.<ContainerId>emptyList(), Collections.singletonList(host), + null, Collections.<ContainerId>emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); //And this will result in container assignment for app1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java new file mode 100644 index 0000000..b6ac4b6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Test; + +import java.util.Arrays; + +public class TestCapacitySchedulerSchedulingRequestUpdate + extends CapacitySchedulerTestBase { + @Test + public void testBasicPendingResourceUpdate() throws Exception { + Configuration conf = TestUtils.getConfigurationWithQueueLabels( + new Configuration(false)); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + MockRM rm = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.start(); + MockNM nm1 = // label = x + new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = // label = "" + new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService()); + nm2.registerNode(); + + // Launch app1 in queue=a1 + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + // Launch app2 in queue=b1 + RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + // am1 asks for 8 * 1GB container for no label + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)), + Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"), + "mapper", "reducer"); + + checkPendingResource(rm, "a1", 8 * GB, null); + checkPendingResource(rm, "a", 8 * GB, null); + checkPendingResource(rm, "root", 8 * GB, null); + + // am2 asks for 8 * 1GB container for no label + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 8)), null); + + checkPendingResource(rm, "a1", 8 * GB, null); + checkPendingResource(rm, "a", 8 * GB, null); + checkPendingResource(rm, "b1", 8 * GB, null); + checkPendingResource(rm, "b", 8 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 16 * GB, null); + + // am2 asks for 8 * 1GB container in another priority for no label + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(2), "*", + Resources.createResource(1 * GB), 8)), null); + + checkPendingResource(rm, "a1", 8 * GB, null); + checkPendingResource(rm, "a", 8 * GB, null); + checkPendingResource(rm, "b1", 16 * GB, null); + checkPendingResource(rm, "b", 16 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 24 * GB, null); + + // am1 asks 4 GB resource instead of 8 * GB for priority=1 + // am1 asks for 8 * 1GB container for no label + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)), + Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"), + "mapper", "reducer"); + + checkPendingResource(rm, "a1", 4 * GB, null); + checkPendingResource(rm, "a", 4 * GB, null); + checkPendingResource(rm, "b1", 16 * GB, null); + checkPendingResource(rm, "b", 16 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 20 * GB, null); + + // am1 asks 8 * GB resource which label=x + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(2), "*", + Resources.createResource(8 * GB), 1, true, "x")), null); + + checkPendingResource(rm, "a1", 4 * GB, null); + checkPendingResource(rm, "a", 4 * GB, null); + checkPendingResource(rm, "a1", 8 * GB, "x"); + checkPendingResource(rm, "a", 8 * GB, "x"); + checkPendingResource(rm, "b1", 16 * GB, null); + checkPendingResource(rm, "b", 16 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 20 * GB, null); + checkPendingResource(rm, "root", 8 * GB, "x"); + + // complete am1/am2, pending resource should be 0 now + AppAttemptRemovedSchedulerEvent appRemovedEvent = + new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(), + RMAppAttemptState.FINISHED, false); + rm.getResourceScheduler().handle(appRemovedEvent); + appRemovedEvent = new AppAttemptRemovedSchedulerEvent( + am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false); + rm.getResourceScheduler().handle(appRemovedEvent); + + checkPendingResource(rm, "a1", 0 * GB, null); + checkPendingResource(rm, "a", 0 * GB, null); + checkPendingResource(rm, "a1", 0 * GB, "x"); + checkPendingResource(rm, "a", 0 * GB, "x"); + checkPendingResource(rm, "b1", 0 * GB, null); + checkPendingResource(rm, "b", 0 * GB, null); + checkPendingResource(rm, "root", 0 * GB, null); + checkPendingResource(rm, "root", 0 * GB, "x"); + } + + @Test + public void testNodePartitionPendingResourceUpdate() throws Exception { + Configuration conf = TestUtils.getConfigurationWithQueueLabels( + new Configuration(false)); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + MockRM rm = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.start(); + MockNM nm1 = // label = x + new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = // label = "" + new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService()); + nm2.registerNode(); + + // Launch app1 in queue=a1 + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + // Launch app2 in queue=b1 + RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + // am1 asks for 8 * 1GB container for "x" + am1.allocateIntraAppAntiAffinity("x", + ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)), + Priority.newInstance(1), 0, "mapper", "reducer"); + + checkPendingResource(rm, "a1", 8 * GB, "x"); + checkPendingResource(rm, "a", 8 * GB, "x"); + checkPendingResource(rm, "root", 8 * GB, "x"); + + // am2 asks for 8 * 1GB container for "x" + am2.allocateIntraAppAntiAffinity("x", + ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)), + Priority.newInstance(1), 0, "mapper", "reducer"); + + checkPendingResource(rm, "a1", 8 * GB, "x"); + checkPendingResource(rm, "a", 8 * GB, "x"); + checkPendingResource(rm, "b1", 8 * GB, "x"); + checkPendingResource(rm, "b", 8 * GB, "x"); + // root = a + b + checkPendingResource(rm, "root", 16 * GB, "x"); + + // am1 asks for 6 * 1GB container for "x" in another priority + am1.allocateIntraAppAntiAffinity("x", + ResourceSizing.newInstance(6, Resource.newInstance(1 * GB, 1)), + Priority.newInstance(2), 0, "mapper", "reducer"); + + checkPendingResource(rm, "a1", 14 * GB, "x"); + checkPendingResource(rm, "a", 14 * GB, "x"); + checkPendingResource(rm, "b1", 8 * GB, "x"); + checkPendingResource(rm, "b", 8 * GB, "x"); + // root = a + b + checkPendingResource(rm, "root", 22 * GB, "x"); + + // am1 asks for 4 * 1GB container for "x" in priority=1, which should + // override 8 * 1GB + am1.allocateIntraAppAntiAffinity("x", + ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)), + Priority.newInstance(1), 0, "mapper", "reducer"); + + checkPendingResource(rm, "a1", 10 * GB, "x"); + checkPendingResource(rm, "a", 10 * GB, "x"); + checkPendingResource(rm, "b1", 8 * GB, "x"); + checkPendingResource(rm, "b", 8 * GB, "x"); + // root = a + b + checkPendingResource(rm, "root", 18 * GB, "x"); + + // complete am1/am2, pending resource should be 0 now + AppAttemptRemovedSchedulerEvent appRemovedEvent = + new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(), + RMAppAttemptState.FINISHED, false); + rm.getResourceScheduler().handle(appRemovedEvent); + appRemovedEvent = new AppAttemptRemovedSchedulerEvent( + am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false); + rm.getResourceScheduler().handle(appRemovedEvent); + + checkPendingResource(rm, "a1", 0 * GB, null); + checkPendingResource(rm, "a", 0 * GB, null); + checkPendingResource(rm, "a1", 0 * GB, "x"); + checkPendingResource(rm, "a", 0 * GB, "x"); + checkPendingResource(rm, "b1", 0 * GB, null); + checkPendingResource(rm, "b", 0 * GB, null); + checkPendingResource(rm, "root", 0 * GB, null); + checkPendingResource(rm, "root", 0 * GB, "x"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index d2e28be..a800bef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -132,7 +132,7 @@ public class TestIncreaseAllocationExpirer { Assert.assertEquals(RMContainerState.RUNNING, rm1.getResourceScheduler().getRMContainer(containerId2).getState()); // Verify container size is 3G - Assert.assertEquals( + Assert.assertEquals( 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) .getAllocatedResource().getMemorySize()); // Verify total resource usage http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java new file mode 100644 index 0000000..0a44a1e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestSchedulingRequestContainerAllocation { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @Test + public void testIntraAppAntiAffinity() throws Exception { + Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( + new Configuration()); + csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, + true); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + // 4 NMs. + MockNM[] nms = new MockNM[4]; + RMNode[] rmNodes = new RMNode[4]; + for (int i = 0; i < 4; i++) { + nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId()); + } + + // app1 -> c + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]); + + // app1 asks for 10 anti-affinity containers for the same app. It should + // only get 4 containers allocated because we only have 4 nodes. + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get 5 containers allocated (1 AM + 1 node each). + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp.getLiveContainers().size()); + + // Similarly, app1 asks 10 anti-affinity containers at different priority, + // it should be satisfied as well. + // app1 asks for 10 anti-affinity containers for the same app. It should + // only get 4 containers allocated because we only have 4 nodes. + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)), + Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer"); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get 9 containers allocated (1 AM + 8 containers). + Assert.assertEquals(9, schedulerApp.getLiveContainers().size()); + + // Test anti-affinity to both of "mapper/reducer", we should only get no + // container allocated + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)), + Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper"); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get 10 containers allocated (1 AM + 9 containers). + Assert.assertEquals(9, schedulerApp.getLiveContainers().size()); + + rm1.close(); + } + + @Test + public void testIntraAppAntiAffinityWithMultipleTags() throws Exception { + Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( + new Configuration()); + csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, + true); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + // 4 NMs. + MockNM[] nms = new MockNM[4]; + RMNode[] rmNodes = new RMNode[4]; + for (int i = 0; i < 4; i++) { + nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId()); + } + + // app1 -> c + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]); + + // app1 asks for 2 anti-affinity containers for the same app. + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"), + "tag_1_1", "tag_1_2"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get 3 containers allocated (1 AM + 2 task). + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(3, schedulerApp.getLiveContainers().size()); + + // app1 asks for 1 anti-affinity containers for the same app. anti-affinity + // to tag_1_1/tag_1_2. With allocation_tag = tag_2_1/tag_2_2 + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)), + Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"), + "tag_1_1", "tag_1_2"); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get 4 containers allocated (1 AM + 2 task (first request) + + // 1 task (2nd request). + Assert.assertEquals(4, schedulerApp.getLiveContainers().size()); + + // app1 asks for 10 anti-affinity containers for the same app. anti-affinity + // to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3 + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)), + Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"), + "tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2"); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 4; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get 1 more containers allocated + // 1 AM + 2 task (first request) + 1 task (2nd request) + + // 1 task (3rd request) + Assert.assertEquals(5, schedulerApp.getLiveContainers().size()); + + rm1.close(); + } + + @Test + public void testSchedulingRequestDisabledByDefault() throws Exception { + Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( + new Configuration()); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + // 4 NMs. + MockNM[] nms = new MockNM[4]; + RMNode[] rmNodes = new RMNode[4]; + for (int i = 0; i < 4; i++) { + nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId()); + } + + // app1 -> c + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]); + + // app1 asks for 2 anti-affinity containers for the same app. + boolean caughtException = false; + try { + // Since feature is disabled by default, we should expect exception. + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"), + "tag_1_1", "tag_1_2"); + } catch (Exception e) { + caughtException = true; + } + Assert.assertTrue(caughtException); + rm1.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java new file mode 100644 index 0000000..c7f13cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestSchedulingRequestContainerAllocationAsync { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception { + Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( + new Configuration()); + csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, + true); + csConf.setInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + numThreads); + csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms", 0); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + // 200 NMs. + int nNMs = 200; + MockNM[] nms = new MockNM[nNMs]; + RMNode[] rmNodes = new RMNode[nNMs]; + for (int i = 0; i < nNMs; i++) { + nms[i] = rm1.registerNode("127.0.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId()); + } + + // app1 -> c + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]); + + // app1 asks for 10 anti-affinity containers for the same app. It should + // only get 4 containers allocated because we only have 4 nodes. + am1.allocateIntraAppAntiAffinity( + ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)), + Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < nNMs; j++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j])); + } + } + + // App1 should get #NM + 1 containers allocated (1 node each + 1 AM). + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size()); + + rm1.close(); + } + + @Test(timeout = 300000) + public void testSingleThreadAsyncContainerAllocation() throws Exception { + testIntraAppAntiAffinityAsync(1); + } + + @Test(timeout = 300000) + public void testTwoThreadsAsyncContainerAllocation() throws Exception { + testIntraAppAntiAffinityAsync(2); + } + + @Test(timeout = 300000) + public void testThreeThreadsAsyncContainerAllocation() throws Exception { + testIntraAppAntiAffinityAsync(3); + } + + @Test(timeout = 300000) + public void testFourThreadsAsyncContainerAllocation() throws Exception { + testIntraAppAntiAffinityAsync(4); + } + + @Test(timeout = 300000) + public void testFiveThreadsAsyncContainerAllocation() throws Exception { + testIntraAppAntiAffinityAsync(5); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index e8734cc..542ba3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -275,6 +275,8 @@ public class TestUtils { public static Configuration getConfigurationWithQueueLabels(Configuration config) { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config); + conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, + true); // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index f1d5663..7afe4ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -20,10 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; -import java.util.List; - +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -33,7 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.ImmutableSet; +import java.util.List; /** * Test functionality of AllocationTagsManager. @@ -54,7 +54,6 @@ public class TestAllocationTagsManager { rmContext = rm.getRMContext(); } - @Test public void testAllocationTagsManagerSimpleCases() throws InvalidAllocationTagsQueryException { @@ -141,30 +140,31 @@ public class TestAllocationTagsManager { // Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max // (Expect this returns #containers from app1 on node2) - Assert - .assertEquals(2, - atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of(AllocationTagsNamespaces.APP_ID - + TestUtils.getMockApplicationId(1).toString()), - Long::max)); + Assert.assertEquals(2, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(1), null, Long::max)); // Get Node Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(1), null, Long::max)); + + // Get Cardinality of app1 on node2, with empty tag set, op=max + Assert.assertEquals(2, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); // Get Node Cardinality of all apps on node2, with empty tag set, op=sum - Assert.assertEquals(7, atm.getNodeCardinalityByOp( + Assert.assertEquals(4, atm.getNodeCardinalityByOp( NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum)); // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum - Assert.assertEquals(5, + Assert.assertEquals(3, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum - Assert.assertEquals(2, + Assert.assertEquals(1, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); @@ -296,7 +296,7 @@ public class TestAllocationTagsManager { Assert.assertEquals(3, atm.getRackCardinality("rack0", null, "reducer")); // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max - Assert.assertEquals(2, atm.getRackCardinalityByOp("rack0", + Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0", TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 7492233..8ad726e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -117,9 +117,9 @@ public class TestPlacementConstraintsUtil { RMNode currentNode = nodeIterator.next(); FiCaSchedulerNode schedulerNode = TestUtils.getMockNode( currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode, pcm, tm)); } /** @@ -145,14 +145,14 @@ public class TestPlacementConstraintsUtil { tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); // 'spark' placement on Node0 should now SUCCEED - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode0, pcm, tm)); // FAIL on the rest of the nodes - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode1, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode2, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode3, pcm, tm)); } @@ -187,15 +187,15 @@ public class TestPlacementConstraintsUtil { FiCaSchedulerNode schedulerNode3 = TestUtils .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); // 'zk' placement on Rack1 should now SUCCEED - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode0, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode1, pcm, tm)); // FAIL on the rest of the RACKs - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode2, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode3, pcm, tm)); } @@ -230,14 +230,14 @@ public class TestPlacementConstraintsUtil { tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m")); // 'spark' placement on Node0 should now FAIL - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode0, pcm, tm)); // SUCCEED on the rest of the nodes - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode1, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode2, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag1, schedulerNode3, pcm, tm)); } @@ -273,15 +273,15 @@ public class TestPlacementConstraintsUtil { .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); // 'zk' placement on Rack1 should FAIL - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode0, pcm, tm)); - Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode1, pcm, tm)); // SUCCEED on the rest of the RACKs - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode2, pcm, tm)); - Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1, sourceTag2, schedulerNode3, pcm, tm)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 5f29186..b998564 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -192,7 +192,7 @@ public class FairSchedulerTestBase { resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, new ArrayList<ContainerId>(), + scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); scheduler.update(); return id; @@ -222,7 +222,7 @@ public class FairSchedulerTestBase { resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, new ArrayList<ContainerId>(), + scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); return id; } @@ -245,7 +245,7 @@ public class FairSchedulerTestBase { ResourceRequest request, ApplicationAttemptId attId) { List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ask.add(request); - scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), + scheduler.allocate(attId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); scheduler.update(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 95dbaea..2512787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -125,7 +125,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { List<ResourceRequest> ask = new ArrayList<>(); ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true)); scheduler.allocate( - appAttemptId, ask, new ArrayList<ContainerId>(), + appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); @@ -163,8 +163,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); ask.add(request); - scheduler.allocate(appAttemptId, ask, - new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); + scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); triggerSchedulingAttempt(); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); @@ -175,8 +174,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); ask.clear(); ask.add(request); - scheduler.allocate(appAttemptId, ask, - new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); + scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); triggerSchedulingAttempt(); checkAppConsumption(app, Resources.createResource(2048,2)); @@ -373,7 +371,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { true); ask1.add(request1); ask1.add(request2); - scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null, + scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 77b6d04..d9c06a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1280,7 +1280,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { List<ResourceRequest> asks = new ArrayList<ResourceRequest>(); asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false)); - scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null, + scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1); @@ -2125,7 +2125,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ResourceRequest request1 = createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true); ask1.add(request1); - scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), + scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); // Second ask, queue2 requests 1 large. @@ -2141,7 +2141,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ResourceRequest.ANY, 1, 1, false); ask2.add(request2); ask2.add(request3); - scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), + scheduler.allocate(id21, ask2, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); // Third ask, queue2 requests 2 small (minReqSize). @@ -2157,7 +2157,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ResourceRequest.ANY, 2, 2, true); ask3.add(request4); ask3.add(request5); - scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), + scheduler.allocate(id22, ask3, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); scheduler.update(); @@ -2683,7 +2683,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Complete the first container so we can trigger allocation for app2 ContainerId containerId = app1.getLiveContainers().iterator().next().getContainerId(); - scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(), + scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(), null, Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS); // Trigger allocation for app2 @@ -2769,7 +2769,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true)); asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true)); - scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null, + scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); // node 1 checks in @@ -3216,7 +3216,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { createResourceRequest(1024, node1.getHostName(), 1, 0, true), createResourceRequest(1024, "rack1", 1, 0, true), createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true)); - scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), + scheduler.allocate(attId1, update, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); // then node2 should get the container @@ -4432,7 +4432,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { createResourceRequest(1024, 8, ResourceRequest.ANY, 1, 1, true); ask1.add(request1); - scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, + scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); String hostName = "127.0.0.1"; @@ -4508,11 +4508,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), - Collections.<ContainerId>emptyList(), + null, Collections.<ContainerId>emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); assertTrue(app.isPlaceBlacklisted(host)); scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), - Collections.<ContainerId>emptyList(), null, + null, Collections.<ContainerId>emptyList(), null, Collections.singletonList(host), NULL_UPDATE_REQUESTS); assertFalse(scheduler.getSchedulerApp(appAttemptId) .isPlaceBlacklisted(host)); @@ -4521,8 +4521,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { createResourceRequest(GB, node.getHostName(), 1, 0, true)); // Verify a container does not actually get placed on the blacklisted host - scheduler.allocate(appAttemptId, update, - Collections.<ContainerId>emptyList(), + scheduler.allocate(appAttemptId, update, null, Collections.<ContainerId>emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); assertTrue(app.isPlaceBlacklisted(host)); scheduler.update(); @@ -4531,8 +4530,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { .getLiveContainers().size()); // Verify a container gets placed on the empty blacklist - scheduler.allocate(appAttemptId, update, - Collections.<ContainerId>emptyList(), null, + scheduler.allocate(appAttemptId, update, null, Collections.<ContainerId>emptyList(), null, Collections.singletonList(host), NULL_UPDATE_REQUESTS); assertFalse(app.isPlaceBlacklisted(host)); createSchedulingRequest(GB, "root.default", "user", 1); @@ -5391,8 +5389,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { ask1.add(request3); // Perform allocation - scheduler.allocate(appAttemptId, ask1, new ArrayList<ContainerId>(), null, - null, NULL_UPDATE_REQUESTS); + scheduler.allocate(appAttemptId, ask1, null, new ArrayList<ContainerId>(), + null, null, NULL_UPDATE_REQUESTS); scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index db749ac..8814c0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -281,7 +281,7 @@ public class TestFifoScheduler { ask.add(nodeLocal); ask.add(rackLocal); ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), + scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); @@ -378,7 +378,7 @@ public class TestFifoScheduler { ask.add(nodeLocal); ask.add(rackLocal); ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), + scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS); // Before the node update event, there are one local request @@ -954,7 +954,7 @@ public class TestFifoScheduler { ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1, RMNodeLabelsManager.NO_LABEL)); - fs.allocate(appAttemptId1, ask1, emptyId, + fs.allocate(appAttemptId1, ask1, null, emptyId, Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS); // Trigger container assignment @@ -963,7 +963,7 @@ public class TestFifoScheduler { // Get the allocation for the application and verify no allocation on // blacklist node Allocation allocation1 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); @@ -971,7 +971,7 @@ public class TestFifoScheduler { // verify host_1_1 can get allocated as not in blacklist fs.handle(new NodeUpdateSchedulerEvent(n4)); Allocation allocation2 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); List<Container> containerList = allocation2.getContainers(); @@ -986,33 +986,33 @@ public class TestFifoScheduler { // be assigned ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask2, emptyId, + fs.allocate(appAttemptId1, ask2, null, emptyId, Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS); // verify n1 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n1)); Allocation allocation3 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); // verify n2 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n2)); Allocation allocation4 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); // verify n3 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n3)); Allocation allocation5 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); fs.handle(new NodeUpdateSchedulerEvent(n4)); Allocation allocation6 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); @@ -1072,14 +1072,14 @@ public class TestFifoScheduler { List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>(); ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask1, emptyId, + fs.allocate(appAttemptId1, ask1, null, emptyId, null, null, NULL_UPDATE_REQUESTS); // Ask for a 2 GB container for app 2 List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>(); ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1)); - fs.allocate(appAttemptId2, ask2, emptyId, + fs.allocate(appAttemptId2, ask2, null, emptyId, null, null, NULL_UPDATE_REQUESTS); // Trigger container assignment @@ -1087,13 +1087,13 @@ public class TestFifoScheduler { // Get the allocation for the applications and verify headroom Allocation allocation1 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("Allocation headroom", 1 * GB, allocation1 .getResourceLimit().getMemorySize()); Allocation allocation2 = - fs.allocate(appAttemptId2, emptyAsk, emptyId, + fs.allocate(appAttemptId2, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("Allocation headroom", 1 * GB, allocation2 .getResourceLimit().getMemorySize()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java new file mode 100644 index 0000000..479d2c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.function.LongBinaryOperator; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test behaviors of single constraint app placement allocator. + */ +public class TestSingleConstraintAppPlacementAllocator { + private AppSchedulingInfo appSchedulingInfo; + private AllocationTagsManager spyAllocationTagsManager; + private RMContext rmContext; + private SchedulerRequestKey schedulerRequestKey; + private SingleConstraintAppPlacementAllocator allocator; + + @Before + public void setup() throws Exception { + // stub app scheduling info. + appSchedulingInfo = mock(AppSchedulingInfo.class); + when(appSchedulingInfo.getApplicationId()).thenReturn( + TestUtils.getMockApplicationId(1)); + when(appSchedulingInfo.getApplicationAttemptId()).thenReturn( + TestUtils.getMockApplicationAttemptId(1, 1)); + + // stub RMContext + rmContext = TestUtils.getMockRMContext(); + + // Create allocation tags manager + AllocationTagsManager allocationTagsManager = new AllocationTagsManager( + rmContext); + spyAllocationTagsManager = spy(allocationTagsManager); + schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L, + TestUtils.getMockContainerId(1, 1)); + rmContext.setAllocationTagsManager(spyAllocationTagsManager); + + // Create allocator + allocator = new SingleConstraintAppPlacementAllocator(); + allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + } + + private void assertValidSchedulingRequest( + SchedulingRequest schedulingRequest) { + // Create allocator to avoid fields polluted by previous runs + allocator = new SingleConstraintAppPlacementAllocator(); + allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + } + + private void assertInvalidSchedulingRequest( + SchedulingRequest schedulingRequest, boolean recreateAllocator) { + try { + // Create allocator + if (recreateAllocator) { + allocator = new SingleConstraintAppPlacementAllocator(); + allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + } + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + } catch (SchedulerInvalidResoureRequestException e) { + // Expected + return; + } + Assert.fail( + "Expect failure for schedulingRequest=" + schedulingRequest.toString()); + } + + @Test + public void testSchedulingRequestValidation() { + // Valid + assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build()); + Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), + allocator.getTargetAllocationTags()); + Assert.assertEquals("", allocator.getTargetNodePartition()); + + // Valid (with partition) + assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("x")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build()); + Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), + allocator.getTargetAllocationTags()); + Assert.assertEquals("x", allocator.getTargetNodePartition()); + + // Valid (without specifying node partition) + assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer")).build()) + .resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build()); + Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), + allocator.getTargetAllocationTags()); + Assert.assertEquals("", allocator.getTargetNodePartition()); + + // Valid (with application Id target) + assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer")).build()) + .resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build()); + // Allocation tags should not include application Id + Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), + allocator.getTargetAllocationTags()); + Assert.assertEquals("", allocator.getTargetNodePartition()); + + // Invalid (without sizing) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer")).build()) + .build(), true); + + // Invalid (without target tags) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1).build()) + .build(), true); + + // Invalid (with multiple allocation tags expression specified) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper"), + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(), true); + + // Invalid (with multiple node partition target expression specified) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper"), + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp(""), + PlacementConstraints.PlacementTargets.nodePartition("x")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(), true); + + // Invalid (not anti-affinity cardinality) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 1, 2, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(), true); + + // Invalid (not anti-affinity cardinality) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 2, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(), true); + + // Invalid (not NODE scope) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.RACK, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(), true); + + // Invalid (not GUARANTEED) + assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(), true); + } + + @Test + public void testSchedulingRequestUpdate() { + SchedulingRequest schedulingRequest = + SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(); + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + + // Update allocator with exactly same scheduling request, should succeeded. + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + + // Update allocator with scheduling request different at #allocations, + // should succeeded. + schedulingRequest.getResourceSizing().setNumAllocations(10); + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + + // Update allocator with scheduling request different at resource, + // should failed. + schedulingRequest.getResourceSizing().setResources( + Resource.newInstance(2048, 1)); + assertInvalidSchedulingRequest(schedulingRequest, false); + + // Update allocator with a different placement target (allocator tag), + // should failed + schedulingRequest = SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(); + assertInvalidSchedulingRequest(schedulingRequest, false); + + // Update allocator with recover == true + int existingNumAllocations = + allocator.getSchedulingRequest().getResourceSizing() + .getNumAllocations(); + schedulingRequest = SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(); + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, true); + Assert.assertEquals(existingNumAllocations + 1, + allocator.getSchedulingRequest().getResourceSizing() + .getNumAllocations()); + } + + @Test + public void testFunctionality() throws InvalidAllocationTagsQueryException { + SchedulingRequest schedulingRequest = + SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(); + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + allocator.canAllocate(NodeType.NODE_LOCAL, + TestUtils.getMockNode("host1", "/rack1", 123, 1024)); + verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp( + eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)), + eq(ImmutableSet.of("mapper", "reducer")), + any(LongBinaryOperator.class)); + + allocator = new SingleConstraintAppPlacementAllocator(); + allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + // Valid (with partition) + schedulingRequest = SchedulingRequest.newBuilder().executionType( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) + .allocationRequestId(10L).priority(Priority.newInstance(1)) + .placementConstraintExpression(PlacementConstraints + .targetCardinality(PlacementConstraints.NODE, 0, 1, + PlacementConstraints.PlacementTargets + .allocationTagToIntraApp("mapper", "reducer"), + PlacementConstraints.PlacementTargets.nodePartition("x")) + .build()).resourceSizing( + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) + .build(); + allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false); + allocator.canAllocate(NodeType.NODE_LOCAL, + TestUtils.getMockNode("host1", "/rack1", 123, 1024)); + verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp( + eq(NodeId.fromString("host1:123")), + eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet + .of("mapper", "reducer")), any(LongBinaryOperator.class)); + + SchedulerNode node1 = mock(SchedulerNode.class); + when(node1.getPartition()).thenReturn("x"); + when(node1.getNodeID()).thenReturn(NodeId.fromString("host1:123")); + + Assert.assertTrue(allocator + .precheckNode(node1, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + + SchedulerNode node2 = mock(SchedulerNode.class); + when(node1.getPartition()).thenReturn(""); + when(node1.getNodeID()).thenReturn(NodeId.fromString("host2:123")); + Assert.assertFalse(allocator + .precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org