Author: vinodkv
Date: Mon Jul 29 18:15:51 2013
New Revision: 1508160

URL: http://svn.apache.org/r1508160
Log:
YARN-245. Fixed NodeManager to handle duplicate responses from ResourceManager. 
Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1508157 ../../trunk/

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1508160&r1=1508159&r2=1508160&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Mon Jul 29 18:15:51 2013
@@ -733,6 +733,11 @@ Release 2.1.0-beta - 2013-07-02
     YARN-960. Fixed ResourceManager to propagate client-submitted credentials
     irrespective of security. (Daryn Sharp via vinodkv)
 
+    YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
+
+    YARN-245. Fixed NodeManager to handle duplicate responses from
+    ResourceManager. (Mayank Bansal via vinodkv)
+
   BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.
@@ -798,8 +803,6 @@ Release 2.1.0-beta - 2013-07-02
     YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
     via cnauroth)
 
-    YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
-
 Release 2.0.5-alpha - 06/06/2013
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1508160&r1=1508159&r2=1508160&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Jul 29 18:15:51 2013
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
@@ -158,7 +159,7 @@ public class NodeManager extends Composi
     addService(del);
 
     // NodeManager level dispatcher
-    this.dispatcher = new AsyncDispatcher();
+    this.dispatcher = (AsyncDispatcher) createDispatcher();
 
     nodeHealthChecker = new NodeHealthCheckerService();
     addService(nodeHealthChecker);
@@ -203,6 +204,16 @@ public class NodeManager extends Composi
     // TODO add local dirs to del
   }
 
+  @Private
+  protected Dispatcher createDispatcher(){
+    return new AsyncDispatcher();
+  }
+  
+  @Private
+  public Dispatcher getDispatcher(){
+    return this.dispatcher;
+  }
+  
   @Override
   protected void serviceStart() throws Exception {
     try {

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1508160&r1=1508159&r2=1508160&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Jul 29 18:15:51 2013
@@ -369,6 +369,13 @@ public class NodeStatusUpdaterImpl exten
               .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
                 .getNMTokenSecretManager().getCurrentKey());
             response = resourceTracker.nodeHeartbeat(request);
+            // Checking if the response id is the same which we just processed
+            // If yes then ignore the update.
+            if (lastHeartBeatID != response.getResponseId() - 1) {
+              LOG.info("Discarding the duplicate response "
+                  + response.getResponseId());
+              continue;
+            }
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             updateMasterKeys(response);
@@ -395,7 +402,6 @@ public class NodeStatusUpdaterImpl exten
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
               break;
             }
-
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanup();

Modified: 
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1508160&r1=1508159&r2=1508160&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Jul 29 18:15:51 2013
@@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -431,6 +433,26 @@ public class TestNodeStatusUpdater {
     }
   }
   
+  private class MyNodeManager7 extends NodeManager {
+    private ResourceTracker resourceTracker;
+    private MyNodeStatusUpdater3 nodeStatusUpdater;
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      this.nodeStatusUpdater =
+          new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker7(context);
+      this.nodeStatusUpdater.resourceTracker = resourceTracker;
+
+      return this.nodeStatusUpdater;
+    }
+
+    protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
+      return this.nodeStatusUpdater;
+    }
+  }
+
   private class MyNodeManager2 extends NodeManager {
     public boolean isStopped = false;
     private NodeStatusUpdater nodeStatusUpdater;
@@ -552,6 +574,68 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyResourceTracker7 implements ResourceTracker {
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    private final Context context;
+    private int lastRequestedHeartBeat = 0;
+    private boolean gotDuplicateHeartBeatRequest = false;
+    private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+
+    MyResourceTracker7(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnException, IOException {
+      RegisterNodeManagerResponse response =
+          recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction);
+      response.setContainerTokenMasterKey(createMasterKey());
+      response.setNMTokenMasterKey(createMasterKey());
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnException, IOException {
+
+      if (lastRequestedHeartBeat != 0
+          && lastRequestedHeartBeat == request.getNodeStatus().getResponseId()) {
+        LOG.info("GOT Duplicate heartbeatId "
+            + request.getNodeStatus().getResponseId());
+        gotDuplicateHeartBeatRequest = true;
+      }
+      lastRequestedHeartBeat = request.getNodeStatus().getResponseId();
+      LOG.info("Got heartBeatId: [" + heartBeatID + "]");
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID++);
+      NodeHeartbeatResponse nhResponse =
+          YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+            heartBeatNodeAction, null, null, null, null, 1000L);
+
+      if (heartBeatID == 5) {
+        LOG.info("Sending FINISH_APP for application: [" + appId + "]");
+        this.context.getApplications().put(appId, mock(Application.class));
+        nhResponse
+          .addAllApplicationsToCleanup(Collections.singletonList(appId));
+      }
+      if (heartBeatID == 6) {
+        nhResponse.setResponseId(5);
+        LOG.info("Sending FINISH_APP for application: [" + appId + "]");
+        this.context.getApplications().put(appId, mock(Application.class));
+        nhResponse
+          .addAllApplicationsToCleanup(Collections.singletonList(appId));
+      }
+      return nhResponse;
+    }
+
+    public boolean isGotDuplicateHeartBeatRequest() {
+      return gotDuplicateHeartBeatRequest;
+    }
+  }
+  
   private class MyResourceTracker4 implements ResourceTracker {
 
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -745,7 +829,7 @@ public class TestNodeStatusUpdater {
     lfs.delete(new Path(basedir.getPath()), true);
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testNMRegistration() throws InterruptedException {
     nm = new NodeManager() {
       @Override
@@ -805,7 +889,7 @@ public class TestNodeStatusUpdater {
     nm.stop();
   }
   
-  @Test
+  @Test(timeout = 60000)
   public void testStopReentrant() throws Exception {
     final AtomicInteger numCleanups = new AtomicInteger(0);
     nm = new NodeManager() {
@@ -851,7 +935,49 @@ public class TestNodeStatusUpdater {
     Assert.assertEquals(numCleanups.get(), 1);
   }
 
-  @Test
+  @SuppressWarnings("rawtypes")
+  class MyDispatcher7 extends AsyncDispatcher {
+    public volatile int finishapp_event;
+
+    protected void dispatch(Event event) {
+      if (event.getType().name()
+        .equals(ContainerManagerEventType.FINISH_APPS.toString())) {
+        ++finishapp_event;
+      }
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testDuplicateResponseFromRM() throws Exception {
+    MyNodeManager7 nm = new MyNodeManager7() {
+      protected Dispatcher createDispatcher() {
+        return new MyDispatcher7();
+      }
+    };
+    try {
+      YarnConfiguration conf = createNMConfig();
+      conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000l);
+      nm.init(conf);
+      nm.start();
+      MyResourceTracker7 rt =
+          (MyResourceTracker7) nm.getNodeStatusUpdater().getRMClient();
+      while (heartBeatID < 7) {
+        Thread.sleep(1000l);
+      }
+      Assert.assertTrue(rt.isGotDuplicateHeartBeatRequest());
+
+      MyDispatcher7 nmdispatcher = (MyDispatcher7) nm.getDispatcher();
+      // We are sending two FINISH_APPS in heartbeat 5 and 6
+      // Checking we get only one time FINISH_APPS event which is the first one
+      Assert.assertEquals(1, nmdispatcher.finishapp_event);
+
+    } finally {
+      if (nm.getServiceState() == STATE.STARTED)
+        nm.stop();
+    }
+  }
+  
+  @Test(timeout = 60000)
   public void testNodeDecommision() throws Exception {
     nm = getNodeManager(NodeAction.SHUTDOWN);
     YarnConfiguration conf = createNMConfig();
@@ -898,7 +1024,7 @@ public class TestNodeStatusUpdater {
                                                        
NodeHealthCheckerService healthChecker);
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testNMShutdownForRegistrationFailure() throws Exception {
 
     nm = new NodeManagerWithCustomNodeStatusUpdater() {
@@ -1011,7 +1137,7 @@ public class TestNodeStatusUpdater {
    * started properly, RM will think that the NM is alive and will retire the NM
    * only after NM_EXPIRY interval. See MAPREDUCE-2749.
    */
-  @Test
+  @Test(timeout = 60000)
   public void testNoRegistrationWhenNMServicesFail() throws Exception {
 
     nm = new NodeManager() {
@@ -1042,7 +1168,7 @@ public class TestNodeStatusUpdater {
     verifyNodeStartFailure("Starting of RPC Server failed");
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testApplicationKeepAlive() throws Exception {
     MyNodeManager nm = new MyNodeManager();
     try {


Reply via email to