[ https://issues.apache.org/jira/browse/YARN-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13620261#comment-13620261 ]
Xuan Gong commented on YARN-101: -------------------------------- 1. Use YarnServerBuilderUtils for constructing node-heartbeat response 2. Use BuilderUtils to create ApplicationId, ContainerId, ContainerStatus, etc. 3. Recreated the test case as the last comment suggested > If the heartbeat message loss, the nodestatus info of complete container > will loss too. > ---------------------------------------------------------------------------------------- > > Key: YARN-101 > URL: https://issues.apache.org/jira/browse/YARN-101 > Project: Hadoop YARN > Issue Type: Bug > Components: nodemanager > Environment: suse. > Reporter: xieguiming > Assignee: Xuan Gong > Priority: Minor > Attachments: YARN-101.1.patch, YARN-101.2.patch, YARN-101.3.patch, > YARN-101.4.patch, YARN-101.5.patch > > > see the red color: > org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java > protected void startStatusUpdater() { > new Thread("Node Status Updater") { > @Override > @SuppressWarnings("unchecked") > public void run() { > int lastHeartBeatID = 0; > while (!isStopped) { > // Send heartbeat > try { > synchronized (heartbeatMonitor) { > heartbeatMonitor.wait(heartBeatInterval); > } > {color:red} > // Before we send the heartbeat, we get the NodeStatus, > // whose method removes completed containers. > NodeStatus nodeStatus = getNodeStatus(); > {color} > nodeStatus.setResponseId(lastHeartBeatID); > > NodeHeartbeatRequest request = recordFactory > .newRecordInstance(NodeHeartbeatRequest.class); > request.setNodeStatus(nodeStatus); > {color:red} > // But if the nodeHeartbeat fails, we've already removed the > containers away to know about it. We aren't handling a nodeHeartbeat failure > case here. 
> HeartbeatResponse response = > resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); > {color} > if (response.getNodeAction() == NodeAction.SHUTDOWN) { > LOG > .info("Recieved SHUTDOWN signal from Resourcemanager as > part of heartbeat," + > " hence shutting down."); > NodeStatusUpdaterImpl.this.stop(); > break; > } > if (response.getNodeAction() == NodeAction.REBOOT) { > LOG.info("Node is out of sync with ResourceManager," > + " hence rebooting."); > NodeStatusUpdaterImpl.this.reboot(); > break; > } > lastHeartBeatID = response.getResponseId(); > List<ContainerId> containersToCleanup = response > .getContainersToCleanupList(); > if (containersToCleanup.size() != 0) { > dispatcher.getEventHandler().handle( > new CMgrCompletedContainersEvent(containersToCleanup)); > } > List<ApplicationId> appsToCleanup = > response.getApplicationsToCleanupList(); > //Only start tracking for keepAlive on FINISH_APP > trackAppsForKeepAlive(appsToCleanup); > if (appsToCleanup.size() != 0) { > dispatcher.getEventHandler().handle( > new CMgrCompletedAppsEvent(appsToCleanup)); > } > } catch (Throwable e) { > // TODO Better error handling. Thread can die with the rest of the > // NM still running. 
> LOG.error("Caught exception in status-updater", e); > } > } > } > }.start(); > } > private NodeStatus getNodeStatus() { > NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); > nodeStatus.setNodeId(this.nodeId); > int numActiveContainers = 0; > List<ContainerStatus> containersStatuses = new > ArrayList<ContainerStatus>(); > for (Iterator<Entry<ContainerId, Container>> i = > this.context.getContainers().entrySet().iterator(); i.hasNext();) { > Entry<ContainerId, Container> e = i.next(); > ContainerId containerId = e.getKey(); > Container container = e.getValue(); > // Clone the container to send it to the RM > org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = > container.cloneAndGetContainerStatus(); > containersStatuses.add(containerStatus); > ++numActiveContainers; > LOG.info("Sending out status for container: " + containerStatus); > {color:red} > // Here is the part that removes the completed containers. > if (containerStatus.getState() == ContainerState.COMPLETE) { > // Remove > i.remove(); > {color} > LOG.info("Removed completed container " + containerId); > } > } > nodeStatus.setContainersStatuses(containersStatuses); > LOG.debug(this.nodeId + " sending out status for " > + numActiveContainers + " containers"); > NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); > nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); > nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); > nodeHealthStatus.setLastHealthReportTime( > healthChecker.getLastHealthReportTime()); > if (LOG.isDebugEnabled()) { > LOG.debug("Node's health-status : " + > nodeHealthStatus.getIsNodeHealthy() > + ", " + nodeHealthStatus.getHealthReport()); > } > nodeStatus.setNodeHealthStatus(nodeHealthStatus); > List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList(); > nodeStatus.setKeepAliveApplications(keepAliveAppIds); > > return nodeStatus; > } -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators. For more information on JIRA, see: http://www.atlassian.com/software/jira