Author: bikas
Date: Sat Jul 13 23:38:27 2013
New Revision: 1502916

URL: http://svn.apache.org/r1502916
Log:
Merge r1502914 from trunk to branch-2 for YARN-763. AMRMClientAsync should stop heartbeating after receiving shutdown from RM (Xuan Gong via bikas)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1502916&r1=1502915&r2=1502916&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jul 13 23:38:27 2013 @@ -669,6 +669,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-541. getAllocatedContainers() is not returning all the allocated containers (bikas) + YARN-763. AMRMClientAsync should stop heartbeating after receiving + shutdown from RM (Xuan Gong via bikas) + BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. 
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1502916&r1=1502915&r2=1502916&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Sat Jul 13 23:38:27 2013 @@ -31,6 +31,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -92,6 +93,7 @@ extends AMRMClientAsync<T> { @Override protected void serviceStart() throws Exception { + handlerThread.setDaemon(true); handlerThread.start(); client.start(); super.serviceStart(); @@ -99,27 +101,19 @@ extends AMRMClientAsync<T> { /** * Tells the heartbeat and handler threads to stop and waits for them to - * terminate. Calling this method from the callback handler thread would cause - * deadlock, and thus should be avoided. + * terminate. 
*/ @Override protected void serviceStop() throws Exception { - if (Thread.currentThread() == handlerThread) { - throw new YarnRuntimeException("Cannot call stop from callback handler thread!"); - } keepRunning = false; + heartbeatThread.interrupt(); try { heartbeatThread.join(); } catch (InterruptedException ex) { LOG.error("Error joining with heartbeat thread", ex); } client.stop(); - try { - handlerThread.interrupt(); - handlerThread.join(); - } catch (InterruptedException ex) { - LOG.error("Error joining with hander thread", ex); - } + handlerThread.interrupt(); super.serviceStop(); } @@ -248,6 +242,10 @@ extends AMRMClientAsync<T> { while (true) { try { responseQueue.put(response); + if (response.getAMCommand() == AMCommand.AM_RESYNC + || response.getAMCommand() == AMCommand.AM_SHUTDOWN) { + return; + } break; } catch (InterruptedException ex) { LOG.info("Interrupted while waiting to put on response queue", ex); @@ -285,24 +283,18 @@ extends AMRMClientAsync<T> { } if (response.getAMCommand() != null) { - boolean stop = false; switch(response.getAMCommand()) { case AM_RESYNC: case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. 
Stopping callback."); - stop = true; - break; + return; default: String msg = "Unhandled value of AMCommand: " + response.getAMCommand(); LOG.error(msg); throw new YarnRuntimeException(msg); } - if(stop) { - // should probably stop heartbeating also YARN-763 - break; - } } List<NodeReport> updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1502916&r1=1502915&r2=1502916&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Sat Jul 13 23:38:27 2013 @@ -23,7 +23,10 @@ import static org.mockito.Matchers.anyIn import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -218,6 +221,65 @@ public class TestAMRMClientAsync { Assert.assertTrue(callbackHandler.callbackCount == 0); } + @Test (timeout = 10000) + public void testAMRMClientAsyncShutDown() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient<ContainerRequest> 
client = mock(AMRMClientImpl.class); + + final AllocateResponse shutDownResponse = createAllocateResponse( + new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null); + shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); + when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + + AMRMClientAsync<ContainerRequest> asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + asyncClient.registerApplicationMaster("localhost", 1234, null); + + Thread.sleep(50); + + verify(client, times(1)).allocate(anyFloat()); + asyncClient.stop(); + } + + @Test (timeout = 5000) + public void testCallAMRMClientAsyncStopFromCallbackHandler() + throws YarnException, IOException, InterruptedException { + Configuration conf = new Configuration(); + TestCallbackHandler2 callbackHandler = new TestCallbackHandler2(); + @SuppressWarnings("unchecked") + AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); + + List<ContainerStatus> completed = Arrays.asList( + ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), + ContainerState.COMPLETE, "", 0)); + final AllocateResponse response = createAllocateResponse(completed, + new ArrayList<Container>(), null); + + when(client.allocate(anyFloat())).thenReturn(response); + + AMRMClientAsync<ContainerRequest> asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + callbackHandler.registerAsyncClient(asyncClient); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.stop == false) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + private AllocateResponse createAllocateResponse( List<ContainerStatus> completed, List<Container> allocated, List<NMToken> nmTokens) { @@ -323,4 +385,41 @@ public class 
TestAMRMClientAsync { } } } + + private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler { + Object notifier = new Object(); + @SuppressWarnings("rawtypes") + AMRMClientAsync asynClient; + boolean stop = false; + + @Override + public void onContainersCompleted(List<ContainerStatus> statuses) {} + + @Override + public void onContainersAllocated(List<Container> containers) {} + + @Override + public void onShutdownRequest() {} + + @Override + public void onNodesUpdated(List<NodeReport> updatedNodes) {} + + @Override + public float getProgress() { + asynClient.stop(); + stop = true; + synchronized (notifier) { + notifier.notifyAll(); + } + return 0; + } + + @Override + public void onError(Exception e) {} + + public void registerAsyncClient( + AMRMClientAsync<ContainerRequest> asyncClient) { + this.asynClient = asyncClient; + } + } }