Author: sseth Date: Wed Aug 29 01:54:16 2012 New Revision: 1378427 URL: http://svn.apache.org/viewvc?rev=1378427&view=rev Log: MAPREDUCE-4602 amendment. Adds a missing file for the unit test.
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java?rev=1378427&view=auto ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java (added) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java Wed Aug 29 01:54:16 2012 @@ -0,0 +1,243 @@ +package org.apache.hadoop.mapreduce.v2.app2.rm; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app2.AppContext; +import org.apache.hadoop.mapreduce.v2.app2.ControlledClock; +import org.apache.hadoop.mapreduce.v2.app2.job.Job; +import 
org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;

/**
 * Tests for {@link RMContainerRequestor}, exercising the outstanding ask
 * table when an allocate call fails.
 */
public class TestRMContainerRequestor {

  /**
   * Adds container requests, performs one successful heartbeat, then performs
   * a heartbeat whose allocate call mutates the request table and throws.
   * Verifies the ask set before and after each step.
   */
  @Test
  public void testFailedAllocate() throws Exception {
    AppContext appContext = setupDefaultTestContext();
    AMRMProtocolForFailedAllocate amrm = createAMRMProtocolForFailedAllocate();
    RMContainerRequestorForTest rmComm =
        new RMContainerRequestorForTest(appContext, amrm);
    amrm.setRmCommunicator(rmComm);
    rmComm.init(new YarnConfiguration());
    rmComm.start();

    Resource resource = BuilderUtils.newResource(512);
    String[] hosts = new String[] {"host1", "host2"};
    String[] racks = new String[] {"rack1"};
    Priority priority = BuilderUtils.newPriority(5);
    ContainerRequest cr1 =
        new ContainerRequest(resource, hosts, racks, priority);
    ContainerRequest cr2 =
        new ContainerRequest(resource, new String[] {"host1"}, racks, priority);

    rmComm.addContainerReq(cr1);
    rmComm.addContainerReq(cr2);

    // Requests the stub protocol will add (cr1) and decrement (cr2) from
    // inside the allocate call that it fails.
    amrm.setIncContainerRequest(cr1);
    amrm.setDecContainerRequest(cr2);

    // Verify initial ask: 2 hosts, 1 rack and "*".
    Set<ResourceRequest> askSet = rmComm.getAskSet();
    assertEquals(4, askSet.size());
    verifyAsks(askSet, 2, 1, 2, 2);

    // First heartbeat.
    rmComm.heartbeat();
    // Verify the ask is empty afterwards.
    askSet = rmComm.getAskSet();
    assertEquals(0, askSet.size());

    // Add 2 more container requests.
    rmComm.addContainerReq(cr1);
    rmComm.addContainerReq(cr2);

    // Verify ask.
    askSet = rmComm.getAskSet();
    assertEquals(4, askSet.size());
    verifyAsks(askSet, 4, 2, 4, 4);

    try {
      rmComm.heartbeat();
      Assert.fail("Second heartbeat was expected to fail");
    } catch (YarnRemoteException expected) {
      // Expected: this is the failure the test is about; the ask set is
      // checked below.
    }

    // Verify ask. Re-read the ask set rather than reusing the reference
    // obtained before the failed heartbeat, so the assertions observe the
    // requestor's current state.
    // Should factor in +cr1 = 5 3 5 5, -cr2 = 4 3 4 4.
    askSet = rmComm.getAskSet();
    assertEquals(4, askSet.size());
    verifyAsks(askSet, 4, 3, 4, 4);
  }

  /**
   * Asserts the number of containers requested for each known resource name.
   *
   * @param askSet the ask set to inspect
   * @param host1 expected container count for "host1"
   * @param host2 expected container count for "host2"
   * @param rack1 expected container count for "rack1"
   * @param generic expected container count for "*"
   */
  private void verifyAsks(Set<ResourceRequest> askSet, int host1, int host2,
      int rack1, int generic) {
    for (ResourceRequest rr : askSet) {
      String name = rr.getHostName();
      if ("*".equals(name)) {
        assertEquals(generic, rr.getNumContainers());
      } else if ("host1".equals(name)) {
        assertEquals(host1, rr.getNumContainers());
      } else if ("host2".equals(name)) {
        assertEquals(host2, rr.getNumContainers());
      } else if ("rack1".equals(name)) {
        assertEquals(rack1, rr.getNumContainers());
      } else {
        // An unrecognised name would otherwise be skipped and pass unverified.
        Assert.fail("Unexpected resource name in ask: " + name);
      }
    }
  }

  /**
   * Builds the stub protocol together with the canned response it returns
   * for request id 0.
   */
  private AMRMProtocolForFailedAllocate createAMRMProtocolForFailedAllocate() {
    AMResponse amResponse = BuilderUtils.newAMResponse(
        new ArrayList<Container>(), BuilderUtils.newResource(1024),
        new ArrayList<ContainerStatus>(), false, 1,
        new ArrayList<NodeReport>());
    AllocateResponse allocateResponse =
        BuilderUtils.newAllocateResponse(amResponse, 2);
    return new AMRMProtocolForFailedAllocate(allocateResponse);
  }

  /**
   * Stub {@link AMRMProtocol} whose allocate call returns a canned response
   * for request id 0 and, for request id 1, mutates the requestor's table
   * before throwing.
   */
  class AMRMProtocolForFailedAllocate implements AMRMProtocol {
    private final AllocateResponse allocateResponse;
    private RMContainerRequestor rmComm;
    private ContainerRequest crInc;
    private ContainerRequest crDec;

    AMRMProtocolForFailedAllocate(AllocateResponse response) {
      allocateResponse = response;
    }

    void setRmCommunicator(RMContainerRequestor rmComm) {
      this.rmComm = rmComm;
    }

    /** Sets the request that is added during the failing allocate call. */
    void setIncContainerRequest(ContainerRequest cr) {
      this.crInc = cr;
    }

    /** Sets the request that is decremented during the failing allocate call. */
    void setDecContainerRequest(ContainerRequest cr) {
      this.crDec = cr;
    }

    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(
        RegisterApplicationMasterRequest request) throws YarnRemoteException {
      return null;
    }

    @Override
    public FinishApplicationMasterResponse finishApplicationMaster(
        FinishApplicationMasterRequest request) throws YarnRemoteException {
      return null;
    }

    /**
     * Returns the canned response for request id 0; for request id 1 changes
     * the request table and then throws; returns {@code null} for any other
     * id.
     */
    @Override
    public AllocateResponse allocate(AllocateRequest request)
        throws YarnRemoteException {
      if (request.getResponseId() == 0) {
        return allocateResponse;
      } else if (request.getResponseId() == 1) {
        // Change the table before throwing the exception.
        rmComm.addContainerReq(crInc);
        rmComm.decContainerReq(crDec);
        throw RPCUtil.getRemoteException("MockRpcError");
      }
      return null;
    }
  }

  /**
   * {@link RMContainerRequestor} wired to a supplied or mocked
   * {@link AMRMProtocol}. Registration, unregistration and the allocator
   * thread are no-ops; the test calls {@code heartbeat()} directly.
   */
  class RMContainerRequestorForTest extends RMContainerRequestor {

    private AMRMProtocol amRmProtocol;

    public RMContainerRequestorForTest(AppContext context) {
      super(null, context);
    }

    public RMContainerRequestorForTest(AppContext context, AMRMProtocol amrm) {
      super(null, context);
      this.amRmProtocol = amrm;
    }

    /**
     * Returns the protocol given at construction time, or lazily creates a
     * Mockito mock whose allocate call always returns a canned response.
     */
    @Override
    public AMRMProtocol createSchedulerProxy() {
      if (amRmProtocol == null) {
        amRmProtocol = mock(AMRMProtocol.class);
        AMResponse amResponse = BuilderUtils.newAMResponse(
            new ArrayList<Container>(), BuilderUtils.newResource(1024),
            new ArrayList<ContainerStatus>(), false, 1,
            new ArrayList<NodeReport>());
        AllocateResponse allocateResponse =
            BuilderUtils.newAllocateResponse(amResponse, 2);
        try {
          when(amRmProtocol.allocate(any(AllocateRequest.class)))
              .thenReturn(allocateResponse);
        } catch (YarnRemoteException e) {
          // Not expected while stubbing a mock; surface it instead of
          // silently continuing with an unconfigured mock.
          throw new IllegalStateException(
              "Unexpected failure while stubbing AMRMProtocol.allocate", e);
        }
      }
      return amRmProtocol;
    }

    @Override
    public void register() {
    }

    @Override
    public void unregister() {
    }

    @Override
    public void startAllocatorThread() {
    }
  }

  /**
   * Creates a mocked {@link AppContext} backed by a mock job, a mock event
   * handler, a {@link ControlledClock} and a node map that reports no host
   * as blacklisted.
   */
  private AppContext setupDefaultTestContext() {
    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
    ApplicationAttemptId appAttemptId =
        BuilderUtils.newApplicationAttemptId(appId, 1);
    JobID id = TypeConverter.fromYarn(appId);
    JobId jobId = TypeConverter.toYarn(id);

    Job mockJob = mock(Job.class);
    when(mockJob.getID()).thenReturn(jobId);
    when(mockJob.getProgress()).thenReturn(0.0f);

    @SuppressWarnings("rawtypes")
    EventHandler handler = mock(EventHandler.class);

    Clock clock = new ControlledClock(new SystemClock());

    AMNodeMap amNodeMap = mock(AMNodeMap.class);
    when(amNodeMap.isHostBlackListed(any(String.class))).thenReturn(false);

    AppContext appContext = mock(AppContext.class);
    when(appContext.getApplicationID()).thenReturn(appId);
    when(appContext.getApplicationAttemptId()).thenReturn(appAttemptId);
    when(appContext.getEventHandler()).thenReturn(handler);
    when(appContext.getJob(jobId)).thenReturn(mockJob);
    when(appContext.getClock()).thenReturn(clock);
    when(appContext.getAllNodes()).thenReturn(amNodeMap);

    return appContext;
  }
}