Author: sseth
Date: Tue Aug 28 00:40:02 2012
New Revision: 1377922

URL: http://svn.apache.org/viewvc?rev=1377922&view=rev
Log:
MAPREDUCE-4580. Change MapReduce to use the yarn-client module.
(Contributed by Vinod Kumar Vavilapalli)
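The substance of the change shows up in the ResourceMgrDelegate diff below: instead of hand-building a ClientRMProtocol proxy with YarnRPC, the delegate now extends YarnClientImpl from the new hadoop-yarn-client module, runs the client's init()/start() lifecycle in its constructor, and forwards each JobClient-facing call to the inherited yarn-client methods, keeping only the MapReduce type conversion. The condensed sketch that follows is not the committed class; the class name, the reduced set of methods, and the simplified imports are assumptions made for illustration, and the only YarnClientImpl calls it relies on are the ones that appear in the diff itself.

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Condensed sketch of the shape ResourceMgrDelegate takes in this commit.
// The real class overrides many more JobClient-facing methods; see the diff below.
public class ResourceMgrDelegateSketch extends YarnClientImpl {

  private final YarnConfiguration conf;
  private GetNewApplicationResponse application; // cached "new application" handle
  private ApplicationId applicationId;

  public ResourceMgrDelegateSketch(YarnConfiguration conf) {
    super();
    this.conf = conf;
    init(conf);  // YarnClientImpl sets up the ClientRMProtocol proxy here
    start();     // and connects to the ResourceManager on start()
  }

  // A MapReduce-facing call is now just type conversion around the
  // inherited yarn-client method.
  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
  }

  // Asks the RM for a fresh application id and exposes it as a MapReduce JobID.
  public JobID getNewJobID() throws IOException, InterruptedException {
    this.application = super.getNewApplication();
    this.applicationId = this.application.getApplicationId();
    return TypeConverter.fromYarn(applicationId);
  }
}

Folding the RPC plumbing into yarn-client is what lets the MapReduce-side class shrink to type conversion plus a little bookkeeping around the cached application id, and it removes the hand-rolled record-factory boilerplate that the old class carried for every call.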
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Aug 28 00:40:02 2012
@@ -234,6 +234,9 @@ Release 2.1.0-alpha - Unreleased
     MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. (Todd
     Lipcon and Siddharth Seth via sseth)
 
+    MAPREDUCE-4580. Change MapReduce to use the yarn-client module.
+    (Vinod Kumar Vavilapalli via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Tue Aug 28 00:40:02 2012
@@ -39,6 +39,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>
     <dependency>

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Tue Aug 28 00:40:02 2012
@@ -19,9 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,75 +38,29 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
 
-
-// TODO: This should be part of something like yarn-client.
-public class ResourceMgrDelegate {
+public class ResourceMgrDelegate extends YarnClientImpl {
   private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
 
-  private final InetSocketAddress rmAddress;
   private YarnConfiguration conf;
-  ClientRMProtocol applicationsManager;
+  private GetNewApplicationResponse application;
   private ApplicationId applicationId;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   /**
    * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
   * @param conf the configuration object.
   */
  public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
     this.conf = conf;
-    YarnRPC rpc = YarnRPC.create(this.conf);
-    this.rmAddress = getRmAddress(conf);
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    applicationsManager =
-        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
-            rmAddress, this.conf);
-    LOG.debug("Connected to ResourceManager at " + rmAddress);
-  }
-
-  /**
-   * Used for injecting applicationsManager, mostly for testing.
-   * @param conf the configuration object
-   * @param applicationsManager the handle to talk the resource managers
-   *          {@link ClientRMProtocol}.
-   */
-  public ResourceMgrDelegate(YarnConfiguration conf,
-      ClientRMProtocol applicationsManager) {
-    this.conf = conf;
-    this.applicationsManager = applicationsManager;
-    this.rmAddress = getRmAddress(conf);
-  }
-
-  private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_PORT);
+    init(conf);
+    start();
   }
 
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
@@ -117,26 +68,15 @@ public class ResourceMgrDelegate {
     return;
   }
 
-
   public TaskTrackerInfo[] getActiveTrackers() throws IOException,
       InterruptedException {
-    GetClusterNodesRequest request =
-      recordFactory.newRecordInstance(GetClusterNodesRequest.class);
-    GetClusterNodesResponse response =
-      applicationsManager.getClusterNodes(request);
-    return TypeConverter.fromYarnNodes(response.getNodeReports());
+    return TypeConverter.fromYarnNodes(super.getNodeReports());
   }
 
-
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-    GetAllApplicationsRequest request =
-      recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
-    GetAllApplicationsResponse response =
-      applicationsManager.getAllApplications(request);
-    return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
+    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
  }
 
-
  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
      InterruptedException {
    // TODO: Implement getBlacklistedTrackers
@@ -144,128 +84,56 @@ public class ResourceMgrDelegate {
     return new TaskTrackerInfo[0];
   }
 
-
   public ClusterMetrics getClusterMetrics() throws IOException,
       InterruptedException {
-    GetClusterMetricsRequest request = recordFactory.newRecordInstance(GetClusterMetricsRequest.class);
-    GetClusterMetricsResponse response = applicationsManager.getClusterMetrics(request);
-    YarnClusterMetrics metrics = response.getClusterMetrics();
+    YarnClusterMetrics metrics = super.getYarnClusterMetrics();
     ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
         metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
         metrics.getNumNodeManagers(), 0, 0);
     return oldMetrics;
   }
 
-  @SuppressWarnings("rawtypes")
-  public Token getDelegationToken(Text renewer)
-      throws IOException, InterruptedException {
-    /* get the token from RM */
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest
-        rmDTRequest = recordFactory.newRecordInstance(
-            org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class);
-    rmDTRequest.setRenewer(renewer.toString());
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse
-        response = applicationsManager.getDelegationToken(rmDTRequest);
-    DelegationToken yarnToken = response.getRMDelegationToken();
-    return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    return ProtoUtils.convertFromProtoFormat(
+        super.getRMDelegationToken(renewer), rmAddress);
   }
 
-
   public String getFilesystemName() throws IOException, InterruptedException {
     return FileSystem.get(conf).getUri().toString();
   }
 
   public JobID getNewJobID() throws IOException, InterruptedException {
-    GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
-    applicationId = applicationsManager.getNewApplication(request).getApplicationId();
+    this.application = super.getNewApplication();
+    this.applicationId = this.application.getApplicationId();
     return TypeConverter.fromYarn(applicationId);
   }
 
-  private static final String ROOT = "root";
-
-  private GetQueueInfoRequest getQueueInfoRequest(String queueName,
-      boolean includeApplications, boolean includeChildQueues, boolean recursive) {
-    GetQueueInfoRequest request =
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
-    request.setQueueName(queueName);
-    request.setIncludeApplications(includeApplications);
-    request.setIncludeChildQueues(includeChildQueues);
-    request.setRecursive(recursive);
-    return request;
-
-  }
-
   public QueueInfo getQueue(String queueName) throws IOException,
       InterruptedException {
-    GetQueueInfoRequest request =
-      getQueueInfoRequest(queueName, true, false, false);
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
     return TypeConverter.fromYarn(
-        applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
+        super.getQueueInfo(queueName), this.conf);
   }
 
-
-  private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent,
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
-      boolean recursive) {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> childQueues =
-      parent.getChildQueues();
-
-    for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
-      queues.add(child);
-      if(recursive) {
-        getChildQueues(child, queues, recursive);
-      }
-    }
-  }
-
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
       InterruptedException {
-    GetQueueUserAclsInfoRequest request =
-      recordFactory.newRecordInstance(GetQueueUserAclsInfoRequest.class);
-    List<QueueUserACLInfo> userAcls =
-      applicationsManager.getQueueUserAcls(request).getUserAclsInfoList();
-    return TypeConverter.fromYarnQueueUserAclsInfo(userAcls);
+    return TypeConverter.fromYarnQueueUserAclsInfo(super
        .getQueueAclsInfo());
  }
 
-
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues =
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, true);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
  }
 
-
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues =
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, false);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
  }
 
  public QueueInfo[] getChildQueues(String parent) throws IOException,
      InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues =
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo parentQueue =
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
-    getChildQueues(parentQueue, queues, true);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
        this.conf);
  }
 
  public String getStagingAreaDir() throws IOException, InterruptedException {
@@ -307,40 +175,6 @@ public class ResourceMgrDelegate {
     return 0;
   }
 
-
-  public ApplicationId submitApplication(
-      ApplicationSubmissionContext appContext)
-      throws IOException {
-    appContext.setApplicationId(applicationId);
-    SubmitApplicationRequest request =
-        recordFactory.newRecordInstance(SubmitApplicationRequest.class);
-    request.setApplicationSubmissionContext(appContext);
-    applicationsManager.submitApplication(request);
-    LOG.info("Submitted application " + applicationId + " to ResourceManager" +
-        " at " + rmAddress);
-    return applicationId;
-  }
-
-  public void killApplication(ApplicationId applicationId) throws IOException {
-    KillApplicationRequest request =
-        recordFactory.newRecordInstance(KillApplicationRequest.class);
-    request.setApplicationId(applicationId);
-    applicationsManager.forceKillApplication(request);
-    LOG.info("Killing application " + applicationId);
-  }
-
-
-  public ApplicationReport getApplicationReport(ApplicationId appId)
-      throws YarnRemoteException {
-    GetApplicationReportRequest request = recordFactory
-        .newRecordInstance(GetApplicationReportRequest.class);
-    request.setApplicationId(appId);
-    GetApplicationReportResponse response = applicationsManager
-        .getApplicationReport(request);
-    ApplicationReport applicationReport = response.getApplicationReport();
-    return applicationReport;
-  }
-
   public ApplicationId getApplicationId() {
     return applicationId;
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue Aug 28 00:40:02 2012
@@ -89,7 +89,7 @@ import org.apache.hadoop.yarn.util.Proto
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  */
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java Tue Aug 28 00:40:02 2012
@@ -50,7 +50,7 @@ public class TestResourceMgrDelegate {
    */
   @Test
   public void testGetRootQueues() throws IOException, InterruptedException {
-    ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+    final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
     GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
     org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = Mockito.mock(
         org.apache.hadoop.yarn.api.records.QueueInfo.class);
@@ -59,12 +59,17 @@ public class TestResourceMgrDelegate {
       GetQueueInfoRequest.class))).thenReturn(response);
 
     ResourceMgrDelegate delegate = new ResourceMgrDelegate(
-      new YarnConfiguration(), applicationsManager);
+      new YarnConfiguration()) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = applicationsManager;
+      }
+    };
     delegate.getRootQueues();
 
     ArgumentCaptor<GetQueueInfoRequest> argument =
       ArgumentCaptor.forClass(GetQueueInfoRequest.class);
-    Mockito.verify(delegate.applicationsManager).getQueueInfo(
+    Mockito.verify(applicationsManager).getQueueInfo(
         argument.capture());
 
     Assert.assertTrue("Children of root queue not requested",
@@ -75,7 +80,7 @@
 
   @Test
   public void tesAllJobs() throws Exception {
-    ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+    final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
     GetAllApplicationsResponse allApplicationsResponse = Records
         .newRecord(GetAllApplicationsResponse.class);
     List<ApplicationReport> applications = new ArrayList<ApplicationReport>();
@@ -93,7 +98,12 @@ public class TestResourceMgrDelegate {
         .any(GetAllApplicationsRequest.class))).thenReturn(
         allApplicationsResponse);
     ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
-      new YarnConfiguration(), applicationsManager);
+      new YarnConfiguration()) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = applicationsManager;
+      }
+    };
 
     JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();
     Assert.assertEquals(State.FAILED, allJobs[0].getState());

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java Tue Aug 28 00:40:02 2012
@@ -18,9 +18,10 @@
 
 package org.apache.hadoop.mapreduce;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -104,14 +105,20 @@ public class TestYarnClientProtocolProvi
       rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
       rmDTToken.setService("0.0.0.0:8032");
       getDTResponse.setRMDelegationToken(rmDTToken);
-      ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
+      final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
       when(cRMProtocol.getDelegationToken(any(
           GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
       ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
-          new YarnConfiguration(conf), cRMProtocol);
+          new YarnConfiguration(conf)) {
+        @Override
+        public synchronized void start() {
+          this.rmClient = cRMProtocol;
+        }
+      };
       yrunner.setResourceMgrDelegate(rmgrDelegate);
       Token t = cluster.getDelegationToken(new Text(" "));
-      assertTrue("Testclusterkind".equals(t.getKind().toString()));
+      assertTrue("Token kind is instead " + t.getKind().toString(),
+        "Testclusterkind".equals(t.getKind().toString()));
     } finally {
       if (cluster != null) {
         cluster.close();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java?rev=1377922&r1=1377921&r2=1377922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java Tue Aug 28 00:40:02 2012
@@ -177,8 +177,13 @@ public class TestYARNRunner extends Test
   @Test
   public void testResourceMgrDelegate() throws Exception {
     /* we not want a mock of resourcemgr deleagte */
-    ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
-    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+    final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = clientRMProtocol;
+      }
+    };
     /* make sure kill calls finish application master */
     when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
       .thenReturn(null);
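All three test diffs above rely on the same injection trick: the old constructor that accepted a ClientRMProtocol is gone, so each test subclasses ResourceMgrDelegate anonymously and overrides start() to drop a Mockito mock into the rmClient field inherited from YarnClientImpl, which also keeps the test from opening a real connection to a ResourceManager. A rough, self-contained restatement of that pattern is sketched below; the class and test-method names are hypothetical, the stubbing mirrors TestResourceMgrDelegate.testGetRootQueues, and it assumes, as the modified tests do, that rmClient is accessible to subclasses.

package org.apache.hadoop.mapred;

import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import org.mockito.Mockito;

// Hypothetical, condensed restatement of the injection pattern used by the
// tests modified in this commit.
public class ResourceMgrDelegateInjectionSketch {

  @Test
  public void queueLookupsGoThroughTheInjectedProtocol() throws Exception {
    // A Mockito mock stands in for the RM-side ClientRMProtocol endpoint,
    // stubbed just enough for getRootQueues() to complete.
    final ClientRMProtocol applicationsManager =
        Mockito.mock(ClientRMProtocol.class);
    GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
    Mockito.when(response.getQueueInfo()).thenReturn(
        Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class));
    Mockito.when(applicationsManager.getQueueInfo(
        Mockito.any(GetQueueInfoRequest.class))).thenReturn(response);

    // Overriding start() keeps YarnClientImpl from opening a real RPC proxy
    // to the ResourceManager and plants the mock in the inherited rmClient
    // field instead, the same trick each modified test uses.
    ResourceMgrDelegate delegate =
        new ResourceMgrDelegate(new YarnConfiguration()) {
          @Override
          public synchronized void start() {
            this.rmClient = applicationsManager;
          }
        };

    delegate.getRootQueues();

    // Every queue lookup should have been routed to the injected mock.
    Mockito.verify(applicationsManager, Mockito.atLeastOnce())
        .getQueueInfo(Mockito.any(GetQueueInfoRequest.class));
  }
}

The same anonymous-subclass override appears in TestYarnClientProtocolProvider and TestYARNRunner, only with a different mocked call stubbed in each case.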