YARN-8202. DefaultAMSProcessor should properly check units of requested custom resource types against minimum/maximum allocation (snemeth via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8fa7cb6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8fa7cb6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8fa7cb6 Branch: refs/heads/HDDS-4 Commit: c8fa7cb6d0c12c4e65e53ea60167b856511b8294 Parents: 5c1c344 Author: Robert Kanter <rkan...@apache.org> Authored: Thu May 10 09:31:59 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Mon May 14 10:31:08 2018 -0700 ---------------------------------------------------------------------- .../v2/app/rm/ContainerRequestCreator.java | 57 ++ .../v2/app/rm/TestRMContainerAllocator.java | 534 ++++++++++--------- .../hadoop/yarn/util/UnitsConversionUtil.java | 44 +- .../resourcetypes/ResourceTypesTestHelper.java | 93 ++++ .../hadoop/yarn/server/utils/BuilderUtils.java | 8 +- .../scheduler/SchedulerUtils.java | 95 +++- .../TestApplicationMasterService.java | 185 +++++-- .../scheduler/TestSchedulerUtils.java | 278 +++++++++- 8 files changed, 961 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java new file mode 100644 index 0000000..39a9ddc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java @@ 
-0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.api.records.Resource; + +final class ContainerRequestCreator { + + private ContainerRequestCreator() {} + + static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId, + Resource resource, String[] hosts) { + return createRequest(jobId, taskAttemptId, resource, hosts, + false, false); + } + + static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId, + Resource resource, String[] hosts, boolean earlierFailedAttempt, + boolean reduce) { + final TaskId taskId; + if (reduce) { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); + } else { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + } + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, + taskAttemptId); + + if 
(earlierFailedAttempt) { + return ContainerRequestEvent + .createContainerRequestEventForFailedContainer(attemptId, + resource); + } + return new ContainerRequestEvent(attemptId, resource, hosts, + new String[]{NetworkTopology.DEFAULT_RACK}); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 7875917..427e6ea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyFloat; @@ -96,7 +97,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.CollectorInfo; @@ -203,7 +203,7 @@ public class TestRMContainerAllocator { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -215,13 +215,13 @@ public class TestRMContainerAllocator { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(jobId, + 1, Resource.newInstance(1024, 1), new String[] {"h1"}); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h2" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(jobId, + 2, Resource.newInstance(1024, 1), new String[] {"h2"}); allocator.sendRequest(event2); // this tells the scheduler about the requests @@ -232,8 +232,8 @@ public class TestRMContainerAllocator { Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h3" }); + ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(jobId, + 3, Resource.newInstance(1024, 1), new String[] {"h3"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -242,7 +242,7 @@ public class TestRMContainerAllocator { rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, 
assigned.size()); Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); - + // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat @@ -252,21 +252,21 @@ public class TestRMContainerAllocator { assigned = allocator.schedule(); rm.drainEvents(); Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); - checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + checkAssignments(new ContainerRequestEvent[] {event1, event2, event3}, assigned, false); - + // check that the assigned container requests are cancelled allocator.schedule(); rm.drainEvents(); Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); } - - @Test + + @Test public void testMapNodeLocality() throws Exception { - // test checks that ordering of allocated containers list from the RM does - // not affect the map->container assignment done by the AM. If there is a - // node local container available for a map then it should be assigned to - // that container and not a rack-local container that happened to be seen + // test checks that ordering of allocated containers list from the RM does + // not affect the map->container assignment done by the AM. If there is a + // node local container available for a map then it should be assigned to + // that container and not a rack-local container that happened to be seen // earlier in the allocated containers list from the RM. 
// Regression test for MAPREDUCE-4893 LOG.info("Running testMapNodeLocality"); @@ -291,26 +291,29 @@ public class TestRMContainerAllocator { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); // add resources to scheduler - MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps + MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map rm.drainEvents(); // create the container requests for maps - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest( + jobId, 1, Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h1" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest( + jobId, 2, Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event2); - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h2" }); + ContainerRequestEvent event3 = ContainerRequestCreator.createRequest( + jobId, 3, Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -323,14 +326,14 @@ public class TestRMContainerAllocator { // Node heartbeat from rack-local first. This makes node h3 the first in the // list of allocated containers but it should not be assigned to task1. 
nodeManager3.nodeHeartbeat(true); - // Node heartbeat from node-local next. This allocates 2 node local + // Node heartbeat from node-local next. This allocates 2 node local // containers for task1 and task2. These should be matched with those tasks. nodeManager1.nodeHeartbeat(true); rm.drainEvents(); assigned = allocator.schedule(); rm.drainEvents(); - checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + checkAssignments(new ContainerRequestEvent[] {event1, event2, event3}, assigned, false); // remove the rack-local assignment that should have happened for task3 for(TaskAttemptContainerAssignedEvent event : assigned) { @@ -340,7 +343,7 @@ public class TestRMContainerAllocator { break; } } - checkAssignments(new ContainerRequestEvent[] { event1, event2}, + checkAssignments(new ContainerRequestEvent[] {event1, event2}, assigned, true); } @@ -381,13 +384,15 @@ public class TestRMContainerAllocator { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest( + jobId, 1, Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 2048, - new String[] { "h2" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest( + jobId, 2, Resource.newInstance(1024, 1), + new String[] {"h2"}); allocator.sendRequest(event2); // this tells the scheduler about the requests @@ -404,7 +409,7 @@ public class TestRMContainerAllocator { assigned = allocator.schedule(); rm.drainEvents(); - checkAssignments(new ContainerRequestEvent[] { event1, event2 }, + checkAssignments(new ContainerRequestEvent[] {event1, event2}, assigned, false); } @@ -439,15 +444,19 @@ public class TestRMContainerAllocator { rm.drainEvents(); // create the container request - final String[] locations = new 
String[] { host }; - allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + final String[] locations = new String[] {host}; + allocator.sendRequest(createRequest(jobId, 0, + Resource.newInstance(1024, 1), + locations, false, true)); for (int i = 0; i < 1;) { rm.drainEvents(); i += allocator.schedule().size(); nm.nodeHeartbeat(true); } - allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); + allocator.sendRequest(createRequest(jobId, 0, + Resource.newInstance(1024, 1), + locations, true, false)); while (allocator.getTaskAttemptKillEvents().size() == 0) { rm.drainEvents(); allocator.schedule().size(); @@ -494,9 +503,10 @@ public class TestRMContainerAllocator { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, Resource.newInstance(2048, 1), + new String[] {"h1"}, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null,null)); + new RMContainerRequestor.ContainerRequest(event1, null, null)); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -547,9 +557,12 @@ public class TestRMContainerAllocator { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + Resource.newInstance(2048, 1), + new String[] {"h1"}, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + new RMContainerRequestor.ContainerRequest(event1, null, + clock.getTime())); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -561,7 +574,7 @@ public class TestRMContainerAllocator { clock.setTime(clock.getTime() + 
(preemptThreshold) * 1000); allocator.preemptReducesIfNeeded(); Assert.assertEquals("The reducer is not preeempted", 1, - assignedRequests.preemptionWaitingReduces.size()); + assignedRequests.preemptionWaitingReduces.size()); } @Test(timeout = 30000) @@ -608,9 +621,12 @@ public class TestRMContainerAllocator { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + Resource.newInstance(2048, 1), + new String[] {"h1"}, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + new RMContainerRequestor.ContainerRequest(event1, null, + clock.getTime())); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -651,13 +667,17 @@ public class TestRMContainerAllocator { appAttemptId, mockJob, SystemClock.getInstance()); // request to allocate two reduce priority containers - final String[] locations = new String[] { host }; - allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + final String[] locations = new String[] {host}; + allocator.sendRequest(createRequest(jobId, 0, + Resource.newInstance(1024, 1), + locations, false, true)); allocator.scheduleAllReduces(); allocator.makeRemoteRequest(); nm.nodeHeartbeat(true); rm.drainEvents(); - allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); + allocator.sendRequest(createRequest(jobId, 1, + Resource.newInstance(1024, 1), + locations, false, false)); int assignedContainer; for (assignedContainer = 0; assignedContainer < 1;) { @@ -684,7 +704,7 @@ public class TestRMContainerAllocator { conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes"); ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); + 
ApplicationAttemptId.newInstance(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -706,13 +726,16 @@ public class TestRMContainerAllocator { // create some map requests ContainerRequestEvent reqMapEvents; - reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" }); + reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0, + Resource.newInstance(1024, 1), new String[]{"map"}); allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests ContainerRequestEvent reqReduceEvents; reqReduceEvents = - createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true); + createRequest(jobId, 0, + Resource.newInstance(2048, 1), + new String[] {"reduce"}, false, true); allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); // verify all of the host-specific asks were sent plus one for the @@ -883,18 +906,21 @@ public class TestRMContainerAllocator { // create the container request // send MAP request - ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { - "h1", "h2" }, true, false); + ContainerRequestEvent event1 = createRequest(jobId, 1, + Resource.newInstance(2048, 1), + new String[] {"h1", "h2"}, true, false); allocator.sendRequest(event1); // send REDUCE request - ContainerRequestEvent event2 = createReq(jobId, 2, 3000, - new String[] { "h1" }, false, true); + ContainerRequestEvent event2 = createRequest(jobId, 2, + Resource.newInstance(3000, 1), + new String[] {"h1"}, false, true); allocator.sendRequest(event2); // send MAP request - ContainerRequestEvent event3 = createReq(jobId, 3, 2048, - new String[] { "h3" }, false, false); + ContainerRequestEvent event3 = createRequest(jobId, 3, + Resource.newInstance(2048, 1), + new String[] {"h3"}, false, false); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -911,7 +937,7 @@ public class 
TestRMContainerAllocator { assigned = allocator.schedule(); rm.drainEvents(); - checkAssignments(new ContainerRequestEvent[] { event1, event3 }, + checkAssignments(new ContainerRequestEvent[] {event1, event3}, assigned, false); // validate that no container is assigned to h1 as it doesn't have 2048 @@ -921,10 +947,10 @@ public class TestRMContainerAllocator { } } - private static class MyResourceManager extends MockRM { + static class MyResourceManager extends MockRM { private static long fakeClusterTimeStamp = System.currentTimeMillis(); - + public MyResourceManager(Configuration conf) { super(conf); } @@ -955,7 +981,7 @@ public class TestRMContainerAllocator { protected ResourceScheduler createScheduler() { return new MyFifoScheduler(this.getRMContext()); } - + MyFifoScheduler getMyFifoScheduler() { return (MyFifoScheduler) scheduler; } @@ -1221,7 +1247,7 @@ public class TestRMContainerAllocator { Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } - + @Test public void testUpdatedNodes() throws Exception { Configuration conf = new Configuration(); @@ -1251,11 +1277,13 @@ public class TestRMContainerAllocator { rm.drainEvents(); // create the map container request - ContainerRequestEvent event = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event = + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event); TaskAttemptId attemptId = event.getAttemptID(); - + TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId()); Task mockTask = mock(Task.class); @@ -1279,7 +1307,7 @@ public class TestRMContainerAllocator { // no updated nodes reported Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); - + // mark nodes bad nm1.nodeHeartbeat(false); 
nm2.nodeHeartbeat(false); @@ -1292,11 +1320,13 @@ public class TestRMContainerAllocator { // updated nodes are reported Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); - Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); - Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); + Assert.assertEquals(2, + allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); + Assert.assertEquals(attemptId, + allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); allocator.getJobUpdatedNodeEvents().clear(); allocator.getTaskAttemptKillEvents().clear(); - + assigned = allocator.schedule(); rm.drainEvents(); Assert.assertEquals(0, assigned.size()); @@ -1307,7 +1337,7 @@ public class TestRMContainerAllocator { @Test public void testBlackListedNodes() throws Exception { - + LOG.info("Running testBlackListedNodes"); Configuration conf = new Configuration(); @@ -1315,7 +1345,7 @@ public class TestRMContainerAllocator { conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - + MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -1331,7 +1361,7 @@ public class TestRMContainerAllocator { .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); rm.drainEvents(); - + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -1347,18 +1377,24 @@ public class TestRMContainerAllocator { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event1); // send 1 more request with 
different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h2" }); + ContainerRequestEvent event2 = + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[] {"h2"}); allocator.sendRequest(event2); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h3" }); + ContainerRequestEvent event3 = + ContainerRequestCreator.createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[] {"h3"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -1368,9 +1404,9 @@ public class TestRMContainerAllocator { Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Send events to blacklist nodes h1 and h2 - ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); + ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); allocator.sendFailure(f1); - ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); + ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); allocator.sendFailure(f2); // update resources in scheduler @@ -1392,23 +1428,23 @@ public class TestRMContainerAllocator { assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); nodeManager3.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - + Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); - + // validate that all containers are assigned to h3 for (TaskAttemptContainerAssignedEvent assig : assigned) { Assert.assertTrue("Assigned container host not correct", "h3".equals(assig .getContainer().getNodeId().getHost())); } } - + @Test public void 
testIgnoreBlacklisting() throws Exception { LOG.info("Running testIgnoreBlacklisting"); @@ -1448,7 +1484,7 @@ public class TestRMContainerAllocator { // Known=1, blacklisted=0, ignore should be false - assign first container assigned = - getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 1, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); @@ -1463,47 +1499,47 @@ public class TestRMContainerAllocator { // Because makeRemoteRequest will not be aware of it until next call // The current call will send blacklisted node "h1" to RM assigned = - getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 2, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 1, 0, 0, 1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=1, blacklisted=1, ignore should be true - assign 1 assigned = - getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 2, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. assigned = - getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, + getContainerOnHost(jobId, 3, 1024, new String[] {"h2"}, nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. 
assigned = - getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, + getContainerOnHost(jobId, 4, 1024, new String[] {"h3"}, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Known=3, blacklisted=1, ignore should be true - assign 1 assigned = - getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 5, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=4, blacklisted=1, ignore should be false - assign 1 anyway assigned = - getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, + getContainerOnHost(jobId, 6, 1024, new String[] {"h4"}, nodeManagers[3], allocator, 0, 0, 1, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklisting re-enabled. // Known=4, blacklisted=1, ignore should be false - no assignment on h1 assigned = - getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 7, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // RMContainerRequestor would have created a replacement request. @@ -1516,20 +1552,20 @@ public class TestRMContainerAllocator { // Known=4, blacklisted=2, ignore should be true. Should assign 0 // container for the same reason above. assigned = - getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 8, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 1, 0, 0, 2, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=4, blacklisted=2, ignore should be true. Should assign 2 // containers. 
assigned = - getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 8, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); // Known=4, blacklisted=2, ignore should be true. assigned = - getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, + getContainerOnHost(jobId, 9, 1024, new String[] {"h2"}, nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); @@ -1540,23 +1576,23 @@ public class TestRMContainerAllocator { nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=5, blacklisted=3, ignore should be true. assigned = - getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, + getContainerOnHost(jobId, 10, 1024, new String[] {"h3"}, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - + // Assign on 5 more nodes - to re-enable blacklisting for (int i = 0; i < 5; i++) { nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); assigned = getContainerOnHost(jobId, 11 + i, 1024, - new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], + new String[] {String.valueOf(5 + i)}, nodeManagers[4 + i], allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); } // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. 
assigned = - getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, + getContainerOnHost(jobId, 20, 1024, new String[] {"h3"}, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); } @@ -1576,7 +1612,8 @@ public class TestRMContainerAllocator { int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { ContainerRequestEvent reqEvent = - createReq(jobId, taskAttemptId, memory, hosts); + ContainerRequestCreator.createRequest(jobId, taskAttemptId, + Resource.newInstance(memory, 1), hosts); allocator.sendRequest(reqEvent); // Send the request to the RM @@ -1596,7 +1633,7 @@ public class TestRMContainerAllocator { expectedAdditions2, expectedRemovals2, rm); return assigned; } - + @Test public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); @@ -1606,7 +1643,7 @@ public class TestRMContainerAllocator { conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - + MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -1622,7 +1659,7 @@ public class TestRMContainerAllocator { .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); rm.drainEvents(); - + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -1638,8 +1675,10 @@ public class TestRMContainerAllocator { LOG.info("Requesting 1 Containers _1 on H1"); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event1); LOG.info("RM Heartbeat (to send the container requests)"); @@ -1653,13 +1692,13 @@ public class 
TestRMContainerAllocator { // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + LOG.info("Failing container _1 on H1 (should blacklist the node)"); // Send events to blacklist nodes h1 and h2 ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); @@ -1667,8 +1706,9 @@ public class TestRMContainerAllocator { //At this stage, a request should be created for a fast fail map //Create a FAST_FAIL request for a previously failed map. - ContainerRequestEvent event1f = createReq(jobId, 1, 1024, - new String[] { "h1" }, true, false); + ContainerRequestEvent event1f = createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}, true, false); allocator.sendRequest(event1f); //Update the Scheduler with the new requests. @@ -1678,24 +1718,26 @@ public class TestRMContainerAllocator { Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h1", "h3" }); + ContainerRequestEvent event3 = + ContainerRequestCreator.createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[] {"h1", "h3"}); allocator.sendRequest(event3); - + //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container. 
//RM is only aware of the prio:5 container - + LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + //RMContainerAllocator gets assigned a p:5 on a blacklisted node. //Send a release for the p:5 container + another request. @@ -1704,26 +1746,26 @@ public class TestRMContainerAllocator { rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + //Hearbeat from H3 to schedule on this host. LOG.info("h3 Heartbeat (To re-schedule the containers)"); nodeManager3.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); assigned = allocator.schedule(); assertBlacklistAdditionsAndRemovals(0, 0, rm); rm.drainEvents(); - + // For debugging for (TaskAttemptContainerAssignedEvent assig : assigned) { LOG.info(assig.getTaskAttemptID() + " assgined to " + assig.getContainer().getId() + " with priority " + assig.getContainer().getPriority()); } - + Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); - + // validate that all containers are assigned to h3 for (TaskAttemptContainerAssignedEvent assig : assigned) { Assert.assertEquals("Assigned container " + assig.getContainer().getId() @@ -1759,13 +1801,13 @@ public class TestRMContainerAllocator { assert (false); } } - + List<ResourceRequest> lastAsk = null; List<ContainerId> lastRelease = null; List<String> lastBlacklistAdditions; List<String> lastBlacklistRemovals; Resource forceResourceLimit = null; - + // override this to copy 
the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @Override @@ -1855,38 +1897,6 @@ public class TestRMContainerAllocator { } } - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int memory, String[] hosts) { - return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false); - } - - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { - return createReq(jobId, taskAttemptId, mem, - 1, hosts, earlierFailedAttempt, reduce); - } - - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int memory, int vcore, String[] hosts, boolean earlierFailedAttempt, - boolean reduce) { - TaskId taskId; - if (reduce) { - taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); - } else { - taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); - } - TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, - taskAttemptId); - Resource containerNeed = Resource.newInstance(memory, vcore); - if (earlierFailedAttempt) { - return ContainerRequestEvent - .createContainerRequestEventForFailedContainer(attemptId, - containerNeed); - } - return new ContainerRequestEvent(attemptId, containerNeed, hosts, - new String[] { NetworkTopology.DEFAULT_RACK }); - } - private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, String host, boolean reduce) { TaskId taskId; @@ -1897,9 +1907,9 @@ public class TestRMContainerAllocator { } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); - return new ContainerFailedEvent(attemptId, host); + return new ContainerFailedEvent(attemptId, host); } - + private ContainerAllocatorEvent createDeallocateEvent(JobId jobId, int taskAttemptId, boolean reduce) { TaskId taskId; @@ -1957,14 +1967,14 @@ public class TestRMContainerAllocator { // Mock RMContainerAllocator // Instead of talking to remote 
Scheduler,uses the local Scheduler - private static class MyContainerAllocator extends RMContainerAllocator { - static final List<TaskAttemptContainerAssignedEvent> events - = new ArrayList<TaskAttemptContainerAssignedEvent>(); - static final List<TaskAttemptKillEvent> taskAttemptKillEvents - = new ArrayList<TaskAttemptKillEvent>(); - static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents - = new ArrayList<JobUpdatedNodesEvent>(); - static final List<JobEvent> jobEvents = new ArrayList<JobEvent>(); + static class MyContainerAllocator extends RMContainerAllocator { + static final List<TaskAttemptContainerAssignedEvent> events = + new ArrayList<>(); + static final List<TaskAttemptKillEvent> taskAttemptKillEvents = + new ArrayList<>(); + static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents = + new ArrayList<>(); + static final List<JobEvent> jobEvents = new ArrayList<>(); private MyResourceManager rm; private boolean isUnregistered = false; private AllocateResponse allocateResponse; @@ -2069,7 +2079,7 @@ public class TestRMContainerAllocator { } public void sendRequest(ContainerRequestEvent req) { - sendRequests(Arrays.asList(new ContainerRequestEvent[] { req })); + sendRequests(Arrays.asList(new ContainerRequestEvent[] {req})); } public void sendRequests(List<ContainerRequestEvent> reqs) { @@ -2081,7 +2091,7 @@ public class TestRMContainerAllocator { public void sendFailure(ContainerFailedEvent f) { super.handleEvent(f); } - + public void sendDeallocate(ContainerAllocatorEvent f) { super.handleEvent(f); } @@ -2099,16 +2109,15 @@ public class TestRMContainerAllocator { // run the scheduler super.heartbeat(); - List<TaskAttemptContainerAssignedEvent> result - = new ArrayList<TaskAttemptContainerAssignedEvent>(events); + List<TaskAttemptContainerAssignedEvent> result = new ArrayList<>(events); events.clear(); return result; } - + static List<TaskAttemptKillEvent> getTaskAttemptKillEvents() { return taskAttemptKillEvents; } - + static 
List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() { return jobUpdatedNodeEvents; } @@ -2117,12 +2126,12 @@ public class TestRMContainerAllocator { protected void startAllocatorThread() { // override to NOT start thread } - + @Override protected boolean isApplicationMasterRegistered() { return super.isApplicationMasterRegistered(); } - + public boolean isUnregistered() { return isUnregistered; } @@ -2164,7 +2173,7 @@ public class TestRMContainerAllocator { int numPendingReduces = 4; float maxReduceRampupLimit = 0.5f; float reduceSlowStart = 0.2f; - + RMContainerAllocator allocator = mock(RMContainerAllocator.class); doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class), @@ -2174,14 +2183,14 @@ public class TestRMContainerAllocator { // Test slow-start allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, never()).setIsReduceStarted(true); - + // verify slow-start still in effect when no more maps need to // be scheduled but some have yet to complete allocator.scheduleReduces( @@ -2197,23 +2206,23 @@ public class TestRMContainerAllocator { succeededMaps = 3; doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, 
times(1)).setIsReduceStarted(true); - + // Test reduce ramp-up doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator) .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampUpReduces(anyInt()); verify(allocator, never()).rampDownReduces(anyInt()); @@ -2232,18 +2241,18 @@ public class TestRMContainerAllocator { verify(allocator).rampDownReduces(anyInt()); // Test reduce ramp-down for when there are scheduled maps - // Since we have two scheduled Maps, rampDownReducers + // Since we have two scheduled Maps, rampDownReducers // should be invoked twice. scheduledMaps = 2; assignedReduces = 2; doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(2)).rampDownReduces(anyInt()); @@ -2288,7 +2297,7 @@ public class TestRMContainerAllocator { recalculatedReduceSchedule = true; } } - + @Test public void testCompletedTasksRecalculateSchedule() throws Exception { LOG.info("Running testCompletedTasksRecalculateSchedule"); @@ -2400,31 +2409,33 @@ public class TestRMContainerAllocator { RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), mock(AppContext.class), new NoopAMPreemptionPolicy()); - + TaskAttemptId attemptId = 
MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); ApplicationId applicationId = ApplicationId.newInstance(1, 1); - ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance( - applicationId, 1); - ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId containerId = + ContainerId.newContainerId(applicationAttemptId, 1); ContainerStatus status = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", 0); ContainerStatus abortedStatus = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", ContainerExitStatus.ABORTED); - + TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, event.getType()); - + TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( abortedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); - - ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2); + + ContainerId containerId2 = + ContainerId.newContainerId(applicationAttemptId, 2); ContainerStatus status2 = ContainerStatus.newInstance(containerId2, ContainerState.RUNNING, "", 0); @@ -2440,7 +2451,7 @@ public class TestRMContainerAllocator { preemptedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType()); } - + @Test public void testUnregistrationOnlyIfRegistered() throws Exception { Configuration conf = new Configuration(); @@ -2483,7 +2494,7 @@ public class TestRMContainerAllocator { mrApp.stop(); Assert.assertTrue(allocator.isUnregistered()); } - + // Step-1 : AM send allocate request for 2 ContainerRequests and 1 // blackListeNode // Step-2 : 2 containers are allocated by RM. 
@@ -2542,11 +2553,15 @@ public class TestRMContainerAllocator { // create the container request // send MAP request ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 2048, new String[] { "h1", "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(2048, 1), + new String[] {"h1", "h2"}); allocator.sendRequest(event2); // Send events to blacklist h2 @@ -2584,7 +2599,9 @@ public class TestRMContainerAllocator { // RM // send container request ContainerRequestEvent event3 = - createReq(jobId, 3, 1000, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 3, + Resource.newInstance(1000, 1), + new String[]{"h1"}); allocator.sendRequest(event3); // send deallocate request @@ -2628,7 +2645,9 @@ public class TestRMContainerAllocator { allocator.sendFailure(f2); ContainerRequestEvent event4 = - createReq(jobId, 4, 2000, new String[] { "h1", "h2" }); + ContainerRequestCreator.createRequest(jobId, 4, + Resource.newInstance(2000, 1), + new String[]{"h1", "h2"}); allocator.sendRequest(event4); // send allocate request to 2nd RM and get resync command @@ -2639,7 +2658,9 @@ public class TestRMContainerAllocator { // asks,release,blacklistAaddition // and another containerRequest(event5) ContainerRequestEvent event5 = - createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" }); + ContainerRequestCreator.createRequest(jobId, 5, + Resource.newInstance(3000, 1), + new String[]{"h1", "h2", "h3"}); allocator.sendRequest(event5); // send all outstanding request again. 
@@ -2696,9 +2717,10 @@ public class TestRMContainerAllocator { } }; - ContainerRequestEvent mapRequestEvt = createReq(jobId, 0, - (int) (maxContainerSupported.getMemorySize() + 10), - maxContainerSupported.getVirtualCores(), + final int memory = (int) (maxContainerSupported.getMemorySize() + 10); + ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0, + Resource.newInstance(memory, + maxContainerSupported.getVirtualCores()), new String[0], false, false); allocator.sendRequests(Arrays.asList(mapRequestEvt)); allocator.schedule(); @@ -2734,10 +2756,11 @@ public class TestRMContainerAllocator { } }; - ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0, - (int) (maxContainerSupported.getMemorySize() + 10), - maxContainerSupported.getVirtualCores(), - new String[0], false, true); + final int memory = (int) (maxContainerSupported.getMemorySize() + 10); + ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0, + Resource.newInstance(memory, + maxContainerSupported.getVirtualCores()), + new String[0], false, true); allocator.sendRequests(Arrays.asList(reduceRequestEvt)); // Reducer container requests are added to the pending queue upon request, // schedule all reducers here so that we can observe if reducer requests @@ -2787,8 +2810,9 @@ public class TestRMContainerAllocator { rm1.drainEvents(); Assert.assertEquals("Should Have 1 Job Event", 1, allocator.jobEvents.size()); - JobEvent event = allocator.jobEvents.get(0); - Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT)); + JobEvent event = allocator.jobEvents.get(0); + Assert.assertTrue("Should Reboot", + event.getType().equals(JobEventType.JOB_AM_REBOOT)); } @Test(timeout=60000) @@ -2920,7 +2944,9 @@ public class TestRMContainerAllocator { // create some map requests ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; for (int i = 0; i < reqMapEvents.length; ++i) { - reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i 
}); + reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i, + Resource.newInstance(1024, 1), + new String[] {"h" + i}); } allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests @@ -2928,7 +2954,8 @@ public class TestRMContainerAllocator { new ContainerRequestEvent[REDUCE_COUNT]; for (int i = 0; i < reqReduceEvents.length; ++i) { reqReduceEvents[i] = - createReq(jobId, i, 1024, new String[] {}, false, true); + createRequest(jobId, i, Resource.newInstance(1024, 1), + new String[] {}, false, true); } allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); @@ -2975,14 +3002,17 @@ public class TestRMContainerAllocator { // create some map requests ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; for (int i = 0; i < reqMapEvents.length; ++i) { - reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); + reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i, + Resource.newInstance(1024, 1), new String[] {"h" + i}); } allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests - ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT]; + ContainerRequestEvent[] reqReduceEvents = + new ContainerRequestEvent[REDUCE_COUNT]; for (int i = 0; i < reqReduceEvents.length; ++i) { - reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, - false, true); + reqReduceEvents[i] = + createRequest(jobId, i, Resource.newInstance(1024, 1), + new String[] {}, false, true); } allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); @@ -3137,13 +3167,19 @@ public class TestRMContainerAllocator { // Request 2 maps and 1 reducer(sone on nodes which are not registered). 
ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[]{"h2"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3156,7 +3192,8 @@ public class TestRMContainerAllocator { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, Resource.newInstance(1024, 1), + new String[] {"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3301,13 +3338,18 @@ public class TestRMContainerAllocator { // Request 2 maps and 1 reducer(sone on nodes which are not registered). 
ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + createRequest(jobId, 3, Resource.newInstance(1024, 1), + new String[]{"h2"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3320,7 +3362,8 @@ public class TestRMContainerAllocator { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, Resource.newInstance(1024, 1), + new String[]{"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3433,13 +3476,19 @@ public class TestRMContainerAllocator { // Request 2 maps and 1 reducer(sone on nodes which are not registered). 
ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h1" }, false, true); + createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[]{"h1"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3449,7 +3498,8 @@ public class TestRMContainerAllocator { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, Resource.newInstance(1024, 1), + new String[] {"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3486,7 +3536,9 @@ public class TestRMContainerAllocator { // Send request for one more mapper. 
ContainerRequestEvent event5 = - createReq(jobId, 5, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 5, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event5); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); @@ -3528,7 +3580,7 @@ public class TestRMContainerAllocator { return RegisterApplicationMasterResponse.newInstance( Resource.newInstance(512, 1), Resource.newInstance(512000, 1024), - Collections.<ApplicationAccessType,String>emptyMap(), + Collections.emptyMap(), ByteBuffer.wrap("fake_key".getBytes()), Collections.<Container>emptyList(), "default", http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java index 7a212e1..1da2fed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java @@ -175,16 +175,8 @@ public class UnitsConversionUtil { */ public static int compare(String unitA, long valueA, String unitB, long valueB) { - if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA) - || !KNOWN_UNITS.contains(unitB)) { - throw new IllegalArgumentException("Units cannot be null"); - } - if (!KNOWN_UNITS.contains(unitA)) { - throw new IllegalArgumentException("Unknown unit '" + unitA + "'"); - } - if (!KNOWN_UNITS.contains(unitB)) { - throw new IllegalArgumentException("Unknown unit '" + unitB + "'"); - } + checkUnitArgument(unitA); + 
checkUnitArgument(unitB); if (unitA.equals(unitB)) { return Long.compare(valueA, valueB); } @@ -218,4 +210,36 @@ public class UnitsConversionUtil { return tmpA.compareTo(tmpB); } } + + private static void checkUnitArgument(String unit) { + if (unit == null) { + throw new IllegalArgumentException("Unit cannot be null"); + } else if (!KNOWN_UNITS.contains(unit)) { + throw new IllegalArgumentException("Unknown unit '" + unit + "'"); + } + } + + /** + * Compare a unit to another unit. + * <br> + * Examples:<br> + * 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k") + * will return -1.<br> + * 2. 'M' (MEGA) is greater than 'k' (kilo), so compareUnits("M", "k") will + * return 1. + * + * @param unitA first unit + * @param unitB second unit + * @return +1, 0 or -1 depending on whether the relationship between units + * is greater than, + * equal to or lesser than. + */ + public static int compareUnits(String unitA, String unitB) { + checkUnitArgument(unitA); + checkUnitArgument(unitB); + int unitAPos = SORTED_UNITS.indexOf(unitA); + int unitBPos = SORTED_UNITS.indexOf(unitB); + + return Integer.compare(unitAPos, unitBPos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java new file mode 100644 index 0000000..98a8a00 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.resourcetypes; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Contains helper methods to create Resource and ResourceInformation objects. + * ResourceInformation can be created from a resource name + * and a resource descriptor as well that comprises amount and unit. 
+ */ +public final class ResourceTypesTestHelper { + + private static final Pattern RESOURCE_VALUE_AND_UNIT_PATTERN = + Pattern.compile("(\\d+)([A-za-z]*)"); + + private ResourceTypesTestHelper() {} + + private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider + .getRecordFactory(null); + + private static final class ResourceValueAndUnit { + private final Long value; + private final String unit; + + private ResourceValueAndUnit(Long value, String unit) { + this.value = value; + this.unit = unit; + } + } + + public static Resource newResource(long memory, int vCores, Map<String, + String> customResources) { + Resource resource = RECORD_FACTORY.newRecordInstance(Resource.class); + resource.setMemorySize(memory); + resource.setVirtualCores(vCores); + + for (Map.Entry<String, String> customResource : + customResources.entrySet()) { + String resourceName = customResource.getKey(); + ResourceInformation resourceInformation = + createResourceInformation(resourceName, + customResource.getValue()); + resource.setResourceInformation(resourceName, resourceInformation); + } + return resource; + } + + public static ResourceInformation createResourceInformation(String + resourceName, String descriptor) { + ResourceValueAndUnit resourceValueAndUnit = + getResourceValueAndUnit(descriptor); + return ResourceInformation + .newInstance(resourceName, resourceValueAndUnit.unit, + resourceValueAndUnit.value); + } + + private static ResourceValueAndUnit getResourceValueAndUnit(String val) { + Matcher matcher = RESOURCE_VALUE_AND_UNIT_PATTERN.matcher(val); + if (!matcher.find()) { + throw new RuntimeException("Invalid pattern of resource descriptor: " + + val); + } else if (matcher.groupCount() != 2) { + throw new RuntimeException("Capturing group count in string " + + val + " is not 2!"); + } + long value = Long.parseLong(matcher.group(1)); + + return new ResourceValueAndUnit(value, matcher.group(2)); + } + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 0de834c..e06b55e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -183,7 +183,7 @@ public class BuilderUtils { public static NodeId newNodeId(String host, int port) { return NodeId.newInstance(host, port); } - + public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { @@ -422,7 +422,7 @@ public class BuilderUtils { report.setPriority(priority); return report; } - + public static ApplicationSubmissionContext newApplicationSubmissionContext( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, @@ -477,6 +477,10 @@ public class BuilderUtils { return resource; } + public static Resource newEmptyResource() { + return recordFactory.newRecordInstance(Resource.class); + } + public static URL newURL(String scheme, String host, int port, String file) { URL url = recordFactory.newRecordInstance(URL.class); url.setScheme(scheme); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index c0d7d86..9b3c20a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -283,24 +284,10 @@ public class SchedulerUtils { private static void validateResourceRequest(ResourceRequest resReq, Resource maximumResource, QueueInfo queueInfo, RMContext rmContext) throws InvalidResourceRequestException { - Resource requestedResource = resReq.getCapability(); - for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { - 
ResourceInformation reqRI = requestedResource.getResourceInformation(i); - ResourceInformation maxRI = maximumResource.getResourceInformation(i); - if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) { - throw new InvalidResourceRequestException( - "Invalid resource request, requested resource type=[" + reqRI - .getName() - + "] < 0 or greater than maximum allowed allocation. Requested " - + "resource=" + requestedResource - + ", maximum allowed allocation=" + maximumResource - + ", please note that maximum allowed allocation is calculated " - + "by scheduler based on maximum resource of registered " - + "NodeManagers, which might be less than configured " - + "maximum allocation=" + ResourceUtils - .getResourceTypesMaximumAllocation()); - } - } + final Resource requestedResource = resReq.getCapability(); + checkResourceRequestAgainstAvailableResource(requestedResource, + maximumResource); + String labelExp = resReq.getNodeLabelExpression(); // we don't allow specify label expression other than resourceName=ANY now if (!ResourceRequest.ANY.equals(resReq.getResourceName()) @@ -338,6 +325,78 @@ public class SchedulerUtils { } } + @Private + @VisibleForTesting + static void checkResourceRequestAgainstAvailableResource(Resource reqResource, + Resource availableResource) throws InvalidResourceRequestException { + for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { + final ResourceInformation requestedRI = + reqResource.getResourceInformation(i); + final String reqResourceName = requestedRI.getName(); + + if (requestedRI.getValue() < 0) { + throwInvalidResourceException(reqResource, availableResource, + reqResourceName); + } + + final ResourceInformation availableRI = + availableResource.getResourceInformation(reqResourceName); + + long requestedResourceValue = requestedRI.getValue(); + long availableResourceValue = availableRI.getValue(); + int unitsRelation = UnitsConversionUtil + .compareUnits(requestedRI.getUnits(), 
availableRI.getUnits()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Requested resource information: " + requestedRI); + LOG.debug("Available resource information: " + availableRI); + LOG.debug("Relation of units: " + unitsRelation); + } + + // requested resource unit is less than available resource unit + // e.g. requestedUnit: "m", availableUnit: "K") + if (unitsRelation < 0) { + availableResourceValue = + UnitsConversionUtil.convert(availableRI.getUnits(), + requestedRI.getUnits(), availableRI.getValue()); + + // requested resource unit is greater than available resource unit + // e.g. requestedUnit: "G", availableUnit: "M") + } else if (unitsRelation > 0) { + requestedResourceValue = + UnitsConversionUtil.convert(requestedRI.getUnits(), + availableRI.getUnits(), requestedRI.getValue()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Requested resource value after conversion: " + + requestedResourceValue); + LOG.info("Available resource value after conversion: " + + availableResourceValue); + } + + if (requestedResourceValue > availableResourceValue) { + throwInvalidResourceException(reqResource, availableResource, + reqResourceName); + } + } + } + + private static void throwInvalidResourceException(Resource reqResource, + Resource availableResource, String reqResourceName) + throws InvalidResourceRequestException { + throw new InvalidResourceRequestException( + "Invalid resource request, requested resource type=[" + reqResourceName + + "] < 0 or greater than maximum allowed allocation. 
Requested " + + "resource=" + reqResource + ", maximum allowed allocation=" + + availableResource + + ", please note that maximum allowed allocation is calculated " + + "by scheduler based on maximum resource of registered " + + "NodeManagers, which might be less than configured " + + "maximum allocation=" + + ResourceUtils.getResourceTypesMaximumAllocation()); + } + private static void checkQueueLabelInLabelManager(String labelExpression, RMContext rmContext) throws InvalidLabelResourceRequestException { // check node label manager contains this label --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org