http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java new file mode 100644 index 0000000..3bca0da --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -0,0 +1,1512 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import 
org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Clock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapTaskSchedulerService extends TaskScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); + + private final Configuration conf; + + // interface into the registry service + private ServiceInstanceSet activeInstances; + + // Tracks all instances, including ones which have been disabled in the past. + // LinkedHashMap to provide the same iteration order when selecting a random host. + @VisibleForTesting + final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new LinkedHashMap<>(); + // TODO Ideally, remove elements from this once it's known that no tasks are linked to the instance (all deallocated) + + // Tracks tasks which could not be allocated immediately. + @VisibleForTesting + final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new Comparator<Priority>() { + @Override + public int compare(Priority o1, Priority o2) { + return o1.getPriority() - o2.getPriority(); + } + }); + + // Tracks running and queued tasks. Cleared after a task completes. 
+ private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>(); + private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>(); + private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); + + // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit. + @VisibleForTesting + final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>(); + + private final boolean forceLocation; + + private final ContainerFactory containerFactory; + private final Random random = new Random(); + private final Clock clock; + + private final ListeningExecutorService nodeEnabledExecutor; + private final NodeEnablerCallable nodeEnablerCallable = + new NodeEnablerCallable(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + + private final Lock scheduleLock = new ReentrantLock(); + private final Condition scheduleCondition = scheduleLock.newCondition(); + private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false); + private final ListeningExecutorService schedulerExecutor; + private final SchedulerCallable schedulerCallable = new SchedulerCallable(); + + private final AtomicBoolean isStopped = new AtomicBoolean(false); + // Tracks total pending preemptions. 
+ private final AtomicInteger pendingPreemptions = new AtomicInteger(0); + // Tracks pending preemptions per host, using the hostname || Always to be accessed inside a lock + private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>(); + + private final NodeBlacklistConf nodeBlacklistConf; + + // Per daemon + private final int memoryPerInstance; + private final int coresPerInstance; + private final int executorsPerInstance; + + private final int numSchedulableTasksPerNode; + + // Per Executor Thread + private final Resource resourcePerExecutor; + + private final LlapRegistryService registry = new LlapRegistryService(false); + + private volatile ListenableFuture<Void> nodeEnablerFuture; + private volatile ListenableFuture<Void> schedulerFuture; + + @VisibleForTesting + private final AtomicInteger dagCounter = new AtomicInteger(1); + // Statistics to track allocations + // All of stats variables are visible for testing. + @VisibleForTesting + StatsPerDag dagStats = new StatsPerDag(); + + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { + this(taskSchedulerContext, new SystemClock()); + } + + @VisibleForTesting + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) { + super(taskSchedulerContext); + this.clock = clock; + try { + this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException( + "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e); + } + this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), + taskSchedulerContext.getCustomClusterIdentifier()); + this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB); + this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE); + this.executorsPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_NUM_EXECUTORS); + this.nodeBlacklistConf = new NodeBlacklistConf( + HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS, + TimeUnit.MILLISECONDS), + HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS, + TimeUnit.MILLISECONDS), + HiveConf.getFloatVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR)); + + this.numSchedulableTasksPerNode = + HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE); + + long localityDelayMs = HiveConf + .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS); + if (localityDelayMs == -1) { + this.forceLocation = true; + } else { + this.forceLocation = false; + } + + int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); + int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); + this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor); + + String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + + Preconditions.checkNotNull(instanceId, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname + + " must be defined"); + + ExecutorService executorServiceRaw = + Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build()); + nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw); + + ExecutorService schedulerExecutorServiceRaw = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); + schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw); + + LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor + + ", nodeBlacklistConf=" + 
nodeBlacklistConf + + ", forceLocation=" + forceLocation); + } + + @Override + public void initialize() { + registry.init(conf); + } + + @Override + public void start() throws IOException { + writeLock.lock(); + try { + nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); + Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + LOG.info("NodeEnabledThread exited"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("NodeEnabledThread exited with error", t); + } + }); + schedulerFuture = schedulerExecutor.submit(schedulerCallable); + Futures.addCallback(schedulerFuture, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + LOG.info("SchedulerThread exited"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("SchedulerThread exited with error", t); + } + }); + registry.start(); + activeInstances = registry.getInstances(); + for (ServiceInstance inst : activeInstances.getAll().values()) { + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void shutdown() { + writeLock.lock(); + try { + if (!this.isStopped.getAndSet(true)) { + nodeEnablerCallable.shutdown(); + if (nodeEnablerFuture != null) { + nodeEnablerFuture.cancel(true); + } + nodeEnabledExecutor.shutdownNow(); + + schedulerCallable.shutdown(); + if (schedulerFuture != null) { + schedulerFuture.cancel(true); + } + schedulerExecutor.shutdownNow(); + + if (registry != null) { + registry.stop(); + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public Resource getTotalResources() { + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : activeInstances.getAll().values()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + LOG.info("Found instance " + inst); + memory += r.getMemory(); + vcores += 
r.getVirtualCores(); + } else { + LOG.info("Ignoring dead instance " + inst); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); + } + + /** + * The difference between this and getTotalResources() is that this only gives currently free + * resource instances, while the other lists all the instances that may become available in a + * while. + */ + @Override + public Resource getAvailableResources() { + // need a state store eventually for current state & measure backoffs + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (Entry<ServiceInstance, NodeInfo> entry : instanceToNodeMap.entrySet()) { + if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) { + Resource r = entry.getKey().getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); + } + + @Override + public int getClusterNodeCount() { + readLock.lock(); + try { + int n = 0; + for (ServiceInstance inst : activeInstances.getAll().values()) { + if (inst.isAlive()) { + n++; + } + } + return n; + } finally { + readLock.unlock(); + } + } + + @Override + public void dagComplete() { + // This is effectively DAG completed, and can be used to reset statistics being tracked. + LOG.info("DAG: " + dagCounter.get() + " completed. 
Scheduling stats: " + dagStats); + dagCounter.incrementAndGet(); + dagStats = new StatsPerDag(); + } + + @Override + public void blacklistNode(NodeId nodeId) { + LOG.info("BlacklistNode not supported"); + } + + @Override + public void unblacklistNode(NodeId nodeId) { + LOG.info("unBlacklistNode not supported"); + } + + @Override + public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, + Priority priority, Object containerSignature, Object clientCookie) { + TaskInfo taskInfo = + new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime()); + writeLock.lock(); + try { + dagStats.registerTaskRequest(hosts, racks); + } finally { + writeLock.unlock(); + } + addPendingTask(taskInfo); + trySchedulingPendingTasks(); + } + + @Override + public void allocateTask(Object task, Resource capability, ContainerId containerId, + Priority priority, Object containerSignature, Object clientCookie) { + // Container affinity can be implemented as Host affinity for LLAP. Not required until + // 1:1 edges are used in Hive. + TaskInfo taskInfo = + new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime()); + writeLock.lock(); + try { + dagStats.registerTaskRequest(null, null); + } finally { + writeLock.unlock(); + } + addPendingTask(taskInfo); + trySchedulingPendingTasks(); + } + + + // This may be invoked before a container is ever assigned to a task. allocateTask... app decides + // the task is no longer required, and asks for a de-allocation. + @Override + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) { + writeLock.lock(); // Updating several local structures + TaskInfo taskInfo; + try { + taskInfo = unregisterTask(task); + if (taskInfo == null) { + LOG.error("Could not determine ContainerId for task: " + + task + + " . Could have hit a race condition. Ignoring." 
+ + " The query may hang since this \"unknown\" container is now taking up a slot permanently"); + return false; + } + if (taskInfo.containerId == null) { + if (taskInfo.assigned) { + LOG.error("Task: " + + task + + " assigned, but could not find the corresponding containerId." + + " The query may hang since this \"unknown\" container is now taking up a slot permanently"); + } else { + LOG.info("Ignoring deallocate request for task " + task + + " which hasn't been assigned to a container"); + removePendingTask(taskInfo); + } + return false; + } + ServiceInstance assignedInstance = taskInfo.assignedInstance; + assert assignedInstance != null; + + NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); + assert nodeInfo != null; + + // Re-enable the node if preempted + if (taskInfo.preempted) { + LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); + unregisterPendingPreemption(taskInfo.assignedInstance.getHost()); + nodeInfo.registerUnsuccessfulTaskEnd(true); + if (nodeInfo.isDisabled()) { + // Re-enable the node. If a task succeeded, a slot may have become available. + // Also reset commFailures since a task was able to communicate back and indicate success. + nodeInfo.enableNode(); + // Re-insert into the queue to force the poll thread to remove the element. + if ( disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } + } + // In case of success, trigger a scheduling run for pending tasks. + trySchedulingPendingTasks(); + } else { + if (taskSucceeded) { + // The node may have been blacklisted at this point - which means it may not be in the + // activeNodeList. + + nodeInfo.registerTaskSuccess(); + + if (nodeInfo.isDisabled()) { + // Re-enable the node. If a task succeeded, a slot may have become available. + // Also reset commFailures since a task was able to communicate back and indicate success. 
+ nodeInfo.enableNode(); + // Re-insert into the queue to force the poll thread to remove the element. + if (disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } + } + // In case of success, trigger a scheduling run for pending tasks. + trySchedulingPendingTasks(); + + } else if (!taskSucceeded) { + nodeInfo.registerUnsuccessfulTaskEnd(false); + if (endReason != null && EnumSet + .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) + .contains(endReason)) { + if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { + dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); + } else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) { + dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); + } + } + boolean commFailure = + endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; + disableInstance(assignedInstance, commFailure); + } + } + } finally { + writeLock.unlock(); + } + getContext().containerBeingReleased(taskInfo.containerId); + return true; + } + + @Override + public Object deallocateContainer(ContainerId containerId) { + LOG.debug("Ignoring deallocateContainer for containerId: " + containerId); + // Containers are not being tracked for re-use. + // This is safe to ignore since a deallocate task will come in. + return null; + } + + @Override + public void setShouldUnregister() { + + } + + @Override + public boolean hasUnregistered() { + // Nothing to do. No registration involved. + return true; + } + + private ExecutorService createAppCallbackExecutorService() { + return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); + } + + /** + * @param request the list of preferred hosts. null implies any host + * @return + */ + private SelectHostResult selectHost(TaskInfo request) { + String[] requestedHosts = request.requestedHosts; + readLock.lock(); // Read-lock. 
Not updating any stats at the moment. + try { + // Check if any hosts are active. + if (getAvailableResources().getMemory() <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Refreshing instances since total memory is 0"); + } + refreshInstances(); + } + + // If there's no memory available, fail + if (getTotalResources().getMemory() <= 0) { + return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY; + } + + if (requestedHosts != null && requestedHosts.length > 0) { + int prefHostCount = -1; + boolean requestedHostExists = false; + for (String host : requestedHosts) { + prefHostCount++; + // Pick the first host always. Weak attempt at cache affinity. + Set<ServiceInstance> instances = activeInstances.getByHost(host); + if (!instances.isEmpty()) { + requestedHostExists = true; + for (ServiceInstance inst : instances) { + NodeInfo nodeInfo = instanceToNodeMap.get(inst); + if (nodeInfo != null && nodeInfo.canAcceptTask()) { + LOG.info("Assigning " + inst + " when looking for " + host + "." + + " FirstRequestedHost=" + (prefHostCount == 0) + + (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : "")); + return new SelectHostResult(inst, nodeInfo); + } + } + } + } + // Check if forcing the location is required. 
+ if (forceLocation) { + if (requestedHostExists) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping non-local location allocation for [" + request.task + + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]"); + } + return SELECT_HOST_RESULT_DELAYED_LOCALITY; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not skipping non-local location allocation for [" + request.task + + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + + "] since none of these hosts are part of the known list"); + } + } + } + } + /* fall through - miss in locality (random scheduling) */ + Entry<ServiceInstance, NodeInfo>[] all = + instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]); + // Check again + if (all.length > 0) { + int n = random.nextInt(all.length); + // start at random offset and iterate whole list + for (int i = 0; i < all.length; i++) { + Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length]; + if (inst.getValue().canAcceptTask()) { + LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length + + ", requestedHosts=" + + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : + Arrays.toString(requestedHosts))); + return new SelectHostResult(inst.getKey(), inst.getValue()); + } + } + } + return SELECT_HOST_RESULT_DELAYED_RESOURCES; + } finally { + readLock.unlock(); + } + } + + // TODO Each refresh operation should addNodes if they don't already exist. + // Even better would be to get notifications from the service impl when a node gets added or removed. + // Instead of having to walk through the entire list. The computation of a node getting added or + // removed already exists in the DynamicRegistry implementation. 
+ private void refreshInstances() { + try { + activeInstances.refresh(); // handles its own sync + } catch (IOException ioe) { + LOG.warn("Could not refresh list of active instances", ioe); + } + } + + private void scanForNodeChanges() { + /* check again whether nodes are disabled or just missing */ + writeLock.lock(); + try { + for (ServiceInstance inst : activeInstances.getAll().values()) { + if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) { + /* that's a good node, not added to the allocations yet */ + LOG.info("Found a new node: " + inst + "."); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + } + } + } finally { + writeLock.unlock(); + } + } + + private void addNode(ServiceInstance inst, NodeInfo node) { + LOG.info("Adding node: " + inst); + instanceToNodeMap.put(inst, node); + // Trigger scheduling since a new node became available. + trySchedulingPendingTasks(); + } + + private void reenableDisabledNode(NodeInfo nodeInfo) { + writeLock.lock(); + try { + if (nodeInfo.hadCommFailure()) { + // If the node being re-enabled was not marked busy previously, then it was disabled due to + // some other failure. Refresh the service list to see if it's been removed permanently. + refreshInstances(); + } + LOG.info("Attempting to re-enable node: " + nodeInfo.getServiceInstance()); + if (nodeInfo.getServiceInstance().isAlive()) { + nodeInfo.enableNode(); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Removing dead node " + nodeInfo); + } + } + } finally { + writeLock.unlock(); + } + } + + private void disableInstance(ServiceInstance instance, boolean isCommFailure) { + writeLock.lock(); + try { + NodeInfo nodeInfo = instanceToNodeMap.get(instance); + if (nodeInfo == null || nodeInfo.isDisabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node: " + instance + " already disabled, or invalid. 
Not doing anything."); + } + } else { + nodeInfo.disableNode(isCommFailure); + // TODO: handle task to container map events in case of hard failures + disabledNodesQueue.add(nodeInfo); + } + } finally { + writeLock.unlock(); + } + } + + private void addPendingTask(TaskInfo taskInfo) { + writeLock.lock(); + try { + List<TaskInfo> tasksAtPriority = pendingTasks.get(taskInfo.priority); + if (tasksAtPriority == null) { + tasksAtPriority = new LinkedList<>(); + pendingTasks.put(taskInfo.priority, tasksAtPriority); + } + tasksAtPriority.add(taskInfo); + knownTasks.putIfAbsent(taskInfo.task, taskInfo); + } finally { + writeLock.unlock(); + } + } + + /* Remove a task from the pending list */ + private void removePendingTask(TaskInfo taskInfo) { + writeLock.lock(); + try { + Priority priority = taskInfo.priority; + List<TaskInfo> taskInfoList = pendingTasks.get(priority); + if (taskInfoList == null || taskInfoList.isEmpty() || !taskInfoList.remove(taskInfo)) { + LOG.warn("Could not find task: " + taskInfo.task + " in pending list, at priority: " + + priority); + } + } finally { + writeLock.unlock(); + } + } + + /* Register a running task into the runningTasks structure */ + private void registerRunningTask(TaskInfo taskInfo) { + writeLock.lock(); + try { + int priority = taskInfo.priority.getPriority(); + TreeSet<TaskInfo> tasksAtpriority = runningTasks.get(priority); + if (tasksAtpriority == null) { + tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR); + runningTasks.put(priority, tasksAtpriority); + } + tasksAtpriority.add(taskInfo); + } finally { + writeLock.unlock(); + } + } + + /* Unregister a task from the known and running structures */ + private TaskInfo unregisterTask(Object task) { + writeLock.lock(); + try { + TaskInfo taskInfo = knownTasks.remove(task); + if (taskInfo != null) { + if (taskInfo.assigned) { + // Remove from the running list. 
+ int priority = taskInfo.priority.getPriority(); + Set<TaskInfo> tasksAtPriority = runningTasks.get(priority); + Preconditions.checkState(tasksAtPriority != null, + "runningTasks should contain an entry if the task was in running state. Caused by task: {}", task); + tasksAtPriority.remove(taskInfo); + if (tasksAtPriority.isEmpty()) { + runningTasks.remove(priority); + } + } + } else { + LOG.warn("Could not find TaskInfo for task: {}. Not removing it from the running set", task); + } + return taskInfo; + } finally { + writeLock.unlock(); + } + } + + private enum ScheduleResult { + // Successfully scheduled + SCHEDULED, + + // Delayed to find a local match + DELAYED_LOCALITY, + + // Delayed due to temporary resource availability + DELAYED_RESOURCES, + + // Inadequate total resources - will never succeed / wait for new executors to become available + INADEQUATE_TOTAL_RESOURCES, + } + + @VisibleForTesting + protected void schedulePendingTasks() { + writeLock.lock(); + try { + Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator = + pendingTasks.entrySet().iterator(); + while (pendingIterator.hasNext()) { + Entry<Priority, List<TaskInfo>> entry = pendingIterator.next(); + List<TaskInfo> taskListAtPriority = entry.getValue(); + Iterator<TaskInfo> taskIter = taskListAtPriority.iterator(); + boolean scheduledAllAtPriority = true; + while (taskIter.hasNext()) { + + // TODO Optimization: Add a check to see if there's any capacity available. No point in + // walking through all active nodes, if they don't have potential capacity. 
+ + TaskInfo taskInfo = taskIter.next(); + if (taskInfo.getNumPreviousAssignAttempts() == 1) { + dagStats.registerDelayedAllocation(); + } + taskInfo.triedAssigningTask(); + ScheduleResult scheduleResult = scheduleTask(taskInfo); + if (scheduleResult == ScheduleResult.SCHEDULED) { + taskIter.remove(); + } else { + // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by throwin an error immediately, + // or waiting for some timeout for new executors and then throwing an error + + // Try pre-empting a task so that a higher priority task can take it's place. + // Preempt only if there's no pending preemptions to avoid preempting twice for a task. + String[] potentialHosts; + if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) { + // preempt only on specific hosts, if no preemptions already exist on those. + potentialHosts = taskInfo.requestedHosts; + //Protect against a bad location being requested. + if (potentialHosts == null || potentialHosts.length == 0) { + potentialHosts = null; + } + } else { + // preempt on any host. + potentialHosts = null; + } + + if (potentialHosts != null) { + // Preempt on specific host + boolean shouldPreempt = true; + for (String host : potentialHosts) { + // Preempt only if there are not pending preemptions on the same host + // When the premption registers, the request at the highest priority will be given the slot, + // even if the initial request was for some other task. + // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation. 
+ MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host); + if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) { + shouldPreempt = false; + break; + } + } + if (shouldPreempt) { + LOG.info("Attempting to preempt for {}, pendingPreemptions={} on hosts={}", + taskInfo.task, pendingPreemptions.get(), Arrays.toString(potentialHosts)); + preemptTasks(entry.getKey().getPriority(), 1, potentialHosts); + } + } else { + // Request for a preemption if there's none pending. If a single preemption is pending, + // and this is the next task to be assigned, it will be assigned once that slot becomes available. + if (pendingPreemptions.get() == 0) { + LOG.info("Attempting to preempt for {}, pendingPreemptions={} on any host", + taskInfo.task, pendingPreemptions.get()); + preemptTasks(entry.getKey().getPriority(), 1, null); + } + } + // Since there was an allocation failure - don't try assigning tasks at the next priority. + + scheduledAllAtPriority = false; + // Don't break if this allocation failure was a result of a LOCALITY_DELAY. Others could still be allocated. + if (scheduleResult != ScheduleResult.DELAYED_LOCALITY) { + break; + } + } // end of else - i.e. 
could not allocate + } // end of loop over pending tasks + if (taskListAtPriority.isEmpty()) { + // Remove the entry, if there's nothing left at the specific priority level + pendingIterator.remove(); + } + if (!scheduledAllAtPriority) { + // Don't attempt scheduling for additional priorities + break; + } + } + } finally { + writeLock.unlock(); + } + } + + + private ScheduleResult scheduleTask(TaskInfo taskInfo) { + SelectHostResult selectHostResult = selectHost(taskInfo); + if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) { + NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair; + Container container = + containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, + nsPair.getServiceInstance().getHost(), + nsPair.getServiceInstance().getRpcPort()); + writeLock.lock(); // While updating local structures + try { + LOG.info("Assigned task {} to container {}", taskInfo, container.getId()); + dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, + nsPair.getServiceInstance().getHost()); + taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime()); + registerRunningTask(taskInfo); + nsPair.getNodeInfo().registerTaskScheduled(); + } finally { + writeLock.unlock(); + } + getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container); + } + return selectHostResult.scheduleResult; + } + + // Removes tasks from the runningList and sends out a preempt request to the system. + // Subsequent tasks will be scheduled again once the de-allocate request for the preempted + // task is processed. 
+ private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) { + // A null host set means that a running task on any host is a preemption candidate. + Set<String> preemptHosts; + if (potentialHosts == null) { + preemptHosts = null; + } else { + preemptHosts = Sets.newHashSet(potentialHosts); + } + writeLock.lock(); + List<TaskInfo> preemptedTaskList = null; + try { + // Walk the running tasks in descending key order. Only levels whose key is greater than + // forPriority are considered; the walk stops at the first level that does not qualify. + NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = runningTasks.descendingMap(); + Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = orderedMap.entrySet().iterator(); + int preemptedCount = 0; + while (iterator.hasNext() && preemptedCount < numTasksToPreempt) { + Entry<Integer, TreeSet<TaskInfo>> entryAtPriority = iterator.next(); + if (entryAtPriority.getKey() > forPriority) { + Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator(); + while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { + TaskInfo taskInfo = taskInfoIterator.next(); + if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) { + // Candidate for preemption. + preemptedCount++; + LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo, + forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts)); + taskInfo.setPreemptedInfo(clock.getTime()); + if (preemptedTaskList == null) { + preemptedTaskList = new LinkedList<>(); + } + dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost()); + preemptedTaskList.add(taskInfo); + registerPendingPreemption(taskInfo.assignedInstance.getHost()); + // Remove from the runningTaskList + taskInfoIterator.remove(); + } + } + + // Remove entire priority level if it's been emptied. + if (entryAtPriority.getValue().isEmpty()) { + iterator.remove(); + } + } else { + // No tasks qualify as preemptable + LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority); + break; + } + } + } finally { + writeLock.unlock(); + } + // Send out the preempted request outside of the lock. 
+ if (preemptedTaskList != null) { + for (TaskInfo taskInfo : preemptedTaskList) { + LOG.info("DBG: Preempting task {}", taskInfo); + getContext().preemptContainer(taskInfo.containerId); + // Preemption will finally be registered as a deallocateTask as a result of preemptContainer + // That resets preemption info and allows additional tasks to be pre-empted if required. + } + } + // The schedule loop will be triggered again when the deallocateTask request comes in for the + // preempted task. + } + + /** + * Records one more pending preemption, both globally and for the given host. + * + * @param host the host on which a task is being preempted + */ + private void registerPendingPreemption(String host) { + writeLock.lock(); + try { + pendingPreemptions.incrementAndGet(); + MutableInt val = pendingPreemptionsPerHost.get(host); + if (val == null) { + // Start at 0: the increment below accounts for this preemption. Starting at 1 would + // count the first preemption on a host twice, and the single matching unregister + // would then leave the per-host count stuck above zero. + val = new MutableInt(0); + pendingPreemptionsPerHost.put(host, val); + } + val.increment(); + } finally { + writeLock.unlock(); + } + } + + /** + * Records that one pending preemption on the given host has completed. + * + * @param host the host for which a preemption was previously registered + */ + private void unregisterPendingPreemption(String host) { + writeLock.lock(); + try { + pendingPreemptions.decrementAndGet(); + MutableInt val = pendingPreemptionsPerHost.get(host); + Preconditions.checkNotNull(val); + val.decrement(); + // Not bothering with removing the entry. There's a limited number of hosts, and a good + // chance that the entry will make it back in when the AM is used for a long duration. + } finally { + writeLock.unlock(); + } + } + + private class NodeEnablerCallable implements Callable<Void> { + + private AtomicBoolean isShutdown = new AtomicBoolean(false); + private static final long REFRESH_INTERVAL = 10000l; + long nextPollInterval = REFRESH_INTERVAL; + long lastRefreshTime; + + @Override + public Void call() { + + lastRefreshTime = System.currentTimeMillis(); + while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { + try { + while (true) { + NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS); + if (nodeInfo != null) { + long currentTime = System.currentTimeMillis(); + // A node became available. Enable the node and try scheduling. 
+ reenableDisabledNode(nodeInfo); + trySchedulingPendingTasks(); + + // Remaining time until the next refresh is due. This is computed from the full interval + // because (currentTime - lastRefreshTime) is the total time since the last refresh; + // subtracting it from the already reduced interval would count elapsed time twice. + nextPollInterval = REFRESH_INTERVAL - (currentTime - lastRefreshTime); + } + + if (nextPollInterval < 0 || nodeInfo == null) { + // timeout expired. Reset the poll interval and refresh nodes. + nextPollInterval = REFRESH_INTERVAL; + lastRefreshTime = System.currentTimeMillis(); + // TODO Get rid of this polling once we have notifications from the registry sub-system + if (LOG.isDebugEnabled()) { + LOG.debug("Refreshing instances based on poll interval"); + } + refreshInstances(); + scanForNodeChanges(); + } + } + } catch (InterruptedException e) { + if (isShutdown.get()) { + LOG.info("NodeEnabler thread interrupted after shutdown"); + break; + } else { + LOG.warn("NodeEnabler thread interrupted without being shutdown"); + throw new RuntimeException("NodeEnabler thread interrupted without being shutdown", e); + } + } + } + return null; + } + + // Call this first, then send in an interrupt to the thread. + public void shutdown() { + isShutdown.set(true); + } + } + + // Flags that a scheduling run is wanted and signals the condition the scheduler thread waits on. + private void trySchedulingPendingTasks() { + scheduleLock.lock(); + try { + pendingScheduleInvodations.set(true); + scheduleCondition.signal(); + } finally { + scheduleLock.unlock(); + } + } + + private class SchedulerCallable implements Callable<Void> { + private AtomicBoolean isShutdown = new AtomicBoolean(false); + + @Override + public Void call() { + while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { + scheduleLock.lock(); + try { + while (!pendingScheduleInvodations.get()) { + scheduleCondition.await(); + } + } catch (InterruptedException e) { + if (isShutdown.get()) { + LOG.info("Scheduler thread interrupted after shutdown"); + break; + } else { + LOG.warn("Scheduler thread interrupted without being shutdown"); + throw new RuntimeException("Scheduler thread interrupted without being shutdown", e); + } + } finally { + scheduleLock.unlock(); + } + + // Set pending to false since scheduling is about to run. 
Any triggers up to this point + // will be handled in the next run. + // A new request may come in right after this is set to false, but before the actual scheduling. + // This will be handled in this run, but will cause an immediate run after, which is harmless. + pendingScheduleInvodations.set(false); + // Schedule outside of the scheduleLock - which should only be used to wait on the condition. + schedulePendingTasks(); + } + return null; + } + + // Call this first, then send in an interrupt to the thread. + public void shutdown() { + isShutdown.set(true); + } + } + + // Scheduling state tracked for a single node. Implements Delayed, with the delay derived from + // expireTimeMillis. + @VisibleForTesting + static class NodeInfo implements Delayed { + private final NodeBlacklistConf blacklistConf; + final ServiceInstance serviceInstance; + private final Clock clock; + + // Time at which the current disable period ends; -1 while the node is enabled. + long expireTimeMillis = -1; + private long numSuccessfulTasks = 0; + private long numSuccessfulTasksAtLastBlacklist = -1; + // Multiplier applied to blacklistConf.minDelay when computing the next disable period. + float cumulativeBackoffFactor = 1.0f; + + // Indicates whether a node had a recent communication failure. + private boolean hadCommFailure = false; + + // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc. + private boolean disabled = false; + + private int numPreemptedTasks = 0; + private int numScheduledTasks = 0; + private final int numSchedulableTasks; + + + /** + * Create a NodeInfo bound to a service instance + * + * @param serviceInstance the associated serviceInstance + * @param blacklistConf blacklist configuration + * @param clock clock to use to obtain timing information + * @param numSchedulableTasksConf number of schedulable tasks on the node. 
0 represents auto + * detect based on the serviceInstance, -1 indicates + * unlimited capacity + */ + NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf) { + Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1"); + this.serviceInstance = serviceInstance; + this.blacklistConf = blacklistConf; + this.clock = clock; + + if (numSchedulableTasksConf == 0) { + // Auto-detect: capacity is the instance's virtual cores plus its advertised wait queue + // size, when that property is present. + int pendingQueueuCapacity = 0; + String pendingQueueCapacityString = serviceInstance.getProperties() + .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting up node: " + serviceInstance + ", with available capacity=" + + serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity=" + + pendingQueueCapacityString); + } + + if (pendingQueueCapacityString != null) { + pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString); + } + this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity; + } else { + this.numSchedulableTasks = numSchedulableTasksConf; + } + LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); + } + + ServiceInstance getServiceInstance() { + return serviceInstance; + } + + // Clears the disabled and communication-failure flags and resets the expiry time. + void enableNode() { + expireTimeMillis = -1; + disabled = false; + hadCommFailure = false; + } + + // Disables the node. The disable period starts from blacklistConf.minDelay and is scaled by + // the cumulative back-off factor. + void disableNode(boolean commFailure) { + long duration = blacklistConf.minDelay; + long currentTime = clock.getTime(); + this.hadCommFailure = commFailure; + disabled = true; + if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) { + // Relying on a task succeeding to reset the exponent. + // There's no notifications on whether a task gets accepted or not. That would be ideal to + // reset this. 
+ cumulativeBackoffFactor = cumulativeBackoffFactor * blacklistConf.backoffFactor; + } else { + // Was able to execute something before the last blacklist. Reset the exponent. + cumulativeBackoffFactor = 1.0f; + } + + long delayTime = (long) (duration * cumulativeBackoffFactor); + // Clamp the delay to the configured maximum. + if (delayTime > blacklistConf.maxDelay) { + delayTime = blacklistConf.maxDelay; + } + if (LOG.isInfoEnabled()) { + LOG.info("Disabling instance " + serviceInstance + " for " + delayTime + " milli-seconds"); + } + expireTimeMillis = currentTime + delayTime; + // Remember the success count so the next disable can tell whether any task succeeded since. + numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks; + } + + void registerTaskScheduled() { + numScheduledTasks++; + } + + void registerTaskSuccess() { + numSuccessfulTasks++; + numScheduledTasks--; + } + + // A task ended without succeeding; the preempted counter is bumped only when wasPreempted is set. + void registerUnsuccessfulTaskEnd(boolean wasPreempted) { + numScheduledTasks--; + if (wasPreempted) { + numPreemptedTasks++; + } + } + + public boolean isDisabled() { + return disabled; + } + + public boolean hadCommFailure() { + return hadCommFailure; + } + /* Returning true does not guarantee that the task will run, considering other queries + may be running in the system. 
Also depends upon the capacity usage configuration + */ + public boolean canAcceptTask() { + // A capacity of -1 means unlimited; otherwise at least one slot must still be free. + boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive() + &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); + LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive()); + return result; + } + + // Remaining time until expireTimeMillis, converted from milliseconds to the requested unit. + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS); + } + + // Orders nodes by ascending expireTimeMillis. The argument is cast to NodeInfo. + @Override + public int compareTo(Delayed o) { + NodeInfo other = (NodeInfo) o; + if (other.expireTimeMillis > this.expireTimeMillis) { + return -1; + } else if (other.expireTimeMillis < this.expireTimeMillis) { + return 1; + } else { + return 0; + } + } + + @Override + public String toString() { + return "NodeInfo{" + "instance=" + serviceInstance + + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" + numSuccessfulTasks + + ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist + + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor + + ", numSchedulableTasks=" + numSchedulableTasks + + ", numScheduledTasks=" + numScheduledTasks + + ", disabled=" + disabled + + ", commFailures=" + hadCommFailure + +'}'; + } + } + + // Plain counters describing allocation activity, plus per-host allocation counts. + @VisibleForTesting + static class StatsPerDag { + int numRequestedAllocations = 0; + int numRequestsWithLocation = 0; + int numRequestsWithoutLocation = 0; + int numTotalAllocations = 0; + int numLocalAllocations = 0; + int numNonLocalAllocations = 0; + int numAllocationsNoLocalityRequest = 0; + int numRejectedTasks = 0; + int numCommFailures = 0; + int numDelayedAllocations = 0; + int numPreemptedTasks = 0; + Map<String, AtomicInteger> localityBasedNumAllocationsPerHost = new HashMap<>(); + Map<String, AtomicInteger> numAllocationsPerHost = new HashMap<>(); + + 
@Override + public String toString() { + // Every counter is reported as Name=value, separated uniformly by ", ". + StringBuilder sb = new StringBuilder(); + sb.append("NumPreemptedTasks=").append(numPreemptedTasks).append(", "); + sb.append("NumRequestedAllocations=").append(numRequestedAllocations).append(", "); + sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", "); + sb.append("NumLocalAllocations=").append(numLocalAllocations).append(", "); + sb.append("NumNonLocalAllocations=").append(numNonLocalAllocations).append(", "); + sb.append("NumTotalAllocations=").append(numTotalAllocations).append(", "); + sb.append("NumRequestsWithoutLocation=").append(numRequestsWithoutLocation).append(", "); + sb.append("NumAllocationsNoLocalityRequest=").append(numAllocationsNoLocalityRequest).append(", "); + sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", "); + sb.append("NumCommFailures=").append(numCommFailures).append(", "); + sb.append("NumDelayedAllocations=").append(numDelayedAllocations).append(", "); + sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost) + .append(", "); + sb.append("NumAllocationsPerHost=").append(numAllocationsPerHost); + return sb.toString(); + } + + // Counts a task request, split by whether any host was requested. requestedRacks is not used. + void registerTaskRequest(String[] requestedHosts, String[] requestedRacks) { + numRequestedAllocations++; + // TODO Change after HIVE-9987. For now, there's no rack matching. + if (requestedHosts != null && requestedHosts.length != 0) { + numRequestsWithLocation++; + } else { + numRequestsWithoutLocation++; + } + } + + void registerTaskAllocated(String[] requestedHosts, String[] requestedRacks, + String allocatedHost) { + // TODO Change after HIVE-9987. For now, there's no rack matching. 
+ if (requestedHosts != null && requestedHosts.length != 0) { + Set<String> requestedHostSet = new HashSet<>(Arrays.asList(requestedHosts)); + // An allocation is local when the allocated host is one of the requested hosts. + if (requestedHostSet.contains(allocatedHost)) { + numLocalAllocations++; + _registerAllocationInHostMap(allocatedHost, localityBasedNumAllocationsPerHost); + } else { + numNonLocalAllocations++; + } + } else { + numAllocationsNoLocalityRequest++; + } + numTotalAllocations++; + _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost); + } + + void registerTaskPreempted(String host) { + numPreemptedTasks++; + } + + void registerCommFailure(String host) { + numCommFailures++; + } + + void registerTaskRejected(String host) { + numRejectedTasks++; + } + + void registerDelayedAllocation() { + numDelayedAllocations++; + } + + // Increments the counter for the host in the given map, creating the counter on first use. + private void _registerAllocationInHostMap(String host, Map<String, AtomicInteger> hostMap) { + AtomicInteger val = hostMap.get(host); + if (val == null) { + val = new AtomicInteger(0); + hostMap.put(host, val); + } + val.incrementAndGet(); + } + } + + private static class TaskInfo { + // IDs used to ensure two TaskInfos are different without using the underlying task instance. + // Required for insertion into a TreeMap + static final AtomicLong ID_GEN = new AtomicLong(0); + final long uniqueId; + final Object task; + final Object clientCookie; + final Priority priority; + final Resource capability; + final String[] requestedHosts; + final String[] requestedRacks; + final long requestTime; + long startTime; + long preemptTime; + ContainerId containerId; + ServiceInstance assignedInstance; + private boolean assigned = false; + private boolean preempted = false; + + private int numAssignAttempts = 0; + + // TaskInfo instances for two different tasks will not be the same. 
Only a single instance should + // ever be created for a taskAttempt + public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability, + String[] hosts, String[] racks, long requestTime) { + this.task = task; + this.clientCookie = clientCookie; + this.priority = priority; + this.capability = capability; + this.requestedHosts = hosts; + this.requestedRacks = racks; + this.requestTime = requestTime; + this.uniqueId = ID_GEN.getAndIncrement(); + } + + // Records where and when the task was assigned, and marks it as assigned. + void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) { + this.assignedInstance = instance; + this.containerId = containerId; + this.startTime = startTime; + assigned = true; + } + + // Marks the task as preempted and no longer assigned, and records the preemption time. + void setPreemptedInfo(long preemptTime) { + this.preempted = true; + this.assigned = false; + this.preemptTime = preemptTime; + } + + void triedAssigningTask() { + numAssignAttempts++; + } + + int getNumPreviousAssignAttempts() { + return numAssignAttempts; + } + + // Equal only when both the uniqueId and the underlying task match. + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TaskInfo taskInfo = (TaskInfo) o; + + if (uniqueId != taskInfo.uniqueId) { + return false; + } + return task.equals(taskInfo.task); + + } + + // Combines the same two fields that equals compares. + @Override + public int hashCode() { + int result = (int) (uniqueId ^ (uniqueId >>> 32)); + result = 31 * result + task.hashCode(); + return result; + } + + @Override + public String toString() { + return "TaskInfo{" + + "task=" + task + + ", priority=" + priority + + ", startTime=" + startTime + + ", containerId=" + containerId + + ", assignedInstance=" + assignedInstance + + ", uniqueId=" + uniqueId + + '}'; + } + } + + // Newer tasks first. 
+ private static class TaskStartComparator implements Comparator<TaskInfo> { + + // A later startTime sorts first; ties are broken by uniqueId, higher id first. + @Override + public int compare(TaskInfo o1, TaskInfo o2) { + if (o1.startTime > o2.startTime) { + return -1; + } else if (o1.startTime < o2.startTime) { + return 1; + } else { + // Comparing on time is not sufficient since two may be created at the same time, + // in which case inserting into a TreeSet/Map would break + if (o1.uniqueId > o2.uniqueId) { + return -1; + } else if (o1.uniqueId < o2.uniqueId) { + return 1; + } else { + return 0; + } + } + } + } + + private static class SelectHostResult { + final NodeServiceInstancePair nodeServiceInstancePair; + final ScheduleResult scheduleResult; + + // Result for a successful selection; the schedule result is always SCHEDULED. + SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) { + this.nodeServiceInstancePair = new NodeServiceInstancePair(serviceInstance, nodeInfo); + this.scheduleResult = ScheduleResult.SCHEDULED; + } + + // Result carrying only a schedule result; there is no node/instance pair. + SelectHostResult(ScheduleResult scheduleResult) { + this.nodeServiceInstancePair = null; + this.scheduleResult = scheduleResult; + } + } + + // Shared instances for the results that carry no node/instance pair. + private static final SelectHostResult SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY = + new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES); + private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY = + new SelectHostResult(ScheduleResult.DELAYED_LOCALITY); + private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES = + new SelectHostResult(ScheduleResult.DELAYED_RESOURCES); + + // Simple pairing of a service instance with the NodeInfo tracked for it. + private static class NodeServiceInstancePair { + final NodeInfo nodeInfo; + final ServiceInstance serviceInstance; + + private NodeServiceInstancePair(ServiceInstance serviceInstance, NodeInfo nodeInfo) { + this.serviceInstance = serviceInstance; + this.nodeInfo = nodeInfo; + } + + public ServiceInstance getServiceInstance() { + return serviceInstance; + } + + public NodeInfo getNodeInfo() { + return nodeInfo; + } + } + + private static final class NodeBlacklistConf { + private final long minDelay; + private final long 
maxDelay; + private final float backoffFactor; + + /** + * @param minDelay base duration used when a node is disabled + * @param maxDelay upper bound applied to the computed disable duration + * @param backoffFactor multiplier applied to the cumulative back-off on repeated disables + */ + public NodeBlacklistConf(long minDelay, long maxDelay, float backoffFactor) { + this.minDelay = minDelay; + this.maxDelay = maxDelay; + this.backoffFactor = backoffFactor; + } + + @Override + public String toString() { + return "NodeBlacklistConf{" + + "minDelay=" + minDelay + + ", maxDelay=" + maxDelay + + ", backoffFactor=" + backoffFactor + + '}'; + } + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java new file mode 100644 index 0000000..2c3e53c --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; + +@InterfaceAudience.Private +public class LlapTezUtils { + /** + * Tells whether an input of the given class is a source of interest. + * + * @param inputClassName fully qualified class name of the input; must not be null + * @return false when the name equals that of MRInput, MRInputLegacy or MultiMRInput, + * true otherwise + */ + public static boolean isSourceOfInterest(String inputClassName) { + // MRInput is not of interest since it'll always be ready. 
+ return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals( + MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName())); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java new file mode 100644 index 0000000..4102f5b --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import java.util.Arrays; + +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.security.authorize.Service; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider; + +/** + * Policy provider that returns the parent's services plus one additional entry for + * LlapTaskUmbilicalProtocol. The combined array is built once and cached statically. + */ +public class LlapUmbilicalPolicyProvider extends TezAMPolicyProvider { + + // Must be volatile: the field is read outside the lock, and without it another thread could + // observe a non-null reference before the array contents written under the lock are visible. + private static volatile Service[] services; + private static final Object servicesLock = new Object(); + + /** + * @return the cached array holding the parent's services followed by the LLAP umbilical service + */ + @Override + public Service[] getServices() { + if (services != null) return services; + synchronized (servicesLock) { + // Re-check under the lock; another thread may have populated the cache in the meantime. + if (services != null) return services; + Service[] parentSvc = super.getServices(); + Service[] result = Arrays.copyOf(parentSvc, parentSvc.length + 1); + result[parentSvc.length] = new Service( + TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL, + LlapTaskUmbilicalProtocol.class); + return (services = result); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java new file mode 100644 index 0000000..d8f7574 --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -0,0 +1,289 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.helpers; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.runtime.api.impl.InputSpec; + +/** + * Tracks the last known state of source vertices and forwards state updates to the nodes that + * registered interest in those sources. State is cleared through resetState. + */ +public class SourceStateTracker { + + private static final Logger LOG = LoggerFactory.getLogger(SourceStateTracker.class); + + private final TaskCommunicatorContext taskCommunicatorContext; + private final LlapTaskCommunicator taskCommunicator; + + // Query identifier carrying only the app identifier; resetState adds the DAG identifier. + private final QueryIdentifierProto BASE_QUERY_IDENTIFIER; + + // Tracks vertices for which notifications have been registered + private final Set<String> notificationRegisteredVertices = new HashSet<>(); + + private final Map<String, 
SourceInfo> sourceInfoMap = new HashMap<>(); + private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>(); + + private volatile QueryIdentifierProto currentQueryIdentifier; + + public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext, + LlapTaskCommunicator taskCommunicator) { + this.taskCommunicatorContext = taskCommunicatorContext; + this.taskCommunicator = taskCommunicator; + BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder() + .setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build(); + } + + /** + * To be invoked after each DAG completes. + * + * @param newDagId DAG identifier placed into the query identifier used for later updates + */ + public synchronized void resetState(int newDagId) { + sourceInfoMap.clear(); + nodeInfoMap.clear(); + notificationRegisteredVertices.clear(); + this.currentQueryIdentifier = + QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build(); + } + + /** + * Used to register a task for state updates. Effectively registers for state updates to go to the specific node. + * @param host hostname of the node that should receive the updates + * @param port port of the node that should receive the updates + * @param inputSpecList input specs of the task; the sources of interest are derived from these + */ + public synchronized void registerTaskForStateUpdates(String host, int port, + List<InputSpec> inputSpecList) { + + // Add tracking information. Check if source state already known and send out an update if it is. + + List<String> sourcesOfInterest = getSourceInterestList(inputSpecList); + if (sourcesOfInterest != null && !sourcesOfInterest.isEmpty()) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + NodeInfo nodeInfo = getNodeInfo(nodeId); + + // Set up the data structures, before any notifications come in. + for (String src : sourcesOfInterest) { + VertexState oldStateForNode = nodeInfo.getLastKnownStateForSource(src); + if (oldStateForNode == null) { + // Not registered for this node. + // Register and send state if it is successful. 
+ SourceInfo srcInfo = getSourceInfo(src); + srcInfo.addNode(nodeId); + + nodeInfo.addSource(src, srcInfo.lastKnownState); + if (srcInfo.lastKnownState == VertexState.SUCCEEDED) { + sendStateUpdateToNode(nodeId, src, srcInfo.lastKnownState); + } + + } else { + // Already registered to send updates to this node for the specific source. + // Nothing to do for now, unless tracking tasks at a later point. + } + + // Setup for actual notifications, if not already done for a previous task. + maybeRegisterForVertexUpdates(src); + } + } else { + // Don't need to track anything for this task. No new notifications, etc. + } + } + + /** + * Handles notifications on state updates for sources. + * @param sourceName name of the source vertex whose state changed + * @param sourceState the new state of that source + */ + public synchronized void sourceStateUpdated(String sourceName, VertexState sourceState) { + SourceInfo sourceInfo = getSourceInfo(sourceName); + // Update source info if the state is SUCCEEDED + if (sourceState == VertexState.SUCCEEDED) { + sourceInfo.numCompletedTasks = taskCommunicatorContext.getVertexCompletedTaskCount(sourceName); + sourceInfo.numTasks = taskCommunicatorContext.getVertexTotalTaskCount(sourceName); + } + sourceInfo.lastKnownState = sourceState; + // Checking state per node for future failure handling scenarios, where an update + // to a single node may fail. + for (LlapNodeId nodeId : sourceInfo.getInterestedNodes()) { + NodeInfo nodeInfo = nodeInfoMap.get(nodeId); + VertexState lastStateForNode = nodeInfo.getLastKnownStateForSource(sourceName); + // Send only if the state has changed. + if (lastStateForNode != sourceState) { + nodeInfo.setLastKnownStateForSource(sourceName, sourceState); + sendStateUpdateToNode(nodeId, sourceName, sourceState); + } + } + } + + + // Assumes serialized DAGs within an AM, and a reset of structures after each DAG completes. + /** + * Constructs FragmentRuntimeInfo for scheduling within LLAP daemons. + * Also caches state based on state updates. 
+ * @param vertexName name of the vertex the fragment belongs to + * @param fragmentNumber used together with vertexName to look up the first attempt start time + * @param priority value set as the within-DAG priority + * @return the populated FragmentRuntimeInfo + */ + public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String vertexName, int fragmentNumber, + int priority) { + FragmentRuntimeInfo.Builder builder = FragmentRuntimeInfo.newBuilder(); + maybeRegisterForVertexUpdates(vertexName); + + MutableInt totalTaskCount = new MutableInt(0); + MutableInt completedTaskCount = new MutableInt(0); + computeUpstreamTaskCounts(completedTaskCount, totalTaskCount, vertexName); + + builder.setNumSelfAndUpstreamCompletedTasks(completedTaskCount.intValue()); + builder.setNumSelfAndUpstreamTasks(totalTaskCount.intValue()); + builder.setDagStartTime(taskCommunicatorContext.getDagStartTime()); + builder.setWithinDagPriority(priority); + builder.setFirstAttemptStartTime(taskCommunicatorContext.getFirstAttemptStartTime(vertexName, fragmentNumber)); + builder.setCurrentAttemptStartTime(System.currentTimeMillis()); + return builder.build(); + } + + // Adds the completed and total task counts of sourceName, and recursively of its input + // vertices, to the two accumulators. + private void computeUpstreamTaskCounts(MutableInt completedTaskCount, MutableInt totalTaskCount, String sourceName) { + SourceInfo sourceInfo = getSourceInfo(sourceName); + if (sourceInfo.lastKnownState == VertexState.SUCCEEDED) { + // Some of the information in the source is complete. Don't need to fetch it from the context. + completedTaskCount.add(sourceInfo.numCompletedTasks); + totalTaskCount.add(sourceInfo.numTasks); + } else { + completedTaskCount.add(taskCommunicatorContext.getVertexCompletedTaskCount(sourceName)); + int totalCount =taskCommunicatorContext.getVertexTotalTaskCount(sourceName); + // A total of -1 (presumably reported for uninitialized vertices) is counted as 0. + totalCount = totalCount == -1 ? 
0 : totalCount; + totalTaskCount.add(totalCount); + } + + // Walk through all the source vertices + // NOTE(review): no visited set is kept, so a vertex reachable through more than one path is + // added once per path - confirm whether that is intended. + for (String up : taskCommunicatorContext.getInputVertexNames(sourceName)) { + computeUpstreamTaskCounts(completedTaskCount, totalTaskCount, up); + } + } + + private static class SourceInfo { + + // Always start in the running state. Requests for state updates will be sent out after registration. + private VertexState lastKnownState = VertexState.RUNNING; + + // Used for sending notifications about a vertex completed. For canFinish + // Can be converted to a Tez event, if this is sufficient to decide on pre-emption + private final List<LlapNodeId> interestedNodes = new LinkedList<>(); + + // Used for sending information for scheduling priority. + private int numTasks; + private int numCompletedTasks; + + void addNode(LlapNodeId nodeId) { + interestedNodes.add(nodeId); + } + + List<LlapNodeId> getInterestedNodes() { + return this.interestedNodes; + } + + + + } + + // Returns the SourceInfo for the name, creating and storing a new one if none exists yet. + private synchronized SourceInfo getSourceInfo(String srcName) { + SourceInfo sourceInfo = sourceInfoMap.get(srcName); + if (sourceInfo == null) { + sourceInfo = new SourceInfo(); + sourceInfoMap.put(srcName, sourceInfo); + } + return sourceInfo; + } + + + // Per-node record of the last state stored for each source the node is registered for. + private static class NodeInfo { + private final Map<String, VertexState> sourcesOfInterest = new HashMap<>(); + + void addSource(String srcName, VertexState sourceState) { + sourcesOfInterest.put(srcName, sourceState); + } + + // Returns null when the source has not been added for this node. + VertexState getLastKnownStateForSource(String src) { + return sourcesOfInterest.get(src); + } + + void setLastKnownStateForSource(String src, VertexState state) { + sourcesOfInterest.put(src, state); + } + } + + // Returns the NodeInfo for the node id, creating and storing a new one if none exists yet. + private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) { + NodeInfo nodeInfo = nodeInfoMap.get(llapNodeId); + if (nodeInfo == null) { + nodeInfo = new NodeInfo(); + nodeInfoMap.put(llapNodeId, nodeInfo); + } + return nodeInfo; + } + + + private List<String> getSourceInterestList(List<InputSpec> inputSpecList) { + 
List<String> sourcesOfInterest = Collections.emptyList(); + if (inputSpecList != null) { + boolean alreadyFound = false; + for (InputSpec inputSpec : inputSpecList) { + if (LlapTezUtils.isSourceOfInterest(inputSpec.getInputDescriptor().getClassName())) { + if (!alreadyFound) { + alreadyFound = true; + sourcesOfInterest = new LinkedList<>(); + } + sourcesOfInterest.add(inputSpec.getSourceVertexName()); + } + } + } + return sourcesOfInterest; + } + + + private void maybeRegisterForVertexUpdates(String sourceName) { + if (!notificationRegisteredVertices.contains(sourceName)) { + notificationRegisteredVertices.add(sourceName); + taskCommunicatorContext.registerForVertexStateUpdates(sourceName, EnumSet.of( + VertexState.RUNNING, VertexState.SUCCEEDED)); + } + } + + + + void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) { + taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(), + SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier) + .setSrcName(sourceName).setState(Converters.fromVertexState(state)).build()); + } + + +} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java new file mode 100644 index 0000000..8f3d104 --- /dev/null +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.junit.Test; + +public class TestLlapTaskCommunicator { + + @Test (timeout = 5000) + public void testEntityTracker1() { + LlapTaskCommunicator.EntityTracker entityTracker = new LlapTaskCommunicator.EntityTracker(); + + String host1 = "host1"; + String host2 = "host2"; + String host3 = "host3"; + int port = 1451; + + + // Simple container registration and un-registration without any task attempt being involved. + ContainerId containerId101 = constructContainerId(101); + entityTracker.registerContainer(containerId101, host1, port); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId101)); + + entityTracker.unregisterContainer(containerId101); + assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port))); + assertNull(entityTracker.getNodeIdForContainer(containerId101)); + assertEquals(0, entityTracker.nodeMap.size()); + assertEquals(0, entityTracker.attemptToNodeMap.size()); + assertEquals(0, entityTracker.containerToNodeMap.size()); + + + // Simple task registration and un-registration. 
+ ContainerId containerId1 = constructContainerId(1); + TezTaskAttemptID taskAttemptId1 = constructTaskAttemptId(1); + entityTracker.registerTaskAttempt(containerId1, taskAttemptId1, host1, port); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId1)); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId1)); + + entityTracker.unregisterTaskAttempt(taskAttemptId1); + assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port))); + assertNull(entityTracker.getNodeIdForContainer(containerId1)); + assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId1)); + assertEquals(0, entityTracker.nodeMap.size()); + assertEquals(0, entityTracker.attemptToNodeMap.size()); + assertEquals(0, entityTracker.containerToNodeMap.size()); + + // Register taskAttempt, unregister container. TaskAttempt should also be unregistered + ContainerId containerId201 = constructContainerId(201); + TezTaskAttemptID taskAttemptId201 = constructTaskAttemptId(201); + entityTracker.registerTaskAttempt(containerId201, taskAttemptId201, host1, port); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId201)); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId201)); + + entityTracker.unregisterContainer(containerId201); + assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port))); + assertNull(entityTracker.getNodeIdForContainer(containerId201)); + assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId201)); + assertEquals(0, entityTracker.nodeMap.size()); + assertEquals(0, entityTracker.attemptToNodeMap.size()); + assertEquals(0, entityTracker.containerToNodeMap.size()); + + entityTracker.unregisterTaskAttempt(taskAttemptId201); // No errors + } + + + private ContainerId constructContainerId(int id) { + ContainerId containerId = 
mock(ContainerId.class); + doReturn(id).when(containerId).getId(); + doReturn((long)id).when(containerId).getContainerId(); + return containerId; + } + + private TezTaskAttemptID constructTaskAttemptId(int id) { + TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class); + doReturn(id).when(taskAttemptId).getId(); + return taskAttemptId; + } + +}