Author: bikas
Date: Sat Jul 13 23:38:27 2013
New Revision: 1502916

URL: http://svn.apache.org/r1502916
Log:
Merge r1502914 from trunk to branch-2 for YARN-763. AMRMClientAsync should stop 
heartbeating after receiving shutdown from RM (Xuan Gong via bikas)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
    
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1502916&r1=1502915&r2=1502916&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jul 13 
23:38:27 2013
@@ -669,6 +669,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-541. getAllocatedContainers() is not returning all the allocated
     containers (bikas)
 
+    YARN-763. AMRMClientAsync should stop heartbeating after receiving
+    shutdown from RM (Xuan Gong via bikas)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.

Modified: 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1502916&r1=1502915&r2=1502916&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
 Sat Jul 13 23:38:27 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -92,6 +93,7 @@ extends AMRMClientAsync<T> {
   
   @Override
   protected void serviceStart() throws Exception {
+    handlerThread.setDaemon(true);
     handlerThread.start();
     client.start();
     super.serviceStart();
@@ -99,27 +101,19 @@ extends AMRMClientAsync<T> {
   
   /**
    * Tells the heartbeat and handler threads to stop and waits for them to
-   * terminate.  Calling this method from the callback handler thread would 
cause
-   * deadlock, and thus should be avoided.
+   * terminate.
    */
   @Override
   protected void serviceStop() throws Exception {
-    if (Thread.currentThread() == handlerThread) {
-      throw new YarnRuntimeException("Cannot call stop from callback handler 
thread!");
-    }
     keepRunning = false;
+    heartbeatThread.interrupt();
     try {
       heartbeatThread.join();
     } catch (InterruptedException ex) {
       LOG.error("Error joining with heartbeat thread", ex);
     }
     client.stop();
-    try {
-      handlerThread.interrupt();
-      handlerThread.join();
-    } catch (InterruptedException ex) {
-      LOG.error("Error joining with hander thread", ex);
-    }
+    handlerThread.interrupt();
     super.serviceStop();
   }
   
@@ -248,6 +242,10 @@ extends AMRMClientAsync<T> {
           while (true) {
             try {
               responseQueue.put(response);
+              if (response.getAMCommand() == AMCommand.AM_RESYNC
+                  || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+                return;
+              }
               break;
             } catch (InterruptedException ex) {
               LOG.info("Interrupted while waiting to put on response queue", 
ex);
@@ -285,24 +283,18 @@ extends AMRMClientAsync<T> {
         }
 
         if (response.getAMCommand() != null) {
-          boolean stop = false;
           switch(response.getAMCommand()) {
           case AM_RESYNC:
           case AM_SHUTDOWN:
             handler.onShutdownRequest();
             LOG.info("Shutdown requested. Stopping callback.");
-            stop = true;
-            break;
+            return;
           default:
             String msg =
                   "Unhandled value of AMCommand: " + response.getAMCommand();
             LOG.error(msg);
             throw new YarnRuntimeException(msg);
           }
-          if(stop) {
-            // should probably stop heartbeating also YARN-763
-            break;
-          }
         }
         List<NodeReport> updatedNodes = response.getUpdatedNodes();
         if (!updatedNodes.isEmpty()) {

Modified: 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1502916&r1=1502915&r2=1502916&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
 Sat Jul 13 23:38:27 2013
@@ -23,7 +23,10 @@ import static org.mockito.Matchers.anyIn
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -218,6 +221,65 @@ public class TestAMRMClientAsync {
     Assert.assertTrue(callbackHandler.callbackCount == 0);
   }
   
+  @Test (timeout = 10000)
+  public void testAMRMClientAsyncShutDown() throws Exception {
+    Configuration conf = new Configuration();
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    final AllocateResponse shutDownResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+    shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+    when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+    Thread.sleep(50);
+
+    verify(client, times(1)).allocate(anyFloat());
+    asyncClient.stop();
+  }
+
+  @Test (timeout = 5000)
+  public void testCallAMRMClientAsyncStopFromCallbackHandler()
+      throws YarnException, IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    List<ContainerStatus> completed = Arrays.asList(
+        ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+            ContainerState.COMPLETE, "", 0));
+    final AllocateResponse response = createAllocateResponse(completed,
+        new ArrayList<Container>(), null);
+
+    when(client.allocate(anyFloat())).thenReturn(response);
+
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    callbackHandler.registerAsyncClient(asyncClient);
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.stop == false) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
   private AllocateResponse createAllocateResponse(
       List<ContainerStatus> completed, List<Container> allocated,
       List<NMToken> nmTokens) {
@@ -323,4 +385,41 @@ public class TestAMRMClientAsync {
       }
     }
   }
+
+  private class TestCallbackHandler2 implements 
AMRMClientAsync.CallbackHandler {
+    Object notifier = new Object();
+    @SuppressWarnings("rawtypes")
+    AMRMClientAsync asynClient;
+    boolean stop = false;
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {}
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {}
+
+    @Override
+    public void onShutdownRequest() {}
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    @Override
+    public float getProgress() {
+      asynClient.stop();
+      stop = true;
+      synchronized (notifier) {
+        notifier.notifyAll();
+      }
+      return 0;
+    }
+
+    @Override
+    public void onError(Exception e) {}
+
+    public void registerAsyncClient(
+        AMRMClientAsync<ContainerRequest> asyncClient) {
+      this.asynClient = asyncClient;
+    }
+  }
 }


Reply via email to