YARN-8202. DefaultAMSProcessor should properly check units of requested custom 
resource types against minimum/maximum allocation (snemeth via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8fa7cb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8fa7cb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8fa7cb6

Branch: refs/heads/HDDS-4
Commit: c8fa7cb6d0c12c4e65e53ea60167b856511b8294
Parents: 5c1c344
Author: Robert Kanter <rkan...@apache.org>
Authored: Thu May 10 09:31:59 2018 -0700
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Mon May 14 10:31:08 2018 -0700

----------------------------------------------------------------------
 .../v2/app/rm/ContainerRequestCreator.java      |  57 ++
 .../v2/app/rm/TestRMContainerAllocator.java     | 534 ++++++++++---------
 .../hadoop/yarn/util/UnitsConversionUtil.java   |  44 +-
 .../resourcetypes/ResourceTypesTestHelper.java  |  93 ++++
 .../hadoop/yarn/server/utils/BuilderUtils.java  |   8 +-
 .../scheduler/SchedulerUtils.java               |  95 +++-
 .../TestApplicationMasterService.java           | 185 +++++--
 .../scheduler/TestSchedulerUtils.java           | 278 +++++++++-
 8 files changed, 961 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
new file mode 100644
index 0000000..39a9ddc
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+final class ContainerRequestCreator {
+
+  private ContainerRequestCreator() {}
+
+  static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+          Resource resource, String[] hosts) {
+    return createRequest(jobId, taskAttemptId, resource, hosts,
+            false, false);
+  }
+
+  static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
+          Resource resource, String[] hosts, boolean earlierFailedAttempt,
+          boolean reduce) {
+    final TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+            taskAttemptId);
+
+    if (earlierFailedAttempt) {
+      return ContainerRequestEvent
+              .createContainerRequestEventForFailedContainer(attemptId,
+                      resource);
+    }
+    return new ContainerRequestEvent(attemptId, resource, hosts,
+            new String[]{NetworkTopology.DEFAULT_RACK});
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 7875917..427e6ea 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyFloat;
@@ -96,7 +97,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.CollectorInfo;
@@ -203,7 +203,7 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
@@ -215,13 +215,13 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(jobId,
+        1, Resource.newInstance(1024, 1), new String[] {"h1"});
     allocator.sendRequest(event1);
 
     // send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
-        new String[] { "h2" });
+    ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(jobId,
+        2, Resource.newInstance(1024, 1), new String[] {"h2"});
     allocator.sendRequest(event2);
 
     // this tells the scheduler about the requests
@@ -232,8 +232,8 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
 
     // send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h3" });
+    ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(jobId,
+        3, Resource.newInstance(1024, 1), new String[] {"h3"});
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -242,7 +242,7 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
-    
+
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
@@ -252,21 +252,21 @@ public class TestRMContainerAllocator {
     assigned = allocator.schedule();
     rm.drainEvents();
     Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
-    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event2, event3},
         assigned, false);
-    
+
     // check that the assigned container requests are cancelled
     allocator.schedule();
     rm.drainEvents();
     Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
   }
-  
-  @Test 
+
+  @Test
   public void testMapNodeLocality() throws Exception {
-    // test checks that ordering of allocated containers list from the RM does 
-    // not affect the map->container assignment done by the AM. If there is a 
-    // node local container available for a map then it should be assigned to 
-    // that container and not a rack-local container that happened to be seen 
+    // test checks that ordering of allocated containers list from the RM does
+    // not affect the map->container assignment done by the AM. If there is a
+    // node local container available for a map then it should be assigned to
+    // that container and not a rack-local container that happened to be seen
     // earlier in the allocated containers list from the RM.
     // Regression test for MAPREDUCE-4893
     LOG.info("Running testMapNodeLocality");
@@ -291,26 +291,29 @@ public class TestRMContainerAllocator {
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
-        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
     // add resources to scheduler
-    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps 
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
     rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
     MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
     rm.drainEvents();
 
     // create the container requests for maps
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
+            jobId, 1, Resource.newInstance(1024, 1),
+            new String[]{"h1"});
     allocator.sendRequest(event1);
-    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
+            jobId, 2, Resource.newInstance(1024, 1),
+            new String[]{"h1"});
     allocator.sendRequest(event2);
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h2" });
+    ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(
+            jobId, 3, Resource.newInstance(1024, 1),
+            new String[]{"h2"});
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -323,14 +326,14 @@ public class TestRMContainerAllocator {
     // Node heartbeat from rack-local first. This makes node h3 the first in the
     // list of allocated containers but it should not be assigned to task1.
     nodeManager3.nodeHeartbeat(true);
-    // Node heartbeat from node-local next. This allocates 2 node local 
+    // Node heartbeat from node-local next. This allocates 2 node local
     // containers for task1 and task2. These should be matched with those tasks.
     nodeManager1.nodeHeartbeat(true);
     rm.drainEvents();
 
     assigned = allocator.schedule();
     rm.drainEvents();
-    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event2, event3},
         assigned, false);
     // remove the rack-local assignment that should have happened for task3
     for(TaskAttemptContainerAssignedEvent event : assigned) {
@@ -340,7 +343,7 @@ public class TestRMContainerAllocator {
         break;
       }
     }
-    checkAssignments(new ContainerRequestEvent[] { event1, event2},
+    checkAssignments(new ContainerRequestEvent[] {event1, event2},
         assigned, true);
   }
 
@@ -381,13 +384,15 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
+            jobId, 1, Resource.newInstance(1024, 1),
+        new String[] {"h1"});
     allocator.sendRequest(event1);
 
     // send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
-        new String[] { "h2" });
+    ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
+            jobId, 2, Resource.newInstance(1024, 1),
+        new String[] {"h2"});
     allocator.sendRequest(event2);
 
     // this tells the scheduler about the requests
@@ -404,7 +409,7 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     rm.drainEvents();
-    checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event2},
         assigned, false);
   }
 
@@ -439,15 +444,19 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    final String[] locations = new String[] { host };
-    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    final String[] locations = new String[] {host};
+    allocator.sendRequest(createRequest(jobId, 0,
+            Resource.newInstance(1024, 1),
+            locations, false, true));
     for (int i = 0; i < 1;) {
       rm.drainEvents();
       i += allocator.schedule().size();
       nm.nodeHeartbeat(true);
     }
 
-    allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
+    allocator.sendRequest(createRequest(jobId, 0,
+            Resource.newInstance(1024, 1),
+            locations, true, false));
     while (allocator.getTaskAttemptKillEvents().size() == 0) {
       rm.drainEvents();
       allocator.schedule().size();
@@ -494,9 +503,10 @@ public class TestRMContainerAllocator {
     RMContainerAllocator.ScheduledRequests scheduledRequests =
         allocator.getScheduledRequests();
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+        createRequest(jobId, 1, Resource.newInstance(2048, 1),
+            new String[] {"h1"}, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null,null));
+        new RMContainerRequestor.ContainerRequest(event1, null, null));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -547,9 +557,12 @@ public class TestRMContainerAllocator {
     RMContainerAllocator.ScheduledRequests scheduledRequests =
         allocator.getScheduledRequests();
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+        createRequest(jobId, 1,
+                Resource.newInstance(2048, 1),
+                new String[] {"h1"}, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+        new RMContainerRequestor.ContainerRequest(event1, null,
+                clock.getTime()));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -561,7 +574,7 @@ public class TestRMContainerAllocator {
     clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
     allocator.preemptReducesIfNeeded();
     Assert.assertEquals("The reducer is not preeempted", 1,
-        assignedRequests.preemptionWaitingReduces.size());
+            assignedRequests.preemptionWaitingReduces.size());
   }
 
   @Test(timeout = 30000)
@@ -608,9 +621,12 @@ public class TestRMContainerAllocator {
     RMContainerAllocator.ScheduledRequests scheduledRequests =
         allocator.getScheduledRequests();
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+        createRequest(jobId, 1,
+                Resource.newInstance(2048, 1),
+                new String[] {"h1"}, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+        new RMContainerRequestor.ContainerRequest(event1, null,
+                clock.getTime()));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -651,13 +667,17 @@ public class TestRMContainerAllocator {
         appAttemptId, mockJob, SystemClock.getInstance());
 
     // request to allocate two reduce priority containers
-    final String[] locations = new String[] { host };
-    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    final String[] locations = new String[] {host};
+    allocator.sendRequest(createRequest(jobId, 0,
+            Resource.newInstance(1024, 1),
+            locations, false, true));
     allocator.scheduleAllReduces();
     allocator.makeRemoteRequest();
     nm.nodeHeartbeat(true);
     rm.drainEvents();
-    allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
+    allocator.sendRequest(createRequest(jobId, 1,
+            Resource.newInstance(1024, 1),
+            locations, false, false));
 
     int assignedContainer;
     for (assignedContainer = 0; assignedContainer < 1;) {
@@ -684,7 +704,7 @@ public class TestRMContainerAllocator {
     conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
     ApplicationId appId = ApplicationId.newInstance(1, 1);
     ApplicationAttemptId appAttemptId =
-        ApplicationAttemptId.newInstance(appId, 1);
+            ApplicationAttemptId.newInstance(appId, 1);
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
@@ -706,13 +726,16 @@ public class TestRMContainerAllocator {
 
     // create some map requests
     ContainerRequestEvent reqMapEvents;
-    reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+    reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0,
+            Resource.newInstance(1024, 1), new String[]{"map"});
     allocator.sendRequests(Arrays.asList(reqMapEvents));
 
     // create some reduce requests
     ContainerRequestEvent reqReduceEvents;
     reqReduceEvents =
-        createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+        createRequest(jobId, 0,
+                Resource.newInstance(2048, 1),
+                new String[] {"reduce"}, false, true);
     allocator.sendRequests(Arrays.asList(reqReduceEvents));
     allocator.schedule();
     // verify all of the host-specific asks were sent plus one for the
@@ -883,18 +906,21 @@ public class TestRMContainerAllocator {
 
     // create the container request
     // send MAP request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
-        "h1", "h2" }, true, false);
+    ContainerRequestEvent event1 = createRequest(jobId, 1,
+            Resource.newInstance(2048, 1),
+            new String[] {"h1", "h2"}, true, false);
     allocator.sendRequest(event1);
 
     // send REDUCE request
-    ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
-        new String[] { "h1" }, false, true);
+    ContainerRequestEvent event2 = createRequest(jobId, 2,
+            Resource.newInstance(3000, 1),
+            new String[] {"h1"}, false, true);
     allocator.sendRequest(event2);
 
     // send MAP request
-    ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
-        new String[] { "h3" }, false, false);
+    ContainerRequestEvent event3 = createRequest(jobId, 3,
+            Resource.newInstance(2048, 1),
+            new String[] {"h3"}, false, false);
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -911,7 +937,7 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     rm.drainEvents();
-    checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+    checkAssignments(new ContainerRequestEvent[] {event1, event3},
         assigned, false);
 
     // validate that no container is assigned to h1 as it doesn't have 2048
@@ -921,10 +947,10 @@ public class TestRMContainerAllocator {
     }
   }
 
-  private static class MyResourceManager extends MockRM {
+  static class MyResourceManager extends MockRM {
 
     private static long fakeClusterTimeStamp = System.currentTimeMillis();
-    
+
     public MyResourceManager(Configuration conf) {
       super(conf);
     }
@@ -955,7 +981,7 @@ public class TestRMContainerAllocator {
     protected ResourceScheduler createScheduler() {
       return new MyFifoScheduler(this.getRMContext());
     }
-    
+
     MyFifoScheduler getMyFifoScheduler() {
       return (MyFifoScheduler) scheduler;
     }
@@ -1221,7 +1247,7 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
-  
+
   @Test
   public void testUpdatedNodes() throws Exception {
     Configuration conf = new Configuration();
@@ -1251,11 +1277,13 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the map container request
-    ContainerRequestEvent event = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event =
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1"});
     allocator.sendRequest(event);
     TaskAttemptId attemptId = event.getAttemptID();
-    
+
     TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
     when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId());
     Task mockTask = mock(Task.class);
@@ -1279,7 +1307,7 @@ public class TestRMContainerAllocator {
     // no updated nodes reported
     Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
     Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
-    
+
     // mark nodes bad
     nm1.nodeHeartbeat(false);
     nm2.nodeHeartbeat(false);
@@ -1292,11 +1320,13 @@ public class TestRMContainerAllocator {
     // updated nodes are reported
     Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
     Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
-    Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
-    Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
+    Assert.assertEquals(2,
+        allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
+    Assert.assertEquals(attemptId,
+        allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
     allocator.getJobUpdatedNodeEvents().clear();
     allocator.getTaskAttemptKillEvents().clear();
-    
+
     assigned = allocator.schedule();
     rm.drainEvents();
     Assert.assertEquals(0, assigned.size());
@@ -1307,7 +1337,7 @@ public class TestRMContainerAllocator {
 
   @Test
   public void testBlackListedNodes() throws Exception {
-    
+
     LOG.info("Running testBlackListedNodes");
 
     Configuration conf = new Configuration();
@@ -1315,7 +1345,7 @@ public class TestRMContainerAllocator {
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
     conf.setInt(
         MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
-    
+
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
 
@@ -1331,7 +1361,7 @@ public class TestRMContainerAllocator {
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
     rm.drainEvents();
-    
+
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
@@ -1347,18 +1377,24 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
 
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 =
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1"});
     allocator.sendRequest(event1);
 
     // send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
-        new String[] { "h2" });
+    ContainerRequestEvent event2 =
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h2"});
     allocator.sendRequest(event2);
 
     // send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h3" });
+    ContainerRequestEvent event3 =
+            ContainerRequestCreator.createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h3"});
     allocator.sendRequest(event3);
 
     // this tells the scheduler about the requests
@@ -1368,9 +1404,9 @@ public class TestRMContainerAllocator {
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Send events to blacklist nodes h1 and h2
-    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);          
  
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
     allocator.sendFailure(f1);
-    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);          
  
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
     allocator.sendFailure(f2);
 
     // update resources in scheduler
@@ -1392,23 +1428,23 @@ public class TestRMContainerAllocator {
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-        
+
     Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
-    
+
     // validate that all containers are assigned to h3
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
       Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
           .getContainer().getNodeId().getHost()));
     }
   }
-  
+
   @Test
   public void testIgnoreBlacklisting() throws Exception {
     LOG.info("Running testIgnoreBlacklisting");
@@ -1448,7 +1484,7 @@ public class TestRMContainerAllocator {
 
     // Known=1, blacklisted=0, ignore should be false - assign first container
     assigned =
-        getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 1, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
@@ -1463,47 +1499,47 @@ public class TestRMContainerAllocator {
     // Because makeRemoteRequest will not be aware of it until next call
     // The current call will send blacklisted node "h1" to RM
     assigned =
-        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 2, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 1, 0, 0, 1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=1, blacklisted=1, ignore should be true - assign 1
     assigned =
-        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 2, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
-        getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+        getContainerOnHost(jobId, 3, 1024, new String[] {"h2"},
             nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
-        getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+        getContainerOnHost(jobId, 4, 1024, new String[] {"h3"},
             nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Known=3, blacklisted=1, ignore should be true - assign 1
     assigned =
-        getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 5, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
     assigned =
-        getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+        getContainerOnHost(jobId, 6, 1024, new String[] {"h4"},
             nodeManagers[3], allocator, 0, 0, 1, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklisting re-enabled.
     // Known=4, blacklisted=1, ignore should be false - no assignment on h1
     assigned =
-        getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 7, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     // RMContainerRequestor would have created a replacement request.
@@ -1516,20 +1552,20 @@ public class TestRMContainerAllocator {
     // Known=4, blacklisted=2, ignore should be true. Should assign 0
     // container for the same reason above.
     assigned =
-        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 8, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 1, 0, 0, 2, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true. Should assign 2
     // containers.
     assigned =
-        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+        getContainerOnHost(jobId, 8, 1024, new String[] {"h1"},
             nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true.
     assigned =
-        getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+        getContainerOnHost(jobId, 9, 1024, new String[] {"h2"},
             nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
@@ -1540,23 +1576,23 @@ public class TestRMContainerAllocator {
     nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=5, blacklisted=3, ignore should be true.
     assigned =
-        getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+        getContainerOnHost(jobId, 10, 1024, new String[] {"h3"},
             nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
-    
+
     // Assign on 5 more nodes - to re-enable blacklisting
     for (int i = 0; i < 5; i++) {
       nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
       assigned =
           getContainerOnHost(jobId, 11 + i, 1024,
-              new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+              new String[] {String.valueOf(5 + i)}, nodeManagers[4 + i],
               allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
       Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     }
 
     // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
     assigned =
-        getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+        getContainerOnHost(jobId, 20, 1024, new String[] {"h3"},
             nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
   }
@@ -1576,7 +1612,8 @@ public class TestRMContainerAllocator {
           int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
           throws Exception {
     ContainerRequestEvent reqEvent =
-        createReq(jobId, taskAttemptId, memory, hosts);
+            ContainerRequestCreator.createRequest(jobId, taskAttemptId,
+                    Resource.newInstance(memory, 1), hosts);
     allocator.sendRequest(reqEvent);
 
     // Send the request to the RM
@@ -1596,7 +1633,7 @@ public class TestRMContainerAllocator {
         expectedAdditions2, expectedRemovals2, rm);
     return assigned;
   }
- 
+
   @Test
   public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
     LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
@@ -1606,7 +1643,7 @@ public class TestRMContainerAllocator {
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
     conf.setInt(
         MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
-    
+
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
 
@@ -1622,7 +1659,7 @@ public class TestRMContainerAllocator {
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
     rm.drainEvents();
-    
+
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
@@ -1638,8 +1675,10 @@ public class TestRMContainerAllocator {
 
     LOG.info("Requesting 1 Containers _1 on H1");
     // create the container request
-    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
-        new String[] { "h1" });
+    ContainerRequestEvent event1 =
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1"});
     allocator.sendRequest(event1);
 
     LOG.info("RM Heartbeat (to send the container requests)");
@@ -1653,13 +1692,13 @@ public class TestRMContainerAllocator {
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
-    
+
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());    
-    
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
     LOG.info("Failing container _1 on H1 (should blacklist the node)");
     // Send events to blacklist nodes h1 and h2
     ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
@@ -1667,8 +1706,9 @@ public class TestRMContainerAllocator {
 
     //At this stage, a request should be created for a fast fail map
     //Create a FAST_FAIL request for a previously failed map.
-    ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
-        new String[] { "h1" }, true, false);
+    ContainerRequestEvent event1f = createRequest(jobId, 1,
+            Resource.newInstance(1024, 1),
+            new String[] {"h1"}, true, false);
     allocator.sendRequest(event1f);
 
     //Update the Scheduler with the new requests.
@@ -1678,24 +1718,26 @@ public class TestRMContainerAllocator {
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
-        new String[] { "h1", "h3" });
+    ContainerRequestEvent event3 =
+            ContainerRequestCreator.createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[] {"h1", "h3"});
     allocator.sendRequest(event3);
-    
+
     //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
     //RM is only aware of the prio:5 container
-    
+
     LOG.info("h1 Heartbeat (To actually schedule the containers)");
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
-    
+
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
-    
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
     //RMContainerAllocator gets assigned a p:5 on a blacklisted node.
 
     //Send a release for the p:5 container + another request.
@@ -1704,26 +1746,26 @@ public class TestRMContainerAllocator {
     rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-    
+
     //Hearbeat from H3 to schedule on this host.
     LOG.info("h3 Heartbeat (To re-schedule the containers)");
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
     rm.drainEvents();
-    
+
     LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
     assigned = allocator.schedule();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     rm.drainEvents();
-     
+
     // For debugging
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
       LOG.info(assig.getTaskAttemptID() +
           " assgined to " + assig.getContainer().getId() +
           " with priority " + assig.getContainer().getPriority());
     }
-    
+
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
-    
+
     // validate that all containers are assigned to h3
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
       Assert.assertEquals("Assigned container " + assig.getContainer().getId()
@@ -1759,13 +1801,13 @@ public class TestRMContainerAllocator {
         assert (false);
       }
     }
-    
+
     List<ResourceRequest> lastAsk = null;
     List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
     Resource forceResourceLimit = null;
-    
+
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
     @Override
@@ -1855,38 +1897,6 @@ public class TestRMContainerAllocator {
     }
   }
 
-  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
-      int memory, String[] hosts) {
-    return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
-  }
-
-  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
-      int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
-    return createReq(jobId, taskAttemptId, mem,
-        1, hosts, earlierFailedAttempt, reduce);
-  }
-
-  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
-      int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
-      boolean reduce) {
-    TaskId taskId;
-    if (reduce) {
-      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
-    } else {
-      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
-    }
-    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
-        taskAttemptId);
-    Resource containerNeed = Resource.newInstance(memory, vcore);
-    if (earlierFailedAttempt) {
-      return ContainerRequestEvent
-          .createContainerRequestEventForFailedContainer(attemptId,
-              containerNeed);
-    }
-    return new ContainerRequestEvent(attemptId, containerNeed, hosts,
-        new String[] { NetworkTopology.DEFAULT_RACK });
-  }
-
   private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
       String host, boolean reduce) {
     TaskId taskId;
@@ -1897,9 +1907,9 @@ public class TestRMContainerAllocator {
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
         taskAttemptId);
-    return new ContainerFailedEvent(attemptId, host);    
+    return new ContainerFailedEvent(attemptId, host);
   }
-  
+
   private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
       int taskAttemptId, boolean reduce) {
     TaskId taskId;
@@ -1957,14 +1967,14 @@ public class TestRMContainerAllocator {
 
   // Mock RMContainerAllocator
   // Instead of talking to remote Scheduler,uses the local Scheduler
-  private static class MyContainerAllocator extends RMContainerAllocator {
-    static final List<TaskAttemptContainerAssignedEvent> events
-      = new ArrayList<TaskAttemptContainerAssignedEvent>();
-    static final List<TaskAttemptKillEvent> taskAttemptKillEvents 
-      = new ArrayList<TaskAttemptKillEvent>();
-    static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
-    = new ArrayList<JobUpdatedNodesEvent>();
-    static final List<JobEvent> jobEvents = new ArrayList<JobEvent>();
+  static class MyContainerAllocator extends RMContainerAllocator {
+    static final List<TaskAttemptContainerAssignedEvent> events =
+        new ArrayList<>();
+    static final List<TaskAttemptKillEvent> taskAttemptKillEvents =
+        new ArrayList<>();
+    static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents =
+        new ArrayList<>();
+    static final List<JobEvent> jobEvents = new ArrayList<>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
     private AllocateResponse allocateResponse;
@@ -2069,7 +2079,7 @@ public class TestRMContainerAllocator {
     }
 
     public void sendRequest(ContainerRequestEvent req) {
-      sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+      sendRequests(Arrays.asList(new ContainerRequestEvent[] {req}));
     }
 
     public void sendRequests(List<ContainerRequestEvent> reqs) {
@@ -2081,7 +2091,7 @@ public class TestRMContainerAllocator {
     public void sendFailure(ContainerFailedEvent f) {
       super.handleEvent(f);
     }
-    
+
     public void sendDeallocate(ContainerAllocatorEvent f) {
       super.handleEvent(f);
     }
@@ -2099,16 +2109,15 @@ public class TestRMContainerAllocator {
       // run the scheduler
       super.heartbeat();
 
-      List<TaskAttemptContainerAssignedEvent> result
-        = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+      List<TaskAttemptContainerAssignedEvent> result = new ArrayList<>(events);
       events.clear();
       return result;
     }
-    
+
     static List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
       return taskAttemptKillEvents;
     }
-    
+
     static List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
       return jobUpdatedNodeEvents;
     }
@@ -2117,12 +2126,12 @@ public class TestRMContainerAllocator {
     protected void startAllocatorThread() {
       // override to NOT start thread
     }
-    
+
     @Override
     protected boolean isApplicationMasterRegistered() {
       return super.isApplicationMasterRegistered();
     }
-    
+
     public boolean isUnregistered() {
       return isUnregistered;
     }
@@ -2164,7 +2173,7 @@ public class TestRMContainerAllocator {
     int numPendingReduces = 4;
     float maxReduceRampupLimit = 0.5f;
     float reduceSlowStart = 0.2f;
-    
+
     RMContainerAllocator allocator = mock(RMContainerAllocator.class);
     doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(),
         anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class),
@@ -2174,14 +2183,14 @@ public class TestRMContainerAllocator {
 
     // Test slow-start
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, never()).setIsReduceStarted(true);
-    
+
     // verify slow-start still in effect when no more maps need to
     // be scheduled but some have yet to complete
     allocator.scheduleReduces(
@@ -2197,23 +2206,23 @@ public class TestRMContainerAllocator {
     succeededMaps = 3;
     doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, times(1)).setIsReduceStarted(true);
-    
+
     // Test reduce ramp-up
     doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator)
       .getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampUpReduces(anyInt());
     verify(allocator, never()).rampDownReduces(anyInt());
@@ -2232,18 +2241,18 @@ public class TestRMContainerAllocator {
     verify(allocator).rampDownReduces(anyInt());
 
     // Test reduce ramp-down for when there are scheduled maps
-    // Since we have two scheduled Maps, rampDownReducers 
+    // Since we have two scheduled Maps, rampDownReducers
     // should be invoked twice.
     scheduledMaps = 2;
     assignedReduces = 2;
     doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
       .getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, times(2)).rampDownReduces(anyInt());
 
@@ -2288,7 +2297,7 @@ public class TestRMContainerAllocator {
       recalculatedReduceSchedule = true;
     }
   }
-  
+
   @Test
   public void testCompletedTasksRecalculateSchedule() throws Exception {
     LOG.info("Running testCompletedTasksRecalculateSchedule");
@@ -2400,31 +2409,33 @@ public class TestRMContainerAllocator {
     RMContainerAllocator allocator = new RMContainerAllocator(
         mock(ClientService.class), mock(AppContext.class),
         new NoopAMPreemptionPolicy());
-    
+
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
         MRBuilderUtils.newTaskId(
             MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);
-    ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
-        applicationId, 1);
-    ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainerId containerId =
+        ContainerId.newContainerId(applicationAttemptId, 1);
     ContainerStatus status = ContainerStatus.newInstance(
         containerId, ContainerState.RUNNING, "", 0);
 
     ContainerStatus abortedStatus = ContainerStatus.newInstance(
         containerId, ContainerState.RUNNING, "",
         ContainerExitStatus.ABORTED);
-    
+
     TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
         attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
         event.getType());
-    
+
     TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
         abortedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
-    
-    ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2);
+
+    ContainerId containerId2 =
+        ContainerId.newContainerId(applicationAttemptId, 2);
     ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
         ContainerState.RUNNING, "", 0);
 
@@ -2440,7 +2451,7 @@ public class TestRMContainerAllocator {
         preemptedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
   }
-  
+
   @Test
   public void testUnregistrationOnlyIfRegistered() throws Exception {
     Configuration conf = new Configuration();
@@ -2483,7 +2494,7 @@ public class TestRMContainerAllocator {
     mrApp.stop();
     Assert.assertTrue(allocator.isUnregistered());
   }
-  
+
   // Step-1 : AM send allocate request for 2 ContainerRequests and 1
   // blackListeNode
   // Step-2 : 2 containers are allocated by RM.
@@ -2542,11 +2553,15 @@ public class TestRMContainerAllocator {
     // create the container request
     // send MAP request
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
 
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+        ContainerRequestCreator.createRequest(jobId, 2,
+                Resource.newInstance(2048, 1),
+                new String[] {"h1", "h2"});
     allocator.sendRequest(event2);
 
     // Send events to blacklist h2
@@ -2584,7 +2599,9 @@ public class TestRMContainerAllocator {
     // RM
     // send container request
     ContainerRequestEvent event3 =
-        createReq(jobId, 3, 1000, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 3,
+                    Resource.newInstance(1000, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event3);
 
     // send deallocate request
@@ -2628,7 +2645,9 @@ public class TestRMContainerAllocator {
     allocator.sendFailure(f2);
 
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+            ContainerRequestCreator.createRequest(jobId, 4,
+                    Resource.newInstance(2000, 1),
+                    new String[]{"h1", "h2"});
     allocator.sendRequest(event4);
 
     // send allocate request to 2nd RM and get resync command
@@ -2639,7 +2658,9 @@ public class TestRMContainerAllocator {
     // asks,release,blacklistAaddition
     // and another containerRequest(event5)
     ContainerRequestEvent event5 =
-        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+            ContainerRequestCreator.createRequest(jobId, 5,
+                    Resource.newInstance(3000, 1),
+                    new String[]{"h1", "h2", "h3"});
     allocator.sendRequest(event5);
 
     // send all outstanding request again.
@@ -2696,9 +2717,10 @@ public class TestRMContainerAllocator {
       }
     };
 
-    ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
-        (int) (maxContainerSupported.getMemorySize() + 10),
-        maxContainerSupported.getVirtualCores(),
+    final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
+    ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0,
+            Resource.newInstance(memory,
+            maxContainerSupported.getVirtualCores()),
         new String[0], false, false);
     allocator.sendRequests(Arrays.asList(mapRequestEvt));
     allocator.schedule();
@@ -2734,10 +2756,11 @@ public class TestRMContainerAllocator {
       }
     };
 
-    ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
-        (int) (maxContainerSupported.getMemorySize() + 10),
-        maxContainerSupported.getVirtualCores(),
-        new String[0], false, true);
+    final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
+    ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0,
+            Resource.newInstance(memory,
+            maxContainerSupported.getVirtualCores()),
+            new String[0], false, true);
     allocator.sendRequests(Arrays.asList(reduceRequestEvt));
     // Reducer container requests are added to the pending queue upon request,
     // schedule all reducers here so that we can observe if reducer requests
@@ -2787,8 +2810,9 @@ public class TestRMContainerAllocator {
     rm1.drainEvents();
     Assert.assertEquals("Should Have 1 Job Event", 1,
         allocator.jobEvents.size());
-    JobEvent event = allocator.jobEvents.get(0); 
-    Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT));
+    JobEvent event = allocator.jobEvents.get(0);
+    Assert.assertTrue("Should Reboot",
+        event.getType().equals(JobEventType.JOB_AM_REBOOT));
   }
 
   @Test(timeout=60000)
@@ -2920,7 +2944,9 @@ public class TestRMContainerAllocator {
     // create some map requests
     ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
     for (int i = 0; i < reqMapEvents.length; ++i) {
-      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+      reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
+              Resource.newInstance(1024, 1),
+              new String[] {"h" + i});
     }
     allocator.sendRequests(Arrays.asList(reqMapEvents));
     // create some reduce requests
@@ -2928,7 +2954,8 @@ public class TestRMContainerAllocator {
         new ContainerRequestEvent[REDUCE_COUNT];
     for (int i = 0; i < reqReduceEvents.length; ++i) {
       reqReduceEvents[i] =
-          createReq(jobId, i, 1024, new String[] {}, false, true);
+          createRequest(jobId, i, Resource.newInstance(1024, 1),
+                  new String[] {}, false, true);
     }
     allocator.sendRequests(Arrays.asList(reqReduceEvents));
     allocator.schedule();
@@ -2975,14 +3002,17 @@ public class TestRMContainerAllocator {
     // create some map requests
     ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
     for (int i = 0; i < reqMapEvents.length; ++i) {
-      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+      reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
+          Resource.newInstance(1024, 1), new String[] {"h" + i});
     }
     allocator.sendRequests(Arrays.asList(reqMapEvents));
     // create some reduce requests
-    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
+    ContainerRequestEvent[] reqReduceEvents =
+        new ContainerRequestEvent[REDUCE_COUNT];
     for (int i = 0; i < reqReduceEvents.length; ++i) {
-      reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
-          false, true);
+      reqReduceEvents[i] =
+          createRequest(jobId, i, Resource.newInstance(1024, 1),
+              new String[] {}, false, true);
     }
     allocator.sendRequests(Arrays.asList(reqReduceEvents));
     allocator.schedule();
@@ -3137,13 +3167,19 @@ public class TestRMContainerAllocator {
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 1024, new String[] { "h2" });
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"});
     allocator.sendRequest(event2);
     ContainerRequestEvent event3 =
-        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+            createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"}, false, true);
     allocator.sendRequest(event3);
 
     // This will tell the scheduler about the requests but there will be no
@@ -3156,7 +3192,8 @@ public class TestRMContainerAllocator {
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+        createRequest(jobId, 4, Resource.newInstance(1024, 1),
+                new String[] {"h3"}, false, true);
     allocator.sendRequest(event4);
 
     allocator.schedule();
@@ -3301,13 +3338,18 @@ public class TestRMContainerAllocator {
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 1024, new String[] { "h2" });
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"});
     allocator.sendRequest(event2);
     ContainerRequestEvent event3 =
-        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+            createRequest(jobId, 3, Resource.newInstance(1024, 1),
+                    new String[]{"h2"}, false, true);
     allocator.sendRequest(event3);
 
     // This will tell the scheduler about the requests but there will be no
@@ -3320,7 +3362,8 @@ public class TestRMContainerAllocator {
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+            createRequest(jobId, 4, Resource.newInstance(1024, 1),
+                    new String[]{"h3"}, false, true);
     allocator.sendRequest(event4);
 
     allocator.schedule();
@@ -3433,13 +3476,19 @@ public class TestRMContainerAllocator {
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
-        createReq(jobId, 1, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 1,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event1);
     ContainerRequestEvent event2 =
-        createReq(jobId, 2, 1024, new String[] { "h2" });
+            ContainerRequestCreator.createRequest(jobId, 2,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h2"});
     allocator.sendRequest(event2);
     ContainerRequestEvent event3 =
-         createReq(jobId, 3, 1024, new String[] { "h1" }, false, true);
+            createRequest(jobId, 3,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"}, false, true);
     allocator.sendRequest(event3);
 
     // This will tell the scheduler about the requests but there will be no
@@ -3449,7 +3498,8 @@ public class TestRMContainerAllocator {
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
-        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+        createRequest(jobId, 4, Resource.newInstance(1024, 1),
+                new String[] {"h3"}, false, true);
     allocator.sendRequest(event4);
 
     allocator.schedule();
@@ -3486,7 +3536,9 @@ public class TestRMContainerAllocator {
 
     // Send request for one more mapper.
     ContainerRequestEvent event5 =
-        createReq(jobId, 5, 1024, new String[] { "h1" });
+            ContainerRequestCreator.createRequest(jobId, 5,
+                    Resource.newInstance(1024, 1),
+                    new String[]{"h1"});
     allocator.sendRequest(event5);
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
@@ -3528,7 +3580,7 @@ public class TestRMContainerAllocator {
       return RegisterApplicationMasterResponse.newInstance(
           Resource.newInstance(512, 1),
           Resource.newInstance(512000, 1024),
-          Collections.<ApplicationAccessType,String>emptyMap(),
+          Collections.emptyMap(),
           ByteBuffer.wrap("fake_key".getBytes()),
           Collections.<Container>emptyList(),
           "default",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
index 7a212e1..1da2fed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
@@ -175,16 +175,8 @@ public class UnitsConversionUtil {
    */
   public static int compare(String unitA, long valueA, String unitB,
       long valueB) {
-    if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA)
-        || !KNOWN_UNITS.contains(unitB)) {
-      throw new IllegalArgumentException("Units cannot be null");
-    }
-    if (!KNOWN_UNITS.contains(unitA)) {
-      throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
-    }
-    if (!KNOWN_UNITS.contains(unitB)) {
-      throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
-    }
+    checkUnitArgument(unitA);
+    checkUnitArgument(unitB);
     if (unitA.equals(unitB)) {
       return Long.compare(valueA, valueB);
     }
@@ -218,4 +210,36 @@ public class UnitsConversionUtil {
       return tmpA.compareTo(tmpB);
     }
   }
+
+  private static void checkUnitArgument(String unit) { // validate or throw
+    if (unit == null) { // null is reported separately from an unknown unit
+      throw new IllegalArgumentException("Unit cannot be null");
+    } else if (!KNOWN_UNITS.contains(unit)) { // non-null but not a known unit
+      throw new IllegalArgumentException("Unknown unit '" + unit + "'");
+    }
+  }
+
+  /**
+   * Compare a unit to another unit; only the units themselves are ordered.
+   * <br>
+   * Examples:<br>
+   * 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k")
+   * will return -1.<br>
+   * 2. 'M' (MEGA) is greater than 'k' (kilo), so compareUnits("M", "k") will
+   * return 1.
+   *
+   * @param unitA first unit
+   * @param unitB second unit
+   * @return -1, 0 or +1 depending on whether unitA is smaller than, equal to
+   * or greater than unitB
+   * @throws IllegalArgumentException if either unit is null or not known
+   */
+  public static int compareUnits(String unitA, String unitB) {
+    checkUnitArgument(unitA);
+    checkUnitArgument(unitB);
+    int unitAPos = SORTED_UNITS.indexOf(unitA);
+    int unitBPos = SORTED_UNITS.indexOf(unitB);
+    // Ordering comes from list position; presumably SORTED_UNITS is ascending.
+    return Integer.compare(unitAPos, unitBPos);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
new file mode 100644
index 0000000..98a8a00
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.resourcetypes;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Test helper for creating Resource and ResourceInformation objects.
+ * A custom resource is given as a resource name plus a descriptor made of
+ * an amount and an optional unit of ASCII letters, e.g. "11" or "2G".
+ */
+public final class ResourceTypesTestHelper {
+
+  private static final Pattern RESOURCE_VALUE_AND_UNIT_PATTERN =
+      Pattern.compile("(\\d+)([A-Za-z]*)");
+
+  private ResourceTypesTestHelper() {}
+
+  private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider
+          .getRecordFactory(null);
+
+  /** Amount and unit parsed from a resource descriptor. */
+  private static final class ResourceValueAndUnit {
+    private final long value;
+    private final String unit;
+
+    private ResourceValueAndUnit(long value, String unit) {
+      this.value = value;
+      this.unit = unit;
+    }
+  }
+
+  /**
+   * Creates a Resource; customResources maps resource name to descriptor.
+   */
+  public static Resource newResource(long memory, int vCores,
+      Map<String, String> customResources) {
+    Resource resource = RECORD_FACTORY.newRecordInstance(Resource.class);
+    resource.setMemorySize(memory);
+    resource.setVirtualCores(vCores);
+
+    for (Map.Entry<String, String> entry : customResources.entrySet()) {
+      String resourceName = entry.getKey();
+      resource.setResourceInformation(resourceName,
+          createResourceInformation(resourceName, entry.getValue()));
+    }
+    return resource;
+  }
+
+  /**
+   * Creates a ResourceInformation from a descriptor such as "11" or "2G".
+   * @throws RuntimeException if descriptor is not digits then letters
+   */
+  public static ResourceInformation createResourceInformation(
+      String resourceName, String descriptor) {
+    ResourceValueAndUnit parsed = getResourceValueAndUnit(descriptor);
+    return ResourceInformation.newInstance(resourceName, parsed.unit,
+        parsed.value);
+  }
+
+  /** Splits a descriptor into its amount and unit. */
+  private static ResourceValueAndUnit getResourceValueAndUnit(String val) {
+    Matcher matcher = RESOURCE_VALUE_AND_UNIT_PATTERN.matcher(val);
+    // matches(), not find(): the whole descriptor has to be well-formed.
+    if (!matcher.matches()) {
+      throw new RuntimeException("Invalid pattern of resource descriptor: " +
+              val);
+    }
+    return new ResourceValueAndUnit(Long.parseLong(matcher.group(1)),
+        matcher.group(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 0de834c..e06b55e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -183,7 +183,7 @@ public class BuilderUtils {
   public static NodeId newNodeId(String host, int port) {
     return NodeId.newInstance(host, port);
   }
-  
+
   public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime) {
@@ -422,7 +422,7 @@ public class BuilderUtils {
     report.setPriority(priority);
     return report;
   }
-  
+
   public static ApplicationSubmissionContext newApplicationSubmissionContext(
       ApplicationId applicationId, String applicationName, String queue,
       Priority priority, ContainerLaunchContext amContainer,
@@ -477,6 +477,10 @@ public class BuilderUtils {
     return resource;
   }
 
+  public static Resource newEmptyResource() {
+    return recordFactory.newRecordInstance(Resource.class); // nothing set
+  }
+
   public static URL newURL(String scheme, String host, int port, String file) {
     URL url = recordFactory.newRecordInstance(URL.class);
     url.setScheme(scheme);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8fa7cb6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index c0d7d86..9b3c20a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -49,6 +49,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -283,24 +284,10 @@ public class SchedulerUtils {
   private static void validateResourceRequest(ResourceRequest resReq,
       Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
       throws InvalidResourceRequestException {
-    Resource requestedResource = resReq.getCapability();
-    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
-      ResourceInformation reqRI = requestedResource.getResourceInformation(i);
-      ResourceInformation maxRI = maximumResource.getResourceInformation(i);
-      if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
-        throw new InvalidResourceRequestException(
-            "Invalid resource request, requested resource type=[" + reqRI
-                .getName()
-                + "] < 0 or greater than maximum allowed allocation. Requested 
"
-                + "resource=" + requestedResource
-                + ", maximum allowed allocation=" + maximumResource
-                + ", please note that maximum allowed allocation is calculated 
"
-                + "by scheduler based on maximum resource of registered "
-                + "NodeManagers, which might be less than configured "
-                + "maximum allocation=" + ResourceUtils
-                .getResourceTypesMaximumAllocation());
-      }
-    }
+    final Resource requestedResource = resReq.getCapability();
+    checkResourceRequestAgainstAvailableResource(requestedResource,
+        maximumResource);
+
     String labelExp = resReq.getNodeLabelExpression();
     // we don't allow specify label expression other than resourceName=ANY now
     if (!ResourceRequest.ANY.equals(resReq.getResourceName())
@@ -338,6 +325,78 @@ public class SchedulerUtils {
     }
   }
 
+  /**
+   * Checks each known resource type of reqResource against availableResource,
+   * comparing both values in the smaller of the two units.
+   * @throws InvalidResourceRequestException if a requested value is negative
+   *     or greater than the available value
+   */
+  @Private
+  @VisibleForTesting
+  static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
+      Resource availableResource) throws InvalidResourceRequestException {
+    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+      final ResourceInformation requestedRI =
+          reqResource.getResourceInformation(i);
+      final String reqResourceName = requestedRI.getName();
+      if (requestedRI.getValue() < 0) {
+        throwInvalidResourceException(reqResource, availableResource,
+            reqResourceName);
+      }
+
+      final ResourceInformation availableRI =
+          availableResource.getResourceInformation(reqResourceName);
+      long requestedResourceValue = requestedRI.getValue();
+      long availableResourceValue = availableRI.getValue();
+      int unitsRelation = UnitsConversionUtil
+          .compareUnits(requestedRI.getUnits(), availableRI.getUnits());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Requested resource information: " + requestedRI);
+        LOG.debug("Available resource information: " + availableRI);
+        LOG.debug("Relation of units: " + unitsRelation);
+      }
+
+      if (unitsRelation < 0) {
+        // Requested unit is smaller (e.g. "m" vs "k"): rescale available.
+        availableResourceValue =
+            UnitsConversionUtil.convert(availableRI.getUnits(),
+                requestedRI.getUnits(), availableRI.getValue());
+      } else if (unitsRelation > 0) {
+        // Requested unit is larger (e.g. "G" vs "M"): rescale requested.
+        requestedResourceValue =
+            UnitsConversionUtil.convert(requestedRI.getUnits(),
+                availableRI.getUnits(), requestedRI.getValue());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Requested resource value after conversion: " +
+                requestedResourceValue);
+        LOG.debug("Available resource value after conversion: " +
+                availableResourceValue);
+      }
+
+      if (requestedResourceValue > availableResourceValue) {
+        throwInvalidResourceException(reqResource, availableResource,
+            reqResourceName);
+      }
+    }
+  }
+
+  private static void throwInvalidResourceException(Resource reqResource,
+      Resource availableResource, String reqResourceName)
+      throws InvalidResourceRequestException {
+    // Always throws. The message names the offending resource type and shows
+    // the request next to the limit it was validated against.
+    String message = "Invalid resource request, requested resource type=["
+        + reqResourceName + "] < 0 or greater than maximum allowed allocation. "
+        + "Requested resource=" + reqResource + ", maximum allowed allocation="
+        + availableResource + ", please note that maximum allowed allocation "
+        + "is calculated by scheduler based on maximum resource of registered "
+        + "NodeManagers, which might be less than configured maximum "
+        + "allocation=" + ResourceUtils.getResourceTypesMaximumAllocation();
+    throw new InvalidResourceRequestException(message);
+  }
+
   private static void checkQueueLabelInLabelManager(String labelExpression,
       RMContext rmContext) throws InvalidLabelResourceRequestException {
     // check node label manager contains this label


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to