[ https://issues.apache.org/jira/browse/YARN-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171270#comment-16171270 ]
zhangshilong commented on YARN-7214:
------------------------------------

3.
{code:java}
  public static class AddNodeTransition implements
      SingleArcTransition<RMNodeImpl, RMNodeEvent> {

    @Override
    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
      // Inform the scheduler
      RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
      List<NMContainerStatus> containers = null;

      NodeId nodeId = rmNode.nodeId;
      RMNode previousRMNode =
          rmNode.context.getInactiveRMNodes().remove(nodeId);
      if (previousRMNode != null) {
        rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
      } else {
        NodeId unknownNodeId =
            NodesListManager.createUnknownNodeId(nodeId.getHost());
        previousRMNode =
            rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
        if (previousRMNode != null) {
          ClusterMetrics.getMetrics().decrDecommisionedNMs();
        }
        // Increment activeNodes explicitly because this is a new node.
        ClusterMetrics.getMetrics().incrNumActiveNodes();
        containers = startEvent.getNMContainerStatuses();
        if (containers != null && !containers.isEmpty()) {
          for (NMContainerStatus container : containers) {
            if (container.getContainerState() == ContainerState.RUNNING ||
                container.getContainerState() == ContainerState.SCHEDULED) {
              rmNode.launchedContainers.add(container.getContainerId());
            }
          }
        }
      }

      if (null != startEvent.getRunningApplications()) {
        for (ApplicationId appId : startEvent.getRunningApplications()) {
          handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
        }
      }

      rmNode.context.getDispatcher().getEventHandler()
          .handle(new NodeAddedSchedulerEvent(rmNode, containers));
      rmNode.context.getDispatcher().getEventHandler().handle(
          new NodesListManagerEvent(
              NodesListManagerEventType.NODE_USABLE, rmNode));
    }
  }
{code}

4. In NodeStatusUpdaterImpl.java, getNMContainerStatuses is called before registration, so each completed container is put into recentlyStoppedContainers. The completed containers are then sent to the RM in the register request.
{code:java}
  public void addCompletedContainer(ContainerId containerId) {
    synchronized (recentlyStoppedContainers) {
      removeVeryOldStoppedContainersFromCache();
      if (!recentlyStoppedContainers.containsKey(containerId)) {
        recentlyStoppedContainers.put(containerId,
            System.currentTimeMillis() + durationToTrackStoppedContainers);
      }
    }
  }
{code}

During a normal heartbeat, getContainerStatuses is called. A completed container is not put into containerStatuses because it is already in recentlyStoppedContainers, so it is not sent to the RM.

{code:java}
  protected List<ContainerStatus> getContainerStatuses() throws IOException {
    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
    for (Container container : this.context.getContainers().values()) {
      ContainerId containerId = container.getContainerId();
      ApplicationId applicationId = containerId.getApplicationAttemptId()
          .getApplicationId();
      org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
          container.cloneAndGetContainerStatus();
      if (containerStatus.getState() == ContainerState.COMPLETE) {
        if (isApplicationStopped(applicationId)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(applicationId + " is completing, " + " remove "
                + containerId + " from NM context.");
          }
          context.getContainers().remove(containerId);
          pendingCompletedContainers.put(containerId, containerStatus);
        } else {
          if (!isContainerRecentlyStopped(containerId)) {
            pendingCompletedContainers.put(containerId, containerStatus);
          }
        }
        // Adding to finished containers cache. Cache will keep it around at
        // least for #durationToTrackStoppedContainers duration. In the
        // subsequent call to stop container it will get removed from cache.
        addCompletedContainer(containerId);
      } else {
        containerStatuses.add(containerStatus);
      }
    }
    containerStatuses.addAll(pendingCompletedContainers.values());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Sending out " + containerStatuses.size()
          + " container statuses: " + containerStatuses);
    }
    return containerStatuses;
  }
{code}

> duplicated container completed To AM
> ------------------------------------
>
>                 Key: YARN-7214
>                 URL: https://issues.apache.org/jira/browse/YARN-7214
>             Project: Hadoop YARN
>          Issue Type: Bug
>    Affects Versions: 2.7.1, 3.0.0-alpha3
>         Environment: hadoop 2.7.1 rm recovery and nm recovery enabled
>            Reporter: zhangshilong
>
> env: hadoop 2.7.1 with rm recovery and nm recovery enabled
> case:
> A Spark app (app1) is running at least one container (named c1) on NM1.
> 1. NM1 crashed, and the RM found NM1 expired after 10 minutes.
> 2. The RM removes all containers on NM1 (RMNodeImpl), and app1 receives a
> completed message for c1. But the RM cannot send c1 (to be removed) to NM1
> because NM1 is lost.
> 3. NM1 restarts and registers with the RM (c1 is in the register request),
> but the RM finds that NM1 is lost and does not handle the containers from NM1.
> 4. NM1 does not include c1 in its heartbeats (c1 is not in the heartbeat
> request), so c1 is not removed from NM1's context.
> 5. The RM restarts and NM1 re-registers with the RM. c1 is then handled and
> recovered, and the RM sends a completed message for c1 to the AM of app1. So
> app1 receives a duplicate completion for c1.
> Once the Spark AM receives a container-completed message from the RM, it
> allocates one new container.
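The interaction described in points 3 and 4 of the comment can be reproduced in isolation with a small sketch. It is a simplified model and not the real NodeStatusUpdaterImpl or RMNodeImpl: the class DuplicateCompletionSketch, its methods, and the use of plain strings as container ids are all hypothetical, and cache expiry and RM acknowledgements are left out. It only shows why a completed container that was reported at registration is left out of later heartbeats, and why the RM does not read the register-time container list for a node that was in its inactive map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the behaviour described in the comment.
// It is NOT the real NodeStatusUpdaterImpl or RMNodeImpl.
class DuplicateCompletionSketch {
  // Stand-in for recentlyStoppedContainers: container id -> expiry time.
  private final Map<String, Long> recentlyStopped = new LinkedHashMap<>();
  // Stand-in for pendingCompletedContainers.
  private final Set<String> pendingCompleted = new LinkedHashSet<>();
  // Completed containers that are still present in the NM context.
  private final List<String> completedInContext = new ArrayList<>();
  private final long trackMillis;

  DuplicateCompletionSketch(long trackMillis) {
    this.trackMillis = trackMillis;
  }

  void containerCompleted(String id) {
    completedInContext.add(id);
  }

  // Registration path: every completed container is reported and cached.
  List<String> buildRegisterReport(long now) {
    List<String> report = new ArrayList<>(completedInContext);
    for (String id : completedInContext) {
      recentlyStopped.putIfAbsent(id, now + trackMillis);
    }
    return report;
  }

  // Heartbeat path: a completed container already in the cache is skipped.
  List<String> buildHeartbeatReport(long now) {
    for (String id : completedInContext) {
      if (!recentlyStopped.containsKey(id)) {
        pendingCompleted.add(id);
      }
      recentlyStopped.putIfAbsent(id, now + trackMillis);
    }
    return new ArrayList<>(pendingCompleted);
  }

  // RM side, following the quoted AddNodeTransition: the register-time
  // container list is only read when the node was not found in the inactive
  // map under its own NodeId. The quoted code leaves the list null in that
  // case; an empty list is used here for simplicity.
  static List<String> containersUsedByRm(boolean nodeWasInactive,
      List<String> registerReport) {
    return nodeWasInactive ? Collections.<String>emptyList() : registerReport;
  }

  public static void main(String[] args) {
    DuplicateCompletionSketch nm1 = new DuplicateCompletionSketch(600_000L);
    nm1.containerCompleted("c1");
    List<String> register = nm1.buildRegisterReport(0L);
    System.out.println("register report:  " + register);
    System.out.println("used by RM:       " + containersUsedByRm(true, register));
    System.out.println("heartbeat report: " + nm1.buildHeartbeatReport(1_000L));
  }
}
```

In this model c1 appears in the register report, is ignored by the RM because NM1 was in the inactive map, and is then missing from the heartbeat report. That matches steps 3 and 4 of the issue description: c1 stays in the NM context until a later registration is handled as a new node.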