Author: vinodkv
Date: Mon Jul 29 18:15:51 2013
New Revision: 1508160

URL: http://svn.apache.org/r1508160
Log:
YARN-245. Fixed NodeManager to handle duplicate responses from ResourceManager. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1508157 ../../trunk/
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1508160&r1=1508159&r2=1508160&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Mon Jul 29 18:15:51 2013 @@ -733,6 +733,11 @@ Release 2.1.0-beta - 2013-07-02 YARN-960. Fixed ResourceManager to propagate client-submitted credentials irrespective of security. (Daryn Sharp via vinodkv) + YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu) + + YARN-245. Fixed NodeManager to handle duplicate responses from + ResourceManager. (Mayank Bansal via vinodkv) + BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. @@ -798,8 +803,6 @@ Release 2.1.0-beta - 2013-07-02 YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu via cnauroth) - YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. 
(tucu) - Release 2.0.5-alpha - 06/06/2013 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1508160&r1=1508159&r2=1508160&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Jul 29 18:15:51 2013 @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.SecurityUtil; @@ -158,7 +159,7 @@ public class NodeManager extends Composi addService(del); // NodeManager level dispatcher - this.dispatcher = new AsyncDispatcher(); + this.dispatcher = (AsyncDispatcher) createDispatcher(); nodeHealthChecker = new NodeHealthCheckerService(); addService(nodeHealthChecker); @@ -203,6 +204,16 @@ public class NodeManager extends Composi // TODO add local dirs to del } + @Private + protected Dispatcher createDispatcher(){ + return new AsyncDispatcher(); + } + + @Private + public Dispatcher getDispatcher(){ + return this.dispatcher; + } + @Override protected void 
serviceStart() throws Exception { try { Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1508160&r1=1508159&r2=1508160&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Jul 29 18:15:51 2013 @@ -369,6 +369,13 @@ public class NodeStatusUpdaterImpl exten .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey()); response = resourceTracker.nodeHeartbeat(request); + // Checking if the response id is the same which we just processed + // If yes then ignore the update. 
+ if (lastHeartBeatID != response.getResponseId() - 1) { + LOG.info("Discarding the duplicate response " + + response.getResponseId()); + continue; + } //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); updateMasterKeys(response); @@ -395,7 +402,6 @@ public class NodeStatusUpdaterImpl exten new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } - lastHeartBeatID = response.getResponseId(); List<ContainerId> containersToCleanup = response .getContainersToCleanup(); Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1508160&r1=1508159&r2=1508160&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Jul 29 18:15:51 2013 @@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import 
org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -431,6 +433,26 @@ public class TestNodeStatusUpdater { } } + private class MyNodeManager7 extends NodeManager { + private ResourceTracker resourceTracker; + private MyNodeStatusUpdater3 nodeStatusUpdater; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + this.nodeStatusUpdater = + new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); + resourceTracker = new MyResourceTracker7(context); + this.nodeStatusUpdater.resourceTracker = resourceTracker; + + return this.nodeStatusUpdater; + } + + protected MyNodeStatusUpdater3 getNodeStatusUpdater() { + return this.nodeStatusUpdater; + } + } + private class MyNodeManager2 extends NodeManager { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; @@ -552,6 +574,68 @@ public class TestNodeStatusUpdater { } } + private class MyResourceTracker7 implements ResourceTracker { + public NodeAction heartBeatNodeAction = NodeAction.NORMAL; + public NodeAction registerNodeAction = NodeAction.NORMAL; + private final Context context; + private int lastRequestedHeartBeat = 0; + private boolean gotDuplicateHeartBeatRequest = false; + private ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + + MyResourceTracker7(Context context) { + this.context = context; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException { + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(registerNodeAction); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 
+ throws YarnException, IOException { + + if (lastRequestedHeartBeat != 0 + && lastRequestedHeartBeat == request.getNodeStatus().getResponseId()) { + LOG.info("GOT Duplicate heartbeatId " + + request.getNodeStatus().getResponseId()); + gotDuplicateHeartBeatRequest = true; + } + lastRequestedHeartBeat = request.getNodeStatus().getResponseId(); + LOG.info("Got heartBeatId: [" + heartBeatID + "]"); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + NodeHeartbeatResponse nhResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, + heartBeatNodeAction, null, null, null, null, 1000L); + + if (heartBeatID == 5) { + LOG.info("Sending FINISH_APP for application: [" + appId + "]"); + this.context.getApplications().put(appId, mock(Application.class)); + nhResponse + .addAllApplicationsToCleanup(Collections.singletonList(appId)); + } + if (heartBeatID == 6) { + nhResponse.setResponseId(5); + LOG.info("Sending FINISH_APP for application: [" + appId + "]"); + this.context.getApplications().put(appId, mock(Application.class)); + nhResponse + .addAllApplicationsToCleanup(Collections.singletonList(appId)); + } + return nhResponse; + } + + public boolean isGotDuplicateHeartBeatRequest() { + return gotDuplicateHeartBeatRequest; + } + } + private class MyResourceTracker4 implements ResourceTracker { public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -745,7 +829,7 @@ public class TestNodeStatusUpdater { lfs.delete(new Path(basedir.getPath()), true); } - @Test + @Test(timeout = 60000) public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @Override @@ -805,7 +889,7 @@ public class TestNodeStatusUpdater { nm.stop(); } - @Test + @Test(timeout = 60000) public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); nm = new NodeManager() { @@ -851,7 +935,49 @@ public class TestNodeStatusUpdater { Assert.assertEquals(numCleanups.get(), 1); } - 
@Test + @SuppressWarnings("rawtypes") + class MyDispatcher7 extends AsyncDispatcher { + public volatile int finishapp_event; + + protected void dispatch(Event event) { + if (event.getType().name() + .equals(ContainerManagerEventType.FINISH_APPS.toString())) { + ++finishapp_event; + } + } + } + + @Test(timeout = 60000) + public void testDuplicateResponseFromRM() throws Exception { + MyNodeManager7 nm = new MyNodeManager7() { + protected Dispatcher createDispatcher() { + return new MyDispatcher7(); + } + }; + try { + YarnConfiguration conf = createNMConfig(); + conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000l); + nm.init(conf); + nm.start(); + MyResourceTracker7 rt = + (MyResourceTracker7) nm.getNodeStatusUpdater().getRMClient(); + while (heartBeatID < 7) { + Thread.sleep(1000l); + } + Assert.assertTrue(rt.isGotDuplicateHeartBeatRequest()); + + MyDispatcher7 nmdispatcher = (MyDispatcher7) nm.getDispatcher(); + // We are sending two FINISH_APPS in heartbeat 5 and 6 + // Checking we get only one time FINISH_APPS event which is the first one + Assert.assertEquals(1, nmdispatcher.finishapp_event); + + } finally { + if (nm.getServiceState() == STATE.STARTED) + nm.stop(); + } + } + + @Test(timeout = 60000) public void testNodeDecommision() throws Exception { nm = getNodeManager(NodeAction.SHUTDOWN); YarnConfiguration conf = createNMConfig(); @@ -898,7 +1024,7 @@ public class TestNodeStatusUpdater { NodeHealthCheckerService healthChecker); } - @Test + @Test(timeout = 60000) public void testNMShutdownForRegistrationFailure() throws Exception { nm = new NodeManagerWithCustomNodeStatusUpdater() { @@ -1011,7 +1137,7 @@ public class TestNodeStatusUpdater { * started properly, RM will think that the NM is alive and will retire the NM * only after NM_EXPIRY interval. See MAPREDUCE-2749. 
*/ - @Test + @Test(timeout = 60000) public void testNoRegistrationWhenNMServicesFail() throws Exception { nm = new NodeManager() { @@ -1042,7 +1168,7 @@ public class TestNodeStatusUpdater { verifyNodeStartFailure("Starting of RPC Server failed"); } - @Test + @Test(timeout = 60000) public void testApplicationKeepAlive() throws Exception { MyNodeManager nm = new MyNodeManager(); try {