Author: sseth Date: Wed Aug 29 18:32:08 2012 New Revision: 1378674 URL: http://svn.apache.org/viewvc?rev=1378674&view=rev Log: MAPREDUCE-4599. Prevent container launches on blacklisted hosts. (Contributed by Tsuyoshi OZAWA)
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1378674&r1=1378673&r2=1378674&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Wed Aug 29 18:32:08 2012 @@ -2,3 +2,5 @@ Branch MR-3902 MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase (Tsuyoshi OZAWA via sseth) MAPREDUCE-4602. Re-create ask list correctly in case of a temporary error in the AM-RM allocate call (sseth) + + MAPREDUCE-4599. Prevent contianer launches on blacklisted hosts. 
(Tsuyoshi OZAWA via sseth) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1378674&r1=1378673&r2=1378674&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Wed Aug 29 18:32:08 2012 @@ -866,10 +866,10 @@ protected synchronized void handleEvent( // Blakclisted nodes should likely be removed immediately. // TODO Differentiation between blacklisted versus unusable nodes ? 
- //blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost); + boolean blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost); nodeUsable = appContext.getNode(allocated.getNodeId()).isUsable(); - if (!nodeUsable) { + if (!nodeUsable || blackListed) { // we need to request for a new container // and release the current one LOG.info("Got allocated container on an unusable " Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1378674&r1=1378673&r2=1378674&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java Wed Aug 29 18:32:08 2012 @@ -237,6 +237,8 @@ public class AMNodeImpl implements AMNod node.numFailedTAs++; boolean shouldBlacklist = node.shouldBlacklistNode(); if (shouldBlacklist) { + node.sendEvent(new AMNodeEvent(node.getNodeId(), + AMNodeEventType.N_NODE_WAS_BLACKLISTED)); return AMNodeState.BLACKLISTED; // TODO XXX: An event likely needs to go out to the scheduler. 
} @@ -291,6 +293,8 @@ public class AMNodeImpl implements AMNod public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) { boolean shouldBlacklist = node.shouldBlacklistNode(); if (shouldBlacklist) { + node.sendEvent(new AMNodeEvent(node.getNodeId(), + AMNodeEventType.N_NODE_WAS_BLACKLISTED)); return AMNodeState.BLACKLISTED; // TODO XXX: An event likely needs to go out to the scheduler. } @@ -375,7 +379,6 @@ public class AMNodeImpl implements AMNod @Override public boolean isUsable() { - // TODO Auto-generated method stub this.readLock.lock(); try { return (EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java?rev=1378674&r1=1378673&r2=1378674&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Wed Aug 29 18:32:08 2012 @@ -1,68 +1,121 @@ package org.apache.hadoop.mapreduce.v2.app2.rm.node; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.app2.AppContext; +import 
org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.AbstractService; -// TODO Seems a little strange, extending ConcurrentHashMap like this. -// TODO This needs to extend AbstractService to get a handle on the conf. -@SuppressWarnings("serial") -public class AMNodeMap extends ConcurrentHashMap<NodeId, AMNode> implements +public class AMNodeMap extends AbstractService implements EventHandler<AMNodeEvent> { + static final Log LOG = LogFactory.getLog(AMNodeMap.class); - - private final EventHandler eventHandler; + private final ConcurrentHashMap<NodeId, AMNode> nodeMap; + // TODO XXX -> blacklistMap is also used for computing forcedUnblacklisting. + private final ConcurrentHashMap<String, ArrayList<NodeId>> blacklistMap; + private final EventHandler<?> eventHandler; private final AppContext appContext; - - public AMNodeMap(EventHandler eventHandler, AppContext appContext) { + private int maxTaskFailuresPerNode; + private boolean nodeBlacklistingEnabled; + private int blacklistDisablePercent; + + public AMNodeMap(EventHandler<?> eventHandler, AppContext appContext) { + super("AMNodeMap"); + this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>(); + this.blacklistMap = new ConcurrentHashMap<String, ArrayList<NodeId>>(); this.eventHandler = eventHandler; this.appContext = appContext; - + // TODO XXX: Get a handle of allowed failures. 
} + @Override + public synchronized void init(Configuration config) { + Configuration conf = new Configuration(config); + this.maxTaskFailuresPerNode = conf.getInt( + MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3); + this.nodeBlacklistingEnabled = config.getBoolean( + MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); + this.blacklistDisablePercent = config.getInt( + MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, + MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT); + + LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); + if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) { + throw new YarnException("Invalid blacklistDisablePercent: " + + blacklistDisablePercent + + ". Should be an integer between 0 and 100 or -1 to disabled"); + } + LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); + + super.init(conf); + } + + public void nodeSeen(NodeId nodeId) { - // TODO Replace 3 with correct value. - putIfAbsent(nodeId, new AMNodeImpl(nodeId, 3, eventHandler, appContext)); + nodeMap.putIfAbsent(nodeId, new AMNodeImpl( + nodeId, maxTaskFailuresPerNode, eventHandler, appContext)); } public boolean isHostBlackListed(String hostname) { - return false; - // Node versus host blacklisting. - // TODO XXX -> Maintain a map of host to NodeList (case of multiple NMs) - // Provide functionality to say isHostBlacklisted(hostname) -> all hosts. - // ... blacklisted means don't ask for containers on this host. - // Same list to be used for computing forcedUnblacklisting. 
+ if (!nodeBlacklistingEnabled) { + return false; + } + + return blacklistMap.containsKey(hostname); } + private void addToBlackList(NodeId nodeId) { + String host = nodeId.getHost(); + ArrayList<NodeId> nodes; + + if (!blacklistMap.containsKey(host)) { + nodes = new ArrayList<NodeId>(); + blacklistMap.put(host, nodes); + } else { + nodes = blacklistMap.get(host); + } + + if (!nodes.contains(nodeId)) { + nodes.add(nodeId); + } + } + + // TODO: Currently, un-blacklisting feature is not supported. + /* + private void removeFromBlackList(NodeId nodeId) { + String host = nodeId.getHost(); + if (blacklistMap.containsKey(host)) { + ArrayList<NodeId> nodes = blacklistMap.get(host); + nodes.remove(nodeId); + } + } + */ + public void handle(AMNodeEvent event) { if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) { - // TODO Handle blacklisting. + NodeId nodeId = event.getNodeId(); + addToBlackList(nodeId); } else { NodeId nodeId = event.getNodeId(); - get(nodeId).handle(event); + nodeMap.get(nodeId).handle(event); } } + public AMNode get(NodeId nodeId) { + return nodeMap.get(nodeId); + } + public int size() { + return nodeMap.size(); + } -//nodeBlacklistingEnabled = -//conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); -//LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled); -//maxTaskFailuresPerNode = -//conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3); -//blacklistDisablePercent = -// conf.getInt( -// MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -// MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT); -//LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); -//if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) { -//throw new YarnException("Invalid blacklistDisablePercent: " -// + blacklistDisablePercent -// + ". Should be an integer between 0 and 100 or -1 to disabled"); -//} -//LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); }