Repository: hadoop Updated Branches: refs/heads/branch-2 824c32de1 -> 888a44563
YARN-3222. Fixed NPE on RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a different port. Contributed by Rohith Sharmaks (cherry picked from commit b2f1ec312ee431aef762cfb49cb29cd6f4661e86) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/888a4456 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/888a4456 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/888a4456 Branch: refs/heads/branch-2 Commit: 888a44563819ba910dc3cc10d10ee0fb8f05db61 Parents: 824c32d Author: Jian He <jia...@apache.org> Authored: Tue Mar 3 16:25:57 2015 -0800 Committer: Jian He <jia...@apache.org> Committed: Tue Mar 3 16:28:55 2015 -0800 ---------------------------------------------------------------------- .../resourcemanager/rmnode/RMNodeImpl.java | 34 +++++++++++--------- .../yarn/server/resourcemanager/MockNM.java | 6 +++- .../TestResourceTrackerService.java | 17 +++++++++- 3 files changed, 39 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/888a4456/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9701775..c556b80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -571,12 +571,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.nodeUpdateQueue.clear(); rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); - + if (rmNode.getHttpPort() == newNode.getHttpPort()) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY + if (rmNode.getState().equals(NodeState.RUNNING)) { + // Only add new node if old state is RUNNING rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(newNode)); } @@ -599,30 +599,32 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } else { rmNode.httpPort = newNode.getHttpPort(); rmNode.httpAddress = newNode.getHttpAddress(); - rmNode.totalCapability = newNode.getTotalCapability(); + boolean isCapabilityChanged = false; + if (rmNode.getTotalCapability() != newNode.getTotalCapability()) { + rmNode.totalCapability = newNode.getTotalCapability(); + isCapabilityChanged = true; + } handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - } - if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } - } - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(newNode.getTotalCapability(), -1))); + if (isCapabilityChanged + && rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); + } } - } private void handleNMContainerStatus( http://git-wip-us.apache.org/repos/asf/hadoop/blob/888a4456/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 5f53805..c917f79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -51,7 +51,7 @@ public class MockNM { private final int memory; private final int vCores; private ResourceTrackerService resourceTracker; - private final int httpPort = 2; + private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; private MasterKey currentNMTokenMasterKey; private String version; @@ -87,6 +87,10 @@ public class MockNM { return httpPort; } + public void setHttpPort(int port) { + httpPort = port; + } + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/888a4456/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 7c12848..a904dc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -623,7 +624,7 @@ public class TestResourceTrackerService { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); - + // reconnect of node with changed capability and running applications List<ApplicationId> runningApps = new ArrayList<ApplicationId>(); runningApps.add(ApplicationId.newInstance(1, 0)); @@ -633,6 +634,20 @@ public class TestResourceTrackerService { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + + // reconnect healthy node changing http port + nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + nm1.setHttpPort(3); + nm1.registerNode(); + dispatcher.await(); + response = nm1.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); + dispatcher.await(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + Assert.assertEquals(3, rmNode.getHttpPort()); + Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory()); + Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + } private void writeToHostsFile(String... hosts) throws IOException {