Author: sseth
Date: Tue Apr 21 18:08:56 2015
New Revision: 1675176

URL: http://svn.apache.org/r1675176
Log:
HIVE-10408. Fix NPE in scheduler in case of rejected tasks. (Siddharth Seth)
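[Editorial illustration, not part of the patch] The hunks below drop the scheduler's separate instanceBlackList/containerToInstanceMap bookkeeping and instead keep every known node in instanceToNodeMap, marking it disabled in place, which avoids the null lookups that a rejected task could previously trigger. A condensed, hypothetical sketch of that pattern (class and field names below are invented, not the patch's own):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Simplified stand-ins for ServiceInstance / NodeInfo.
    class DisableInPlaceSketch {

      static class Node {
        final String host;
        private boolean disabled = false;
        Node(String host) { this.host = host; }
        void disable() { disabled = true; }      // keep the map entry, just flag it
        void enable()  { disabled = false; }
        boolean isDisabled() { return disabled; }
      }

      // Entries are never removed when a task is rejected, so later lookups cannot return null.
      private final Map<String, Node> nodes = new LinkedHashMap<>();

      void onTaskRejected(String host) {
        Node n = nodes.get(host);
        if (n != null) {
          n.disable();   // old approach: remove(host) meant a later get(host) returned null
        }
      }

      Node select(String host) {
        Node n = nodes.get(host);
        // Null-safe check mirroring the patched selectHost(): skip unknown or disabled nodes.
        return (n != null && !n.isDisabled()) ? n : null;
      }
    }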
Added:
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
    hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java Tue Apr 21 18:08:56 2015
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -26,8 +27,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
@@ -37,7 +38,6 @@ import org.apache.hadoop.registry.client
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
-import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
 import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
@@ -55,9 +55,10 @@ public class LlapYarnRegistryImpl implem
   private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class);
 
   private RegistryOperationsService client;
-  private String instanceName;
-  private Configuration conf;
-  private ServiceRecordMarshal encoder;
+  private final String instanceName;
+  private final Configuration conf;
+  private final ServiceRecordMarshal encoder;
+  private final String path;
 
   private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
 
@@ -68,7 +69,8 @@ public class LlapYarnRegistryImpl implem
 
   private final static String SERVICE_CLASS = "org-apache-hive";
 
-  final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1);
+  final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
   final long refreshDelay;
 
   static {
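[Editorial illustration, not part of the patch] The hunk above switches the registry refresher to daemon threads with a recognizable name via Guava's ThreadFactoryBuilder (imported earlier in the same file). A small standalone sketch of the same construction, with an invented refresh task:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class DaemonRefresherSketch {
      public static void main(String[] args) throws InterruptedException {
        // Daemon threads do not keep the JVM alive at shutdown, and the name format
        // makes the refresher easy to identify in thread dumps.
        ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("LlapYarnRegistryRefresher")
                .build());

        // Stand-in for the registry refresh work.
        refresher.scheduleWithFixedDelay(
            () -> System.out.println("refresh service instances here"), 0, 5, TimeUnit.SECONDS);

        Thread.sleep(1000);
        // main() returning ends the process even though the daemon refresher is still scheduled.
      }
    }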
@@ -90,6 +92,8 @@ public class LlapYarnRegistryImpl implem
     // registry reference
     client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
     encoder = new RegistryUtils.ServiceRecordMarshal();
+    this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+        SERVICE_CLASS, instanceName, "workers"), "worker-");
     refreshDelay = conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
         LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
@@ -114,8 +118,7 @@
   }
 
   private final String getPath() {
-    return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
-        SERVICE_CLASS, instanceName, "workers"), "worker-");
+    return this.path;
   }
 
   @Override
@@ -199,7 +202,8 @@
     }
 
     public void kill() {
-      LOG.info("Killing " + this);
+      // May be possible to generate a notification back to the scheduler from here.
+      LOG.info("Killing service instance: " + this);
       this.alive = false;
     }
 
@@ -217,74 +221,90 @@
     @Override
     public String toString() {
-      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + "]";
+      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]";
     }
+
+    // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
+    // of an already known instance.
   }
 
   private class DynamicServiceInstanceSet implements ServiceInstanceSet {
 
-    Map<String, ServiceInstance> instances;
+    // LinkedHashMap to retain iteration order.
+    private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
 
     @Override
-    public Map<String, ServiceInstance> getAll() {
-      return instances;
+    public synchronized Map<String, ServiceInstance> getAll() {
+      // Return a copy. Instances may be modified during a refresh.
+      return new LinkedHashMap<>(instances);
     }
 
     @Override
-    public ServiceInstance getInstance(String name) {
+    public synchronized ServiceInstance getInstance(String name) {
       return instances.get(name);
     }
 
     @Override
-    public synchronized void refresh() throws IOException {
+    public void refresh() throws IOException {
       /* call this from wherever */
       Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
 
       String path = getPath();
       Map<String, ServiceRecord> records =
          RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
-      Set<String> latestKeys = new HashSet<String>();
-      LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
-      for (ServiceRecord rec : records.values()) {
-        ServiceInstance instance = new DynamicServiceInstance(rec);
-        if (instance != null) {
-          if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
-            // add a new object
-            freshInstances.put(instance.getWorkerIdentity(), instance);
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to " + instance);
+      // Synchronize after reading the service records from the external service (ZK)
+      synchronized (this) {
+        Set<String> latestKeys = new HashSet<String>();
+        LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
+        for (ServiceRecord rec : records.values()) {
+          ServiceInstance instance = new DynamicServiceInstance(rec);
+          if (instance != null) {
+            if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
+              // add a new object
+              freshInstances.put(instance.getWorkerIdentity(), instance);
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
+                    + instance);
+              }
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Retaining running worker " + instance.getWorkerIdentity() +
+                    " which mapped to " + instance);
+              }
             }
-          } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Retaining running worker " + instance.getWorkerIdentity() + " which mapped to " + instance);
           }
+          latestKeys.add(instance.getWorkerIdentity());
         }
-        latestKeys.add(instance.getWorkerIdentity());
-      }
-      if (instances != null) {
-        // deep-copy before modifying
-        Set<String> oldKeys = new HashSet(instances.keySet());
-        if (oldKeys.removeAll(latestKeys)) {
-          for (String k : oldKeys) {
-            // this is so that people can hold onto ServiceInstance references as placeholders for tasks
-            final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
-            dead.kill();
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+        if (instances != null) {
+          // deep-copy before modifying
+          Set<String> oldKeys = new HashSet(instances.keySet());
+          if (oldKeys.removeAll(latestKeys)) {
+            // This is all the records which have not checked in, and are effectively dead.
+            for (String k : oldKeys) {
+              // this is so that people can hold onto ServiceInstance references as placeholders for tasks
+              final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
+              dead.kill();
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+              }
            }
          }
+          // oldKeys contains the set of dead instances at this point.
+          this.instances.keySet().removeAll(oldKeys);
+          this.instances.putAll(freshInstances);
+        } else {
+          this.instances.putAll(freshInstances);
        }
-        this.instances.keySet().removeAll(oldKeys);
-        this.instances.putAll(freshInstances);
-      } else {
-        this.instances = freshInstances;
      }
    }
 
     @Override
-    public Set<ServiceInstance> getByHost(String host) {
+    public synchronized Set<ServiceInstance> getByHost(String host) {
+      // TODO Maybe store this as a map which is populated during construction, to avoid walking
+      // the map on each request.
       Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+
       for (ServiceInstance i : instances.values()) {
         if (host.equals(i.getHost())) {
           // all hosts in instances should be alive in this impl
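[Editorial illustration, not part of the patch] The refresh() rewrite above follows a read-then-reconcile pattern: the service records are listed from ZooKeeper before taking any lock, and only the reconciliation against the known instances happens inside synchronized(this). Workers that did not check in are marked dead via kill() rather than having their objects discarded, so callers still holding a ServiceInstance reference observe the change. A simplified, hypothetical sketch of that reconciliation (types and names invented):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    class RegistryRefreshSketch {

      static class Worker {
        final String id;
        private volatile boolean alive = true;
        Worker(String id) { this.id = id; }
        void kill() { alive = false; }            // holders of the reference see it go !alive
        boolean isAlive() { return alive; }
      }

      // LinkedHashMap: stable iteration order, as in the patched DynamicServiceInstanceSet.
      private final Map<String, Worker> known = new LinkedHashMap<>();

      // 'latest' stands in for the worker identities parsed from
      // RegistryUtils.listServiceRecords(...), fetched before locking in the real code.
      void refresh(Set<String> latest) {
        synchronized (this) {
          Map<String, Worker> fresh = new HashMap<>();
          for (String id : latest) {
            if (!known.containsKey(id)) {
              fresh.put(id, new Worker(id));      // newly checked-in worker
            }                                      // otherwise retain the existing object (identity is relied upon)
          }
          Set<String> dead = new HashSet<>(known.keySet());
          dead.removeAll(latest);                  // workers that did not check in this round
          for (String id : dead) {
            known.get(id).kill();                  // mark dead instead of silently dropping the object
          }
          known.keySet().removeAll(dead);
          known.putAll(fresh);
        }
      }
    }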
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java Tue Apr 21 18:08:56 2015
@@ -218,7 +218,6 @@ public class LlapTaskCommunicator extend
 
       @Override
       public void indicateError(Throwable t) {
-        LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
         if (t instanceof ServiceException) {
           ServiceException se = (ServiceException) t;
           t = se.getCause();
@@ -228,10 +227,14 @@ public class LlapTaskCommunicator extend
           String message = re.toString();
           // RejectedExecutions from the remote service treated as KILLED
           if (message.contains(RejectedExecutionException.class.getName())) {
+            LOG.info(
+                "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: "
+                    + containerId + ", Service Busy");
             getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
                 TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
           } else {
             // All others from the remote service cause the task to FAIL.
+            LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
             getTaskCommunicatorContext()
                 .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.toString());
@@ -239,10 +242,14 @@ public class LlapTaskCommunicator extend
         } else {
           // Exception from the RPC layer - communication failure, consider as KILLED / service down.
           if (t instanceof IOException) {
+            LOG.info(
+                "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: "
+                    + containerId + ", Communication Error");
             getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
                 TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
           } else {
             // Anything else is a FAIL.
+            LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
             getTaskCommunicatorContext()
                 .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                     t.getMessage());
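[Editorial illustration, not part of the patch] The indicateError() change above classifies failures before reporting them: a RejectedExecutionException surfaced through the remote service means the daemon is merely busy (KILLED / SERVICE_BUSY, so the attempt can be rescheduled), an IOException from the RPC layer means a communication problem (KILLED / COMMUNICATION_ERROR), and anything else is a real failure (FAILED / OTHER). A condensed, hypothetical sketch of that classification; the enum, callback, and the fromRemoteService flag stand in for Tez's TaskAttemptEndReason, TaskCommunicatorContext, and the RemoteException check:

    import java.io.IOException;
    import java.util.concurrent.RejectedExecutionException;

    class ErrorClassificationSketch {

      enum EndReason { SERVICE_BUSY, COMMUNICATION_ERROR, OTHER }

      interface Callback {
        void taskKilled(EndReason reason, String diagnostics);   // retriable outcome
        void taskFailed(EndReason reason, String diagnostics);   // terminal outcome
      }

      static void indicateError(Throwable t, boolean fromRemoteService, Callback cb) {
        if (fromRemoteService) {
          // The daemon rejected the work item: treat as busy, not as a task failure.
          if (String.valueOf(t).contains(RejectedExecutionException.class.getName())) {
            cb.taskKilled(EndReason.SERVICE_BUSY, "Service Busy");
          } else {
            cb.taskFailed(EndReason.OTHER, String.valueOf(t));
          }
        } else if (t instanceof IOException) {
          // RPC-level problem: the daemon may be down or unreachable.
          cb.taskKilled(EndReason.COMMUNICATION_ERROR, "Communication Error");
        } else {
          cb.taskFailed(EndReason.OTHER, String.valueOf(t));
        }
      }
    }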
+ LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); getTaskCommunicatorContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.getMessage()); Added: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java?rev=1675176&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java Tue Apr 21 18:08:56 2015 @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.app.AppContext; + +class ContainerFactory { + final ApplicationAttemptId customAppAttemptId; + AtomicLong nextId; + + public ContainerFactory(AppContext appContext, long appIdLong) { + this.nextId = new AtomicLong(1); + ApplicationId appId = + ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId() + .getApplicationId().getId()); + this.customAppAttemptId = + ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId() + .getAttemptId()); + } + + public Container createContainer(Resource capability, Priority priority, String hostname, + int port) { + ContainerId containerId = + ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement()); + NodeId nodeId = NodeId.newInstance(hostname, port); + String nodeHttpAddress = "hostname:0"; // TODO: include UI ports + + Container container = + Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null); + + return container; + } +} Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1675176&r1=1675175&r2=1675176&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Tue Apr 21 18:08:56 2015 @@ -21,6 +21,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import 
Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Tue Apr 21 18:08:56 2015
@@ -21,6 +21,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -48,8 +48,6 @@ import org.apache.hadoop.hive.llap.confi
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -78,15 +76,11 @@ public class LlapTaskSchedulerService ex
   // interface into the registry service
   private ServiceInstanceSet activeInstances;
 
+  // Tracks all instances, including ones which have been disabled in the past.
+  // LinkedHashMap to provide the same iteration order when selecting a random host.
   @VisibleForTesting
-  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new HashMap<>();
-
-  @VisibleForTesting
-  final Set<ServiceInstance> instanceBlackList = new HashSet<ServiceInstance>();
-
-  @VisibleForTesting
-  // Tracks currently allocated containers.
-  final Map<ContainerId, String> containerToInstanceMap = new HashMap<>();
+  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new LinkedHashMap<>();
+  // TODO Ideally, remove elements from this once it's known that no tasks are linked to the instance (all deallocated)
 
   // Tracks tasks which could not be allocated immediately.
   @VisibleForTesting
@@ -100,8 +94,9 @@ public class LlapTaskSchedulerService ex
   // Tracks running and queued tasks. Cleared after a task completes.
   private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
 
+  // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
   @VisibleForTesting
-  final DelayQueue<NodeInfo> disabledNodes = new DelayQueue<>();
+  final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
 
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
@@ -263,9 +258,9 @@ public class LlapTaskSchedulerService ex
     int vcores = 0;
     readLock.lock();
     try {
-      for (ServiceInstance inst : instanceToNodeMap.keySet()) {
-        if (inst.isAlive()) {
-          Resource r = inst.getResource();
+      for (Entry<ServiceInstance, NodeInfo> entry : instanceToNodeMap.entrySet()) {
+        if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) {
+          Resource r = entry.getKey().getResource();
           memory += r.getMemory();
           vcores += r.getVirtualCores();
         }
@@ -375,8 +370,6 @@ public class LlapTaskSchedulerService ex
       }
       return false;
     }
-    String hostForContainer = containerToInstanceMap.remove(taskInfo.containerId);
-    assert hostForContainer != null;
     ServiceInstance assignedInstance = taskInfo.assignedInstance;
     assert assignedInstance != null;
 
@@ -410,6 +403,8 @@ public class LlapTaskSchedulerService ex
   @Override
   public Object deallocateContainer(ContainerId containerId) {
     LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    // Containers are not being tracked for re-use.
+    // This is safe to ignore since a deallocate task should have come in earlier.
     return null;
   }
@@ -435,7 +430,7 @@
   }
 
   /**
-   * @param requestedHosts the list of preferred hosts. null implies any host
+   * @param request the list of preferred hosts. null implies any host
    * @return
    */
   private ServiceInstance selectHost(TaskInfo request) {
@@ -444,6 +439,9 @@
     try {
       // Check if any hosts are active.
       if (getAvailableResources().getMemory() <= 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Refreshing instances since total memory is 0");
+        }
         refreshInstances();
       }
 
@@ -453,16 +451,20 @@
       }
 
       if (requestedHosts != null) {
+        int prefHostCount = -1;
         for (String host : requestedHosts) {
+          prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
           Set<ServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
             for (ServiceInstance inst : instances) {
-              if (inst.isAlive() && instanceToNodeMap.containsKey(inst)) {
-                // only allocate from the "available" list
+              NodeInfo nodeInfo = instanceToNodeMap.get(inst);
+              if (inst.isAlive() && nodeInfo != null && !nodeInfo.isDisabled()) {
                 // TODO Change this to work off of what we think is remaining capacity for an
                 // instance
-                LOG.info("Assigning " + inst + " when looking for " + host);
+                LOG.info(
+                    "Assigning " + inst + " when looking for " + host + ". FirstRequestedHost="
+                        + (prefHostCount == 0));
                 return inst;
               }
             }
@@ -470,16 +472,16 @@
           }
        }
      }
      /* fall through - miss in locality (random scheduling) */
-      ServiceInstance[] all = instanceToNodeMap.keySet().toArray(new ServiceInstance[0]);
+      Entry<ServiceInstance, NodeInfo> [] all = instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]);
      // Check again
      if (all.length > 0) {
        int n = random.nextInt(all.length);
        // start at random offset and iterate whole list
        for (int i = 0; i < all.length; i++) {
-          ServiceInstance inst = all[(i + n) % all.length];
-          if (inst.isAlive()) {
-            LOG.info("Assigning " + inst + " when looking for any host");
-            return inst;
+          Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
+          if (inst.getKey().isAlive() && !inst.getValue().isDisabled()) {
+            LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length);
+            return inst.getKey();
          }
        }
      }
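[Editorial illustration, not part of the patch] selectHost() above now consults the NodeInfo for each candidate (null-safe) and skips disabled nodes both in the locality pass and in the random fallback pass; the LinkedHashMap-backed map gives the fallback a stable order to walk from a random offset. A simplified, hypothetical sketch of that two-step selection, using stand-in types rather than the scheduler's own:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;

    class HostSelectionSketch {
      static class Instance { final String host; boolean alive = true; Instance(String h) { host = h; } }
      static class Node { boolean disabled; }

      private final Map<Instance, Node> instances = new LinkedHashMap<>(); // stable iteration order
      private final Random random = new Random();

      Instance selectHost(List<String> requestedHosts) {
        // 1. Locality: take the first requested host that has a live, enabled instance.
        if (requestedHosts != null) {
          for (String host : requestedHosts) {
            for (Map.Entry<Instance, Node> e : instances.entrySet()) {
              if (host.equals(e.getKey().host) && e.getKey().alive && !e.getValue().disabled) {
                return e.getKey();
              }
            }
          }
        }
        // 2. Fallback: start at a random offset and walk the whole list once.
        @SuppressWarnings("unchecked")
        Map.Entry<Instance, Node>[] all = instances.entrySet().toArray(new Map.Entry[0]);
        if (all.length > 0) {
          int n = random.nextInt(all.length);
          for (int i = 0; i < all.length; i++) {
            Map.Entry<Instance, Node> e = all[(i + n) % all.length];
            if (e.getKey().alive && !e.getValue().disabled) {
              return e.getKey();
            }
          }
        }
        return null; // nothing schedulable right now
      }
    }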
@@ -487,15 +489,27 @@
    } finally {
      readLock.unlock();
    }
 
+    // TODO Ideally, each refresh operation should addNodes if they don't already exist.
+    // Even better would be to get notifications from the service impl when a node gets added or removed.
+    // Instead of having to walk through the entire list. The computation of a node getting added or
+    // removed already exists in the DynamicRegistry implementation.
+
+
+    // This will only happen if no allocations are possible, which means all other nodes have
+    // been blacklisted.
+    // TODO Look for new nodes more often. See comment above.
    /* check again whether nodes are disabled or just missing */
    writeLock.lock();
    try {
      for (ServiceInstance inst : activeInstances.getAll().values()) {
-        if (inst.isAlive() && instanceBlackList.contains(inst) == false
-            && instanceToNodeMap.containsKey(inst) == false) {
+        if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) {
          /* that's a good node, not added to the allocations yet */
+          LOG.info("Found a new node: " + inst + ". Adding to node list and disabling to trigger scheduling");
          addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
          // mark it as disabled to let the pending tasks go there
+          // TODO If disabling the instance, have it wake up immediately instead of waiting.
+          // Ideally get rid of this requirement, by having all tasks allocated via a queue.
          disableInstance(inst, true);
        }
      }
@@ -515,19 +529,22 @@
  }
 
  private void addNode(ServiceInstance inst, NodeInfo node) {
+    LOG.info("Adding node: " + inst);
    instanceToNodeMap.put(inst, node);
+    // TODO Trigger a scheduling run each time a new node is added.
  }
 
  private void reenableDisabledNode(NodeInfo nodeInfo) {
    writeLock.lock();
    try {
      if (!nodeInfo.isBusy()) {
+        // If the node being re-enabled was not marked busy previously, then it was disabled due to
+        // some other failure. Refresh the service list to see if it's been removed permanently.
        refreshInstances();
      }
+      LOG.info("Attempting to re-enable node: " + nodeInfo.host);
      if (nodeInfo.host.isAlive()) {
        nodeInfo.enableNode();
-        instanceBlackList.remove(nodeInfo.host);
-        instanceToNodeMap.put(nodeInfo.host, nodeInfo);
      } else {
        if (LOG.isInfoEnabled()) {
          LOG.info("Removing dead node " + nodeInfo);
@@ -541,19 +558,18 @@
  private void disableInstance(ServiceInstance instance, boolean busy) {
    writeLock.lock();
    try {
-      NodeInfo nodeInfo = instanceToNodeMap.remove(instance);
-      if (nodeInfo == null) {
+      NodeInfo nodeInfo = instanceToNodeMap.get(instance);
+      if (nodeInfo == null || nodeInfo.isDisabled()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
        }
      } else {
-        instanceBlackList.add(instance);
        nodeInfo.disableNode(nodeReEnableTimeout);
        nodeInfo.setBusy(busy); // daemon failure vs daemon busy
        // TODO: handle task to container map events in case of hard failures
-        disabledNodes.add(nodeInfo);
+        disabledNodesQueue.add(nodeInfo);
        if (LOG.isInfoEnabled()) {
-          LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " seconds");
+          LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " milli-seconds");
        }
      }
    } finally {
@@ -640,7 +656,6 @@
          host.getHost());
      taskInfo.setAssignmentInfo(host, container.getId());
      knownTasks.putIfAbsent(taskInfo.task, taskInfo);
-      containerToInstanceMap.put(container.getId(), host.getWorkerIdentity());
    } finally {
      writeLock.unlock();
    }
@@ -660,7 +675,7 @@
      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
        try {
          while (true) {
-            NodeInfo nodeInfo = disabledNodes.take();
+            NodeInfo nodeInfo = disabledNodesQueue.take();
            // A node became available. Enable the node and try scheduling.
            reenableDisabledNode(nodeInfo);
            schedulePendingTasks();
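[Editorial illustration, not part of the patch] The loop above is what drains disabledNodesQueue: DelayQueue.take() blocks until some node's disable timeout expires, after which the node is re-enabled and pending tasks are scheduled again. A minimal, self-contained sketch of that mechanism with a stand-in NodeInfo; the unit conversion in getDelay() matches the fix made in the NodeInfo hunk further down:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class ReenableLoopSketch {

      // Minimal stand-in for NodeInfo: becomes eligible again once its delay expires.
      static class DisabledNode implements Delayed {
        final String host;
        final long expireTimeMillis;

        DisabledNode(String host, long disableMillis) {
          this.host = host;
          this.expireTimeMillis = System.currentTimeMillis() + disableMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
          // Convert into whatever unit the caller asks for (DelayQueue asks in nanoseconds).
          return unit.convert(expireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
          return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
      }

      final DelayQueue<DisabledNode> disabledNodesQueue = new DelayQueue<>();

      void runLoop() {
        try {
          while (!Thread.currentThread().isInterrupted()) {
            DisabledNode node = disabledNodesQueue.take();  // blocks until a timeout expires
            System.out.println("Re-enabling " + node.host + " and scheduling pending tasks");
            // In the real scheduler: reenableDisabledNode(nodeInfo); schedulePendingTasks();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();               // shutdown path
        }
      }
    }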
@@ -694,8 +709,12 @@
    private long numSuccessfulTasks = 0;
    private long numSuccessfulTasksAtLastBlacklist = -1;
    float cumulativeBackoffFactor = 1.0f;
+    // A node could be disabled for reasons other than being busy.
+    private boolean disabled = false;
+    // If disabled, the node could be marked as busy.
    private boolean busy;
+
    NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) {
      this.host = host;
      constBackOffFactor = backoffFactor;
@@ -704,10 +723,12 @@
    void enableNode() {
      expireTimeMillis = -1;
+      disabled = false;
    }
 
    void disableNode(long duration) {
      long currentTime = clock.getTime();
+      disabled = true;
      if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
        // Blacklisted again, without any progress. Will never kick in for the first run.
        cumulativeBackoffFactor = cumulativeBackoffFactor * constBackOffFactor;
@@ -721,7 +742,12 @@
    }
 
    void registerTaskSuccess() {
-      this.busy = false; // if a task exited, we might have free slots
+      // TODO If a task succeeds, we may have free slots. Mark the node as !busy. Ideally take it out
+      // of the queue for more allocations.
+      // For now, not changing the busy status,
+
+      // this.busy = false;
+      // this.disabled = false;
      numSuccessfulTasks++;
    }
 
@@ -733,9 +759,13 @@
      return busy;
    }
 
+    public boolean isDisabled() {
+      return disabled;
+    }
+
    @Override
    public long getDelay(TimeUnit unit) {
-      return expireTimeMillis - clock.getTime();
+      return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS);
    }
 
    @Override
@@ -869,31 +899,4 @@
    }
  }
 
-  static class ContainerFactory {
-    final ApplicationAttemptId customAppAttemptId;
-    AtomicLong nextId;
-
-    public ContainerFactory(AppContext appContext, long appIdLong) {
-      this.nextId = new AtomicLong(1);
-      ApplicationId appId =
-          ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId()
-              .getApplicationId().getId());
-      this.customAppAttemptId =
-          ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId()
-              .getAttemptId());
-    }
-
-    public Container createContainer(Resource capability, Priority priority, String hostname,
-        int port) {
-      ContainerId containerId =
-          ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
-      NodeId nodeId = NodeId.newInstance(hostname, port);
-      String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
-
-      Container container =
-          Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
-
-      return container;
-    }
-  }
 }
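[Editorial illustration, not part of the patch] Two details in the NodeInfo hunks above are worth calling out. First, getDelay() now converts the remaining time into the unit the caller asks for; returning raw milliseconds where DelayQueue asks in nanoseconds would make disabled nodes appear to expire almost immediately. Second, disableNode() grows the disable duration multiplicatively when a node is disabled again without having completed any new tasks. The sketch below illustrates that backoff bookkeeping; the reset branch and the exact expiry formula are assumptions for illustration, since they are not shown in the hunk:

    // Hypothetical sketch of the cumulative backoff; field names follow the hunk above,
    // but the reset branch and the expiry computation are illustrative assumptions.
    class BackoffSketch {
      private final float constBackOffFactor;
      private float cumulativeBackoffFactor = 1.0f;
      private long numSuccessfulTasks = 0;
      private long numSuccessfulTasksAtLastBlacklist = -1;
      private long expireTimeMillis = -1;

      BackoffSketch(float constBackOffFactor) {
        this.constBackOffFactor = constBackOffFactor;
      }

      void registerTaskSuccess() {
        numSuccessfulTasks++;
      }

      void disableNode(long baseDurationMillis, long nowMillis) {
        if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
          // Disabled again without any progress: back off further (never triggers on the first run).
          cumulativeBackoffFactor = cumulativeBackoffFactor * constBackOffFactor;
        } else {
          cumulativeBackoffFactor = 1.0f;                 // assumption: progress resets the factor
        }
        numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks;
        expireTimeMillis = nowMillis + (long) (baseDurationMillis * cumulativeBackoffFactor);
      }

      long getDelay(java.util.concurrent.TimeUnit unit, long nowMillis) {
        // The fix: convert from milliseconds into the requested unit.
        return unit.convert(expireTimeMillis - nowMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
      }
    }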
Modified: hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java Tue Apr 21 18:08:56 2015
@@ -22,9 +22,7 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
@@ -46,7 +43,6 @@ import org.apache.tez.dag.app.Controlled
 import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mortbay.log.Log;
 
 public class TestLlapTaskSchedulerService {
 
@@ -112,7 +108,7 @@ public class TestLlapTaskSchedulerServic
       // Verify that the node is blacklisted
       assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks);
       assertEquals(2, tsWrapper.ts.instanceToNodeMap.size());
-      LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodes.peek();
+      LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
       assertNotNull(disabledNodeInfo);
       assertEquals(HOST1, disabledNodeInfo.host.getHost());
       assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.NANOSECONDS));
@@ -164,7 +160,7 @@ public class TestLlapTaskSchedulerServic
       // Verify that the node is blacklisted
       assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks);
       assertEquals(0, tsWrapper.ts.instanceToNodeMap.size());
-      assertEquals(3, tsWrapper.ts.disabledNodes.size());
+      assertEquals(3, tsWrapper.ts.disabledNodesQueue.size());
 
       Object task4 = new Object();