Author: daryn Date: Wed Feb 6 15:26:30 2013 New Revision: 1443016 URL: http://svn.apache.org/viewvc?rev=1443016&view=rev Log: YARN-357. App submission should not be synchronized (daryn)
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1443016&r1=1443015&r2=1443016&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Feb 6 15:26:30 2013 @@ -300,6 +300,8 @@ Release 0.23.7 - UNRELEASED OPTIMIZATIONS + YARN-357. App submission should not be synchronized (daryn) + BUG FIXES YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1443016&r1=1443015&r2=1443016&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Feb 6 15:26:30 2013 @@ -228,7 +228,7 @@ public class RMAppManager implements Eve } @SuppressWarnings("unchecked") - protected synchronized void submitApplication( + protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime) { ApplicationId applicationId = submissionContext.getApplicationId(); RMApp application = null; Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1443016&r1=1443015&r2=1443016&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Wed Feb 6 15:26:30 2013 @@ -27,7 +27,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -44,28 +47,36 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; -import org.junit.Test; import org.junit.AfterClass; import org.junit.BeforeClass; - +import org.junit.Test; public class TestClientRMService { @@ -235,6 +246,88 @@ public class TestClientRMService { rmService.renewDelegationToken(request); } + @Test(timeout=4000) + public void testConcurrentAppSubmit() + throws IOException, InterruptedException, BrokenBarrierException { + YarnScheduler yarnScheduler = mock(YarnScheduler.class); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + RMStateStore stateStore = mock(RMStateStore.class); + when(rmContext.getStateStore()).thenReturn(stateStore); + RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, + null, mock(ApplicationACLsManager.class), new Configuration()); + + final ApplicationId appId1 = getApplicationId(100); + final ApplicationId appId2 = getApplicationId(101); + final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1); + final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2); + + final CyclicBarrier startBarrier = new CyclicBarrier(2); + final CyclicBarrier endBarrier = new CyclicBarrier(2); + + @SuppressWarnings("rawtypes") + EventHandler eventHandler = new EventHandler() { + @Override + public void handle(Event rawEvent) { + if (rawEvent instanceof RMAppEvent) { + RMAppEvent event = (RMAppEvent) rawEvent; + if (event.getApplicationId().equals(appId1)) { + try { + startBarrier.await(); + endBarrier.await(); + } catch (BrokenBarrierException e) { + LOG.warn("Broken Barrier", e); + } catch (InterruptedException e) { + LOG.warn("Interrupted while awaiting barriers", e); + } + } + } + } + }; + + when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler); + + final ClientRMService rmService = + new ClientRMService(rmContext, yarnScheduler, appManager, null, null); + + // submit an app and wait for it to block while in app submission + Thread t = new Thread() { + @Override + public void run() { + try { + rmService.submitApplication(submitRequest1); + } catch (YarnRemoteException e) {} + } + }; + t.start(); + + // submit another app, so go through while the first app is blocked + startBarrier.await(); + rmService.submitApplication(submitRequest2); + endBarrier.await(); + t.join(); + } + + private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) { + String user = MockApps.newUserName(); + String queue = MockApps.newQueue(); + + ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); + Resource resource = mock(Resource.class); + when(amContainerSpec.getResource()).thenReturn(resource); + + ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + when(submissionContext.getUser()).thenReturn(user); + when(submissionContext.getQueue()).thenReturn(queue); + when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec); + when(submissionContext.getApplicationId()).thenReturn(appId); + + SubmitApplicationRequest submitRequest = + recordFactory.newRecordInstance(SubmitApplicationRequest.class); + submitRequest.setApplicationSubmissionContext(submissionContext); + return submitRequest; + } + private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) throws IOException { Dispatcher dispatcher = mock(Dispatcher.class);