http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
new file mode 100644
index 0000000..3bca0da
--- /dev/null
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -0,0 +1,1512 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Clock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskSchedulerService extends TaskScheduler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapTaskSchedulerService.class);
+
+  private final Configuration conf;
+
+  // interface into the registry service
+  private ServiceInstanceSet activeInstances;
+
+  // Tracks all instances, including ones which have been disabled in the past.
+  // LinkedHashMap to provide the same iteration order when selecting a random 
host.
+  @VisibleForTesting
+  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new 
LinkedHashMap<>();
+  // TODO Ideally, remove elements from this once it's known that no tasks are 
linked to the instance (all deallocated)
+
+  // Tracks tasks which could not be allocated immediately.
+  @VisibleForTesting
+  final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new 
Comparator<Priority>() {
+    @Override
+    public int compare(Priority o1, Priority o2) {
+      return o1.getPriority() - o2.getPriority();
+    }
+  });
+
+  // Tracks running and queued tasks. Cleared after a task completes.
+  private final ConcurrentMap<Object, TaskInfo> knownTasks = new 
ConcurrentHashMap<>();
+  private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new 
TreeMap<>();
+  private static final TaskStartComparator TASK_INFO_COMPARATOR = new 
TaskStartComparator();
+
+  // Queue for disabled nodes. Nodes make it out of this queue when their 
expiration timeout is hit.
+  @VisibleForTesting
+  final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
+
+  private final boolean forceLocation;
+
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  private final Clock clock;
+
+  private final ListeningExecutorService nodeEnabledExecutor;
+  private final NodeEnablerCallable nodeEnablerCallable =
+      new NodeEnablerCallable();
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+  private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+  private final Lock scheduleLock = new ReentrantLock();
+  private final Condition scheduleCondition = scheduleLock.newCondition();
+  private final AtomicBoolean pendingScheduleInvodations = new 
AtomicBoolean(false);
+  private final ListeningExecutorService schedulerExecutor;
+  private final SchedulerCallable schedulerCallable = new SchedulerCallable();
+
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  // Tracks total pending preemptions.
+  private final AtomicInteger pendingPreemptions = new AtomicInteger(0);
+  // Tracks pending preemptions per host, using the hostname || Always to be 
accessed inside a lock
+  private final Map<String, MutableInt> pendingPreemptionsPerHost = new 
HashMap<>();
+
+  private final NodeBlacklistConf nodeBlacklistConf;
+
+  // Per daemon
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  private final int numSchedulableTasksPerNode;
+
+  // Per Executor Thread
+  private final Resource resourcePerExecutor;
+
+  private final LlapRegistryService registry = new LlapRegistryService(false);
+
+  private volatile ListenableFuture<Void> nodeEnablerFuture;
+  private volatile ListenableFuture<Void> schedulerFuture;
+
+  @VisibleForTesting
+  private final AtomicInteger dagCounter = new AtomicInteger(1);
+  // Statistics to track allocations
+  // All of stats variables are visible for testing.
+  @VisibleForTesting
+  StatsPerDag dagStats = new StatsPerDag();
+
+  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+    this(taskSchedulerContext, new SystemClock());
+  }
+
+  @VisibleForTesting
+  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, 
Clock clock) {
+    super(taskSchedulerContext);
+    this.clock = clock;
+    try {
+      this.conf = 
TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to parse user payload for " + 
LlapTaskSchedulerService.class.getSimpleName(), e);
+    }
+    this.containerFactory = new 
ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
+        taskSchedulerContext.getCustomClusterIdentifier());
+    this.memoryPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
+    this.coresPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
+    this.executorsPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    this.nodeBlacklistConf = new NodeBlacklistConf(
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS,
+            TimeUnit.MILLISECONDS),
+        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS,
+            TimeUnit.MILLISECONDS),
+        HiveConf.getFloatVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR));
+
+    this.numSchedulableTasksPerNode =
+        HiveConf.getIntVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
+
+    long localityDelayMs = HiveConf
+        .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, 
TimeUnit.MILLISECONDS);
+    if (localityDelayMs == -1) {
+      this.forceLocation = true;
+    } else {
+      this.forceLocation = false;
+    }
+
+    int memoryPerExecutor = (int) (memoryPerInstance / (float) 
executorsPerInstance);
+    int coresPerExecutor = (int) (coresPerInstance / (float) 
executorsPerInstance);
+    this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 
coresPerExecutor);
+
+    String instanceId = HiveConf.getTrimmedVar(conf, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+
+    Preconditions.checkNotNull(instanceId, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname
+        + " must be defined");
+
+    ExecutorService executorServiceRaw =
+        Executors.newFixedThreadPool(1,
+            new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
+    nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
+
+    ExecutorService schedulerExecutorServiceRaw = 
Executors.newFixedThreadPool(1,
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
+    schedulerExecutor = 
MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
+
+    LOG.info("Running with configuration: " + "memoryPerInstance=" + 
memoryPerInstance
+        + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
+        + executorsPerInstance + ", resourcePerInstanceInferred=" + 
resourcePerExecutor
+        + ", nodeBlacklistConf=" + nodeBlacklistConf
+        + ", forceLocation=" + forceLocation);
+  }
+
+  @Override
+  public void initialize() {
+    registry.init(conf);
+  }
+
+  @Override
+  public void start() throws IOException {
+    writeLock.lock();
+    try {
+      nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
+      Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() {
+        @Override
+        public void onSuccess(Void result) {
+          LOG.info("NodeEnabledThread exited");
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          LOG.warn("NodeEnabledThread exited with error", t);
+        }
+      });
+      schedulerFuture = schedulerExecutor.submit(schedulerCallable);
+      Futures.addCallback(schedulerFuture, new FutureCallback<Void>() {
+        @Override
+        public void onSuccess(Void result) {
+          LOG.info("SchedulerThread exited");
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          LOG.warn("SchedulerThread exited with error", t);
+        }
+      });
+      registry.start();
+      activeInstances = registry.getInstances();
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, 
numSchedulableTasksPerNode));
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    writeLock.lock();
+    try {
+      if (!this.isStopped.getAndSet(true)) {
+        nodeEnablerCallable.shutdown();
+        if (nodeEnablerFuture != null) {
+          nodeEnablerFuture.cancel(true);
+        }
+        nodeEnabledExecutor.shutdownNow();
+
+        schedulerCallable.shutdown();
+        if (schedulerFuture != null) {
+          schedulerFuture.cancel(true);
+        }
+        schedulerExecutor.shutdownNow();
+
+        if (registry != null) {
+          registry.stop();
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    int memory = 0;
+    int vcores = 0;
+    readLock.lock();
+    try {
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        if (inst.isAlive()) {
+          Resource r = inst.getResource();
+          LOG.info("Found instance " + inst);
+          memory += r.getMemory();
+          vcores += r.getVirtualCores();
+        } else {
+          LOG.info("Ignoring dead instance " + inst);
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    return Resource.newInstance(memory, vcores);
+  }
+
+  /**
+   * The difference between this and getTotalResources() is that this only 
gives currently free
+   * resource instances, while the other lists all the instances that may 
become available in a
+   * while.
+   */
+  @Override
+  public Resource getAvailableResources() {
+    // need a state store eventually for current state & measure backoffs
+    int memory = 0;
+    int vcores = 0;
+    readLock.lock();
+    try {
+      for (Entry<ServiceInstance, NodeInfo> entry : 
instanceToNodeMap.entrySet()) {
+        if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) {
+          Resource r = entry.getKey().getResource();
+          memory += r.getMemory();
+          vcores += r.getVirtualCores();
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    return Resource.newInstance(memory, vcores);
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    readLock.lock();
+    try {
+      int n = 0;
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        if (inst.isAlive()) {
+          n++;
+        }
+      }
+      return n;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void dagComplete() {
+    // This is effectively DAG completed, and can be used to reset statistics 
being tracked.
+    LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + 
dagStats);
+    dagCounter.incrementAndGet();
+    dagStats = new StatsPerDag();
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("BlacklistNode not supported");
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("unBlacklistNode not supported");
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, 
String[] racks,
+      Priority priority, Object containerSignature, Object clientCookie) {
+    TaskInfo taskInfo =
+        new TaskInfo(task, clientCookie, priority, capability, hosts, racks, 
clock.getTime());
+    writeLock.lock();
+    try {
+      dagStats.registerTaskRequest(hosts, racks);
+    } finally {
+      writeLock.unlock();
+    }
+    addPendingTask(taskInfo);
+    trySchedulingPendingTasks();
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId 
containerId,
+      Priority priority, Object containerSignature, Object clientCookie) {
+    // Container affinity can be implemented as Host affinity for LLAP. Not 
required until
+    // 1:1 edges are used in Hive.
+    TaskInfo taskInfo =
+        new TaskInfo(task, clientCookie, priority, capability, null, null, 
clock.getTime());
+    writeLock.lock();
+    try {
+      dagStats.registerTaskRequest(null, null);
+    } finally {
+      writeLock.unlock();
+    }
+    addPendingTask(taskInfo);
+    trySchedulingPendingTasks();
+  }
+
+
+  // This may be invoked before a container is ever assigned to a task. 
allocateTask... app decides
+  // the task is no longer required, and asks for a de-allocation.
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded, 
TaskAttemptEndReason endReason, String diagnostics) {
+    writeLock.lock(); // Updating several local structures
+    TaskInfo taskInfo;
+    try {
+      taskInfo = unregisterTask(task);
+      if (taskInfo == null) {
+        LOG.error("Could not determine ContainerId for task: "
+            + task
+            + " . Could have hit a race condition. Ignoring."
+            + " The query may hang since this \"unknown\" container is now 
taking up a slot permanently");
+        return false;
+      }
+      if (taskInfo.containerId == null) {
+        if (taskInfo.assigned) {
+          LOG.error("Task: "
+              + task
+              + " assigned, but could not find the corresponding containerId."
+              + " The query may hang since this \"unknown\" container is now 
taking up a slot permanently");
+        } else {
+          LOG.info("Ignoring deallocate request for task " + task
+              + " which hasn't been assigned to a container");
+          removePendingTask(taskInfo);
+        }
+        return false;
+      }
+      ServiceInstance assignedInstance = taskInfo.assignedInstance;
+      assert assignedInstance != null;
+
+      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
+      assert nodeInfo != null;
+
+      // Re-enable the node if preempted
+      if (taskInfo.preempted) {
+        LOG.info("Processing deallocateTask for {} which was preempted, 
EndReason={}", task, endReason);
+        unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
+        nodeInfo.registerUnsuccessfulTaskEnd(true);
+        if (nodeInfo.isDisabled()) {
+          // Re-enable the node. If a task succeeded, a slot may have become 
available.
+          // Also reset commFailures since a task was able to communicate back 
and indicate success.
+          nodeInfo.enableNode();
+          // Re-insert into the queue to force the poll thread to remove the 
element.
+          if ( disabledNodesQueue.remove(nodeInfo)) {
+            disabledNodesQueue.add(nodeInfo);
+          }
+        }
+        // In case of success, trigger a scheduling run for pending tasks.
+        trySchedulingPendingTasks();
+      } else {
+        if (taskSucceeded) {
+          // The node may have been blacklisted at this point - which means it 
may not be in the
+          // activeNodeList.
+
+          nodeInfo.registerTaskSuccess();
+
+          if (nodeInfo.isDisabled()) {
+            // Re-enable the node. If a task succeeded, a slot may have become 
available.
+            // Also reset commFailures since a task was able to communicate 
back and indicate success.
+            nodeInfo.enableNode();
+            // Re-insert into the queue to force the poll thread to remove the 
element.
+            if (disabledNodesQueue.remove(nodeInfo)) {
+              disabledNodesQueue.add(nodeInfo);
+            }
+          }
+          // In case of success, trigger a scheduling run for pending tasks.
+          trySchedulingPendingTasks();
+
+        } else if (!taskSucceeded) {
+          nodeInfo.registerUnsuccessfulTaskEnd(false);
+          if (endReason != null && EnumSet
+              .of(TaskAttemptEndReason.EXECUTOR_BUSY, 
TaskAttemptEndReason.COMMUNICATION_ERROR)
+              .contains(endReason)) {
+            if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
+              
dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
+            } else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) {
+              
dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+            }
+          }
+          boolean commFailure =
+              endReason != null && endReason == 
TaskAttemptEndReason.COMMUNICATION_ERROR;
+          disableInstance(assignedInstance, commFailure);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    getContext().containerBeingReleased(taskInfo.containerId);
+    return true;
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.debug("Ignoring deallocateContainer for containerId: " + containerId);
+    // Containers are not being tracked for re-use.
+    // This is safe to ignore since a deallocate task will come in.
+    return null;
+  }
+
+  @Override
+  public void setShouldUnregister() {
+
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Nothing to do. No registration involved.
+    return true;
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  /**
+   * @param request the list of preferred hosts. null implies any host
+   * @return
+   */
+  private SelectHostResult selectHost(TaskInfo request) {
+    String[] requestedHosts = request.requestedHosts;
+    readLock.lock(); // Read-lock. Not updating any stats at the moment.
+    try {
+      // Check if any hosts are active.
+      if (getAvailableResources().getMemory() <= 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Refreshing instances since total memory is 0");
+        }
+        refreshInstances();
+      }
+
+      // If there's no memory available, fail
+      if (getTotalResources().getMemory() <= 0) {
+        return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
+      }
+
+      if (requestedHosts != null && requestedHosts.length > 0) {
+        int prefHostCount = -1;
+        boolean requestedHostExists = false;
+        for (String host : requestedHosts) {
+          prefHostCount++;
+          // Pick the first host always. Weak attempt at cache affinity.
+          Set<ServiceInstance> instances = activeInstances.getByHost(host);
+          if (!instances.isEmpty()) {
+            requestedHostExists = true;
+            for (ServiceInstance inst : instances) {
+              NodeInfo nodeInfo = instanceToNodeMap.get(inst);
+              if (nodeInfo != null && nodeInfo.canAcceptTask()) {
+                LOG.info("Assigning " + inst + " when looking for " + host + 
"." +
+                    " FirstRequestedHost=" + (prefHostCount == 0) +
+                    (requestedHosts.length > 1 ? "#prefLocations=" + 
requestedHosts.length : ""));
+                return new SelectHostResult(inst, nodeInfo);
+              }
+            }
+          }
+        }
+        // Check if forcing the location is required.
+        if (forceLocation) {
+          if (requestedHostExists) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Skipping non-local location allocation for [" + 
request.task +
+                  "] when trying to allocate on [" + 
Arrays.toString(requestedHosts) + "]");
+            }
+            return SELECT_HOST_RESULT_DELAYED_LOCALITY;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Not skipping non-local location allocation for [" + 
request.task +
+                  "] when trying to allocate on [" + 
Arrays.toString(requestedHosts) +
+                  "] since none of these hosts are part of the known list");
+            }
+          }
+        }
+      }
+      /* fall through - miss in locality (random scheduling) */
+      Entry<ServiceInstance, NodeInfo>[] all =
+          instanceToNodeMap.entrySet().toArray(new 
Entry[instanceToNodeMap.size()]);
+      // Check again
+      if (all.length > 0) {
+        int n = random.nextInt(all.length);
+        // start at random offset and iterate whole list
+        for (int i = 0; i < all.length; i++) {
+          Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
+          if (inst.getValue().canAcceptTask()) {
+            LOG.info("Assigning " + inst + " when looking for any host, from 
#hosts=" + all.length +
+                ", requestedHosts=" +
+                ((requestedHosts == null || requestedHosts.length == 0) ? 
"null" :
+                    Arrays.toString(requestedHosts)));
+            return new SelectHostResult(inst.getKey(), inst.getValue());
+          }
+        }
+      }
+      return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  // TODO Each refresh operation should addNodes if they don't already exist.
+  // Even better would be to get notifications from the service impl when a 
node gets added or removed.
+  // Instead of having to walk through the entire list. The computation of a 
node getting added or
+  // removed already exists in the DynamicRegistry implementation.
+  private void refreshInstances() {
+    try {
+      activeInstances.refresh(); // handles its own sync
+    } catch (IOException ioe) {
+      LOG.warn("Could not refresh list of active instances", ioe);
+    }
+  }
+
+  private void scanForNodeChanges() {
+    /* check again whether nodes are disabled or just missing */
+    writeLock.lock();
+    try {
+      for (ServiceInstance inst : activeInstances.getAll().values()) {
+        if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) {
+          /* that's a good node, not added to the allocations yet */
+          LOG.info("Found a new node: " + inst + ".");
+          addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, 
numSchedulableTasksPerNode));
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void addNode(ServiceInstance inst, NodeInfo node) {
+    LOG.info("Adding node: " + inst);
+    instanceToNodeMap.put(inst, node);
+    // Trigger scheduling since a new node became available.
+    trySchedulingPendingTasks();
+  }
+
+  private void reenableDisabledNode(NodeInfo nodeInfo) {
+    writeLock.lock();
+    try {
+      if (nodeInfo.hadCommFailure()) {
+        // If the node being re-enabled was not marked busy previously, then 
it was disabled due to
+        // some other failure. Refresh the service list to see if it's been 
removed permanently.
+        refreshInstances();
+      }
+      LOG.info("Attempting to re-enable node: " + 
nodeInfo.getServiceInstance());
+      if (nodeInfo.getServiceInstance().isAlive()) {
+        nodeInfo.enableNode();
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Removing dead node " + nodeInfo);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void disableInstance(ServiceInstance instance, boolean 
isCommFailure) {
+    writeLock.lock();
+    try {
+      NodeInfo nodeInfo = instanceToNodeMap.get(instance);
+      if (nodeInfo == null || nodeInfo.isDisabled()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Node: " + instance + " already disabled, or invalid. Not 
doing anything.");
+        }
+      } else {
+        nodeInfo.disableNode(isCommFailure);
+        // TODO: handle task to container map events in case of hard failures
+        disabledNodesQueue.add(nodeInfo);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void addPendingTask(TaskInfo taskInfo) {
+    writeLock.lock();
+    try {
+      List<TaskInfo> tasksAtPriority = pendingTasks.get(taskInfo.priority);
+      if (tasksAtPriority == null) {
+        tasksAtPriority = new LinkedList<>();
+        pendingTasks.put(taskInfo.priority, tasksAtPriority);
+      }
+      tasksAtPriority.add(taskInfo);
+      knownTasks.putIfAbsent(taskInfo.task, taskInfo);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /* Remove a task from the pending list */
+  private void removePendingTask(TaskInfo taskInfo) {
+    writeLock.lock();
+    try {
+      Priority priority = taskInfo.priority;
+      List<TaskInfo> taskInfoList = pendingTasks.get(priority);
+      if (taskInfoList == null || taskInfoList.isEmpty() || 
!taskInfoList.remove(taskInfo)) {
+        LOG.warn("Could not find task: " + taskInfo.task + " in pending list, 
at priority: "
+            + priority);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /* Register a running task into the runningTasks structure */
+  private void registerRunningTask(TaskInfo taskInfo) {
+    writeLock.lock();
+    try {
+      int priority = taskInfo.priority.getPriority();
+      TreeSet<TaskInfo> tasksAtpriority = runningTasks.get(priority);
+      if (tasksAtpriority == null) {
+        tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR);
+        runningTasks.put(priority, tasksAtpriority);
+      }
+      tasksAtpriority.add(taskInfo);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /* Unregister a task from the known and running structures */
+  private TaskInfo unregisterTask(Object task) {
+    writeLock.lock();
+    try {
+      TaskInfo taskInfo = knownTasks.remove(task);
+      if (taskInfo != null) {
+        if (taskInfo.assigned) {
+          // Remove from the running list.
+          int priority = taskInfo.priority.getPriority();
+          Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
+          Preconditions.checkState(tasksAtPriority != null,
+              "runningTasks should contain an entry if the task was in running 
state. Caused by task: {}", task);
+          tasksAtPriority.remove(taskInfo);
+          if (tasksAtPriority.isEmpty()) {
+            runningTasks.remove(priority);
+          }
+        }
+      } else {
+        LOG.warn("Could not find TaskInfo for task: {}. Not removing it from 
the running set", task);
+      }
+      return taskInfo;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private enum ScheduleResult {
+    // Successfully scheduled
+    SCHEDULED,
+
+    // Delayed to find a local match
+    DELAYED_LOCALITY,
+
+    // Delayed due to temporary resource availability
+    DELAYED_RESOURCES,
+
+    // Inadequate total resources - will never succeed / wait for new 
executors to become available
+    INADEQUATE_TOTAL_RESOURCES,
+  }
+
+  @VisibleForTesting
+  protected void schedulePendingTasks() {
+    writeLock.lock();
+    try {
+      Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
+          pendingTasks.entrySet().iterator();
+      while (pendingIterator.hasNext()) {
+        Entry<Priority, List<TaskInfo>> entry = pendingIterator.next();
+        List<TaskInfo> taskListAtPriority = entry.getValue();
+        Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
+        boolean scheduledAllAtPriority = true;
+        while (taskIter.hasNext()) {
+
+          // TODO Optimization: Add a check to see if there's any capacity 
available. No point in
+          // walking through all active nodes, if they don't have potential 
capacity.
+
+          TaskInfo taskInfo = taskIter.next();
+          if (taskInfo.getNumPreviousAssignAttempts() == 1) {
+            dagStats.registerDelayedAllocation();
+          }
+          taskInfo.triedAssigningTask();
+          ScheduleResult scheduleResult = scheduleTask(taskInfo);
+          if (scheduleResult == ScheduleResult.SCHEDULED) {
+            taskIter.remove();
+          } else {
+            // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by 
throwin an error immediately,
+            // or waiting for some timeout for new executors and then throwing 
an error
+
+            // Try pre-empting a task so that a higher priority task can take 
it's place.
+            // Preempt only if there's no pending preemptions to avoid 
preempting twice for a task.
+            String[] potentialHosts;
+            if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
+              // preempt only on specific hosts, if no preemptions already 
exist on those.
+              potentialHosts = taskInfo.requestedHosts;
+              //Protect against a bad location being requested.
+              if (potentialHosts == null || potentialHosts.length == 0) {
+                potentialHosts = null;
+              }
+            } else {
+              // preempt on any host.
+              potentialHosts = null;
+            }
+
+            if (potentialHosts != null) {
+              // Preempt on specific host
+              boolean shouldPreempt = true;
+              for (String host : potentialHosts) {
+                // Preempt only if there are not pending preemptions on the 
same host
+                // When the premption registers, the request at the highest 
priority will be given the slot,
+                // even if the initial request was for some other task.
+                // TODO Maybe register which task the preemption was for, to 
avoid a bad non-local allocation.
+                MutableInt pendingHostPreemptions = 
pendingPreemptionsPerHost.get(host);
+                if (pendingHostPreemptions != null && 
pendingHostPreemptions.intValue() > 0) {
+                  shouldPreempt = false;
+                  break;
+                }
+              }
+              if (shouldPreempt) {
+                LOG.info("Attempting to preempt for {}, pendingPreemptions={} 
on hosts={}",
+                    taskInfo.task, pendingPreemptions.get(), 
Arrays.toString(potentialHosts));
+                preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
+              }
+            } else {
+              // Request for a preemption if there's none pending. If a single 
preemption is pending,
+              // and this is the next task to be assigned, it will be assigned 
once that slot becomes available.
+              if (pendingPreemptions.get() == 0) {
+                LOG.info("Attempting to preempt for {}, pendingPreemptions={} 
on any host",
+                    taskInfo.task, pendingPreemptions.get());
+                preemptTasks(entry.getKey().getPriority(), 1, null);
+              }
+            }
+            // Since there was an allocation failure - don't try assigning 
tasks at the next priority.
+
+            scheduledAllAtPriority = false;
+            // Don't break if this allocation failure was a result of a 
LOCALITY_DELAY. Others could still be allocated.
+            if (scheduleResult != ScheduleResult.DELAYED_LOCALITY) {
+              break;
+            }
+          } // end of else - i.e. could not allocate
+        } // end of loop over pending tasks
+        if (taskListAtPriority.isEmpty()) {
+          // Remove the entry, if there's nothing left at the specific 
priority level
+          pendingIterator.remove();
+        }
+        if (!scheduledAllAtPriority) {
+          // Don't attempt scheduling for additional priorities
+          break;
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+
+  private ScheduleResult scheduleTask(TaskInfo taskInfo) {
+    SelectHostResult selectHostResult = selectHost(taskInfo);
+    if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
+      NodeServiceInstancePair nsPair = 
selectHostResult.nodeServiceInstancePair;
+      Container container =
+          containerFactory.createContainer(resourcePerExecutor, 
taskInfo.priority,
+              nsPair.getServiceInstance().getHost(),
+              nsPair.getServiceInstance().getRpcPort());
+      writeLock.lock(); // While updating local structures
+      try {
+        LOG.info("Assigned task {} to container {}", taskInfo, 
container.getId());
+        dagStats.registerTaskAllocated(taskInfo.requestedHosts, 
taskInfo.requestedRacks,
+            nsPair.getServiceInstance().getHost());
+        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), 
container.getId(), clock.getTime());
+        registerRunningTask(taskInfo);
+        nsPair.getNodeInfo().registerTaskScheduled();
+      } finally {
+        writeLock.unlock();
+      }
+      getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, 
container);
+    }
+    return selectHostResult.scheduleResult;
+  }
+
+  // Removes tasks from the runningList and sends out a preempt request to the 
system.
+  // Subsequent tasks will be scheduled again once the de-allocate request for 
the preempted
+  // task is processed.
+  private void preemptTasks(int forPriority, int numTasksToPreempt, String 
[]potentialHosts) {
+    Set<String> preemptHosts;
+    if (potentialHosts == null) {
+      preemptHosts = null;
+    } else {
+      preemptHosts = Sets.newHashSet(potentialHosts);
+    }
+    writeLock.lock();
+    List<TaskInfo> preemptedTaskList = null;
+    try {
+      NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = 
runningTasks.descendingMap();
+      Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = 
orderedMap.entrySet().iterator();
+      int preemptedCount = 0;
+      while (iterator.hasNext() && preemptedCount < numTasksToPreempt) {
+        Entry<Integer, TreeSet<TaskInfo>> entryAtPriority = iterator.next();
+        if (entryAtPriority.getKey() > forPriority) {
+          Iterator<TaskInfo> taskInfoIterator = 
entryAtPriority.getValue().iterator();
+          while (taskInfoIterator.hasNext() && preemptedCount < 
numTasksToPreempt) {
+            TaskInfo taskInfo = taskInfoIterator.next();
+            if (preemptHosts == null || 
preemptHosts.contains(taskInfo.assignedInstance.getHost())) {
+              // Candidate for preemption.
+              preemptedCount++;
+              LOG.info("preempting {} for task at priority {} with 
potentialHosts={}", taskInfo,
+                  forPriority, potentialHosts == null ? "" : 
Arrays.toString(potentialHosts));
+              taskInfo.setPreemptedInfo(clock.getTime());
+              if (preemptedTaskList == null) {
+                preemptedTaskList = new LinkedList<>();
+              }
+              
dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
+              preemptedTaskList.add(taskInfo);
+              registerPendingPreemption(taskInfo.assignedInstance.getHost());
+              // Remove from the runningTaskList
+              taskInfoIterator.remove();
+            }
+          }
+
+          // Remove entire priority level if it's been emptied.
+          if (entryAtPriority.getValue().isEmpty()) {
+            iterator.remove();
+          }
+        } else {
+          // No tasks qualify as preemptable
+          LOG.info("DBG: No tasks qualify as killable to schedule tasks at 
priority {}", forPriority);
+          break;
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    // Send out the preempted request outside of the lock.
+    if (preemptedTaskList != null) {
+      for (TaskInfo taskInfo : preemptedTaskList) {
+        LOG.info("DBG: Preempting task {}", taskInfo);
+        getContext().preemptContainer(taskInfo.containerId);
+        // Preemption will finally be registered as a deallocateTask as a 
result of preemptContainer
+        // That resets preemption info and allows additional tasks to be 
pre-empted if required.
+      }
+    }
+    // The schedule loop will be triggered again when the deallocateTask 
request comes in for the
+    // preempted task.
+  }
+
+  private void registerPendingPreemption(String host) {
+    writeLock.lock();
+    try {
+      pendingPreemptions.incrementAndGet();
+      MutableInt val = pendingPreemptionsPerHost.get(host);
+      if (val == null) {
+        val = new MutableInt(1);
+        pendingPreemptionsPerHost.put(host, val);
+      }
+      val.increment();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void unregisterPendingPreemption(String host) {
+    writeLock.lock();
+    try {
+      pendingPreemptions.decrementAndGet();
+      MutableInt val = pendingPreemptionsPerHost.get(host);
+      Preconditions.checkNotNull(val);
+      val.decrement();
+      // Not bothering with removing the entry. There's a limited number of 
hosts, and a good
+      // chance that the entry will make it back in when the AM is used for a 
long duration.
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private class NodeEnablerCallable implements Callable<Void> {
+
+    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private static final long REFRESH_INTERVAL = 10000l;
+    long nextPollInterval = REFRESH_INTERVAL;
+    long lastRefreshTime;
+
+    @Override
+    public Void call() {
+
+      lastRefreshTime = System.currentTimeMillis();
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          while (true) {
+            NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, 
TimeUnit.MILLISECONDS);
+            if (nodeInfo != null) {
+              long currentTime = System.currentTimeMillis();
+              // A node became available. Enable the node and try scheduling.
+              reenableDisabledNode(nodeInfo);
+              trySchedulingPendingTasks();
+
+              nextPollInterval -= (currentTime - lastRefreshTime);
+            }
+
+            if (nextPollInterval < 0 || nodeInfo == null) {
+              // timeout expired. Reset the poll interval and refresh nodes.
+              nextPollInterval = REFRESH_INTERVAL;
+              lastRefreshTime = System.currentTimeMillis();
+              // TODO Get rid of this polling once we have notificaitons from 
the registry sub-system
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Refreshing instances based on poll interval");
+              }
+              refreshInstances();
+              scanForNodeChanges();
+            }
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("NodeEnabler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("NodeEnabler thread interrupted without being shutdown");
+            throw new RuntimeException("NodeEnabler thread interrupted without 
being shutdown", e);
+          }
+        }
+      }
+      return null;
+    }
+
+    // Call this first, then send in an interrupt to the thread.
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+  }
+
+  private void trySchedulingPendingTasks() {
+    scheduleLock.lock();
+    try {
+      pendingScheduleInvodations.set(true);
+      scheduleCondition.signal();
+    } finally {
+      scheduleLock.unlock();
+    }
+  }
+
+  private class SchedulerCallable implements Callable<Void> {
+    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        scheduleLock.lock();
+        try {
+          while (!pendingScheduleInvodations.get()) {
+            scheduleCondition.await();
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("Scheduler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("Scheduler thread interrupted without being shutdown");
+            throw new RuntimeException("Scheduler thread interrupted without 
being shutdown", e);
+          }
+        } finally {
+          scheduleLock.unlock();
+        }
+
+        // Set pending to false since scheduling is about to run. Any triggers 
up to this point
+        // will be handled in the next run.
+        // A new request may come in right after this is set to false, but 
before the actual scheduling.
+        // This will be handled in this run, but will cause an immediate run 
after, which is harmless.
+        pendingScheduleInvodations.set(false);
+        // Schedule outside of the scheduleLock - which should only be used to 
wait on the condition.
+        schedulePendingTasks();
+      }
+      return null;
+    }
+
+    // Call this first, then send in an interrupt to the thread.
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+  }
+
+  @VisibleForTesting
+  static class NodeInfo implements Delayed {
+    private final NodeBlacklistConf blacklistConf;
+    final ServiceInstance serviceInstance;
+    private final Clock clock;
+
+    long expireTimeMillis = -1;
+    private long numSuccessfulTasks = 0;
+    private long numSuccessfulTasksAtLastBlacklist = -1;
+    float cumulativeBackoffFactor = 1.0f;
+
+    // Indicates whether a node had a recent communication failure.
+    private boolean hadCommFailure = false;
+
+    // Indicates whether a node is disabled - for whatever reason - 
commFailure, busy, etc.
+    private boolean disabled = false;
+
+    private int numPreemptedTasks = 0;
+    private int numScheduledTasks = 0;
+    private final int numSchedulableTasks;
+
+
+    /**
+     * Create a NodeInfo bound to a service instance
+     *
+     * @param serviceInstance         the associated serviceInstance
+     * @param blacklistConf           blacklist configuration
+     * @param clock                   clock to use to obtain timing information
+     * @param numSchedulableTasksConf number of schedulable tasks on the node. 
0 represents auto
+     *                                detect based on the serviceInstance, -1 
indicates indicates
+     *                                unlimited capacity
+     */
+    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, 
Clock clock, int numSchedulableTasksConf) {
+      Preconditions.checkArgument(numSchedulableTasksConf >= -1, 
"NumSchedulableTasks must be >=-1");
+      this.serviceInstance = serviceInstance;
+      this.blacklistConf = blacklistConf;
+      this.clock = clock;
+
+      if (numSchedulableTasksConf == 0) {
+        int pendingQueueuCapacity = 0;
+        String pendingQueueCapacityString = serviceInstance.getProperties()
+            .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting up node: " + serviceInstance + ", with available 
capacity=" +
+              serviceInstance.getResource().getVirtualCores() + ", 
pendingQueueCapacity=" +
+              pendingQueueCapacityString);
+        }
+
+        if (pendingQueueCapacityString != null) {
+          pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
+        }
+        this.numSchedulableTasks = 
serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity;
+      } else {
+        this.numSchedulableTasks = numSchedulableTasksConf;
+      }
+      LOG.info("Setting up node: " + serviceInstance + " with 
schedulableCapacity=" + this.numSchedulableTasks);
+    }
+
+    ServiceInstance getServiceInstance() {
+      return serviceInstance;
+    }
+
+    void enableNode() {
+      expireTimeMillis = -1;
+      disabled = false;
+      hadCommFailure = false;
+    }
+
+    void disableNode(boolean commFailure) {
+      long duration = blacklistConf.minDelay;
+      long currentTime = clock.getTime();
+      this.hadCommFailure = commFailure;
+      disabled = true;
+      if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
+        // Relying on a task succeeding to reset the exponent.
+        // There's no notifications on whether a task gets accepted or not. 
That would be ideal to
+        // reset this.
+        cumulativeBackoffFactor = cumulativeBackoffFactor * 
blacklistConf.backoffFactor;
+      } else {
+        // Was able to execute something before the last blacklist. Reset the 
exponent.
+        cumulativeBackoffFactor = 1.0f;
+      }
+
+      long delayTime = (long) (duration * cumulativeBackoffFactor);
+      if (delayTime > blacklistConf.maxDelay) {
+        delayTime = blacklistConf.maxDelay;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Disabling instance " + serviceInstance + " for " + delayTime 
+ " milli-seconds");
+      }
+      expireTimeMillis = currentTime + delayTime;
+      numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks;
+    }
+
+    void registerTaskScheduled() {
+      numScheduledTasks++;
+    }
+
+    void registerTaskSuccess() {
+      numSuccessfulTasks++;
+      numScheduledTasks--;
+    }
+
+    void registerUnsuccessfulTaskEnd(boolean wasPreempted) {
+      numScheduledTasks--;
+      if (wasPreempted) {
+        numPreemptedTasks++;
+      }
+    }
+
+    public boolean isDisabled() {
+      return disabled;
+    }
+
+    public boolean hadCommFailure() {
+      return hadCommFailure;
+    }
+    /* Returning true does not guarantee that the task will run, considering 
other queries
+    may be running in the system. Also depends upon the capacity usage 
configuration
+     */
+    public boolean canAcceptTask() {
+      boolean result = !hadCommFailure && !disabled && 
serviceInstance.isAlive()
+          &&(numSchedulableTasks == -1 || ((numSchedulableTasks - 
numScheduledTasks) > 0));
+      LOG.info("canAcceptTask={}, numScheduledTasks={}, 
numSchedulableTasks={}, hadCommFailure={}, disabled={}, 
serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, 
hadCommFailure, disabled, serviceInstance.isAlive());
+      return result;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(expireTimeMillis - clock.getTime(), 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      NodeInfo other = (NodeInfo) o;
+      if (other.expireTimeMillis > this.expireTimeMillis) {
+        return -1;
+      } else if (other.expireTimeMillis < this.expireTimeMillis) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "NodeInfo{" + "instance=" + serviceInstance
+          + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" 
+ numSuccessfulTasks
+          + ", numSuccessfulTasksAtLastBlacklist=" + 
numSuccessfulTasksAtLastBlacklist
+          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor
+          + ", numSchedulableTasks=" + numSchedulableTasks
+          + ", numScheduledTasks=" + numScheduledTasks
+          + ", disabled=" + disabled
+          + ", commFailures=" + hadCommFailure
+          +'}';
+    }
+  }
+
+  @VisibleForTesting
+  static class StatsPerDag {
+    int numRequestedAllocations = 0;
+    int numRequestsWithLocation = 0;
+    int numRequestsWithoutLocation = 0;
+    int numTotalAllocations = 0;
+    int numLocalAllocations = 0;
+    int numNonLocalAllocations = 0;
+    int numAllocationsNoLocalityRequest = 0;
+    int numRejectedTasks = 0;
+    int numCommFailures = 0;
+    int numDelayedAllocations = 0;
+    int numPreemptedTasks = 0;
+    Map<String, AtomicInteger> localityBasedNumAllocationsPerHost = new 
HashMap<>();
+    Map<String, AtomicInteger> numAllocationsPerHost = new HashMap<>();
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("NumPreemptedTasks=").append(numPreemptedTasks).append(", ");
+      
sb.append("NumRequestedAllocations=").append(numRequestedAllocations).append(", 
");
+      
sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", 
");
+      
sb.append("NumLocalAllocations=").append(numLocalAllocations).append(",");
+      
sb.append("NumNonLocalAllocations=").append(numNonLocalAllocations).append(",");
+      
sb.append("NumTotalAllocations=").append(numTotalAllocations).append(",");
+      
sb.append("NumRequestsWithoutLocation=").append(numRequestsWithoutLocation).append(",
 ");
+      sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", ");
+      sb.append("NumCommFailures=").append(numCommFailures).append(", ");
+      
sb.append("NumDelayedAllocations=").append(numDelayedAllocations).append(", ");
+      
sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost)
+          .append(", ");
+      sb.append("NumAllocationsPerHost=").append(numAllocationsPerHost);
+      return sb.toString();
+    }
+
+    void registerTaskRequest(String[] requestedHosts, String[] requestedRacks) 
{
+      numRequestedAllocations++;
+      // TODO Change after HIVE-9987. For now, there's no rack matching.
+      if (requestedHosts != null && requestedHosts.length != 0) {
+        numRequestsWithLocation++;
+      } else {
+        numRequestsWithoutLocation++;
+      }
+    }
+
+    void registerTaskAllocated(String[] requestedHosts, String[] 
requestedRacks,
+        String allocatedHost) {
+      // TODO Change after HIVE-9987. For now, there's no rack matching.
+      if (requestedHosts != null && requestedHosts.length != 0) {
+        Set<String> requestedHostSet = new 
HashSet<>(Arrays.asList(requestedHosts));
+        if (requestedHostSet.contains(allocatedHost)) {
+          numLocalAllocations++;
+          _registerAllocationInHostMap(allocatedHost, 
localityBasedNumAllocationsPerHost);
+        } else {
+          numNonLocalAllocations++;
+        }
+      } else {
+        numAllocationsNoLocalityRequest++;
+      }
+      numTotalAllocations++;
+      _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost);
+    }
+
+    void registerTaskPreempted(String host) {
+      numPreemptedTasks++;
+    }
+
+    void registerCommFailure(String host) {
+      numCommFailures++;
+    }
+
+    void registerTaskRejected(String host) {
+      numRejectedTasks++;
+    }
+
+    void registerDelayedAllocation() {
+      numDelayedAllocations++;
+    }
+
+    private void _registerAllocationInHostMap(String host, Map<String, 
AtomicInteger> hostMap) {
+      AtomicInteger val = hostMap.get(host);
+      if (val == null) {
+        val = new AtomicInteger(0);
+        hostMap.put(host, val);
+      }
+      val.incrementAndGet();
+    }
+  }
+
+  private static class TaskInfo {
+    // IDs used to ensure two TaskInfos are different without using the 
underlying task instance.
+    // Required for insertion into a TreeMap
+    static final AtomicLong ID_GEN = new AtomicLong(0);
+    final long uniqueId;
+    final Object task;
+    final Object clientCookie;
+    final Priority priority;
+    final Resource capability;
+    final String[] requestedHosts;
+    final String[] requestedRacks;
+    final long requestTime;
+    long startTime;
+    long preemptTime;
+    ContainerId containerId;
+    ServiceInstance assignedInstance;
+    private boolean assigned = false;
+    private boolean preempted = false;
+
+    private int numAssignAttempts = 0;
+
+    // TaskInfo instances for two different tasks will not be the same. Only a 
single instance should
+    // ever be created for a taskAttempt
+    public TaskInfo(Object task, Object clientCookie, Priority priority, 
Resource capability,
+        String[] hosts, String[] racks, long requestTime) {
+      this.task = task;
+      this.clientCookie = clientCookie;
+      this.priority = priority;
+      this.capability = capability;
+      this.requestedHosts = hosts;
+      this.requestedRacks = racks;
+      this.requestTime = requestTime;
+      this.uniqueId = ID_GEN.getAndIncrement();
+    }
+
+    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, 
long startTime) {
+      this.assignedInstance = instance;
+        this.containerId = containerId;
+      this.startTime = startTime;
+      assigned = true;
+    }
+
+    void setPreemptedInfo(long preemptTime) {
+      this.preempted = true;
+      this.assigned = false;
+      this.preemptTime = preemptTime;
+    }
+
+    void triedAssigningTask() {
+      numAssignAttempts++;
+    }
+
+    int getNumPreviousAssignAttempts() {
+      return numAssignAttempts;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TaskInfo taskInfo = (TaskInfo) o;
+
+      if (uniqueId != taskInfo.uniqueId) {
+        return false;
+      }
+      return task.equals(taskInfo.task);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (uniqueId ^ (uniqueId >>> 32));
+      result = 31 * result + task.hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "TaskInfo{" +
+          "task=" + task +
+          ", priority=" + priority +
+          ", startTime=" + startTime +
+          ", containerId=" + containerId +
+          ", assignedInstance=" + assignedInstance +
+          ", uniqueId=" + uniqueId +
+          '}';
+    }
+  }
+
+  // Newer tasks first.
+  private static class TaskStartComparator implements Comparator<TaskInfo> {
+
+    @Override
+    public int compare(TaskInfo o1, TaskInfo o2) {
+      if (o1.startTime > o2.startTime) {
+        return -1;
+      } else if (o1.startTime < o2.startTime) {
+        return 1;
+      } else {
+        // Comparing on time is not sufficient since two may be created at the 
same time,
+        // in which case inserting into a TreeSet/Map would break
+        if (o1.uniqueId > o2.uniqueId) {
+          return -1;
+        } else if (o1.uniqueId < o2.uniqueId) {
+          return 1;
+        } else {
+          return 0;
+        }
+      }
+    }
+  }
+
+  private static class SelectHostResult {
+    final NodeServiceInstancePair nodeServiceInstancePair;
+    final ScheduleResult scheduleResult;
+
+    SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+      this.nodeServiceInstancePair = new 
NodeServiceInstancePair(serviceInstance, nodeInfo);
+      this.scheduleResult = ScheduleResult.SCHEDULED;
+    }
+
+    SelectHostResult(ScheduleResult scheduleResult) {
+      this.nodeServiceInstancePair = null;
+      this.scheduleResult = scheduleResult;
+    }
+  }
+
+  private static final SelectHostResult 
SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY =
+      new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES);
+  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY =
+      new SelectHostResult(ScheduleResult.DELAYED_LOCALITY);
+  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES =
+      new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);
+
+  private static class NodeServiceInstancePair {
+    final NodeInfo nodeInfo;
+    final ServiceInstance serviceInstance;
+
+    private NodeServiceInstancePair(ServiceInstance serviceInstance, NodeInfo 
nodeInfo) {
+      this.serviceInstance = serviceInstance;
+      this.nodeInfo = nodeInfo;
+    }
+
+    public ServiceInstance getServiceInstance() {
+      return serviceInstance;
+    }
+
+    public NodeInfo getNodeInfo() {
+      return nodeInfo;
+    }
+  }
+
+  private static final class NodeBlacklistConf {
+    private final long minDelay;
+    private final long maxDelay;
+    private final float backoffFactor;
+
+    public NodeBlacklistConf(long minDelay, long maxDelay, float 
backoffFactor) {
+      this.minDelay = minDelay;
+      this.maxDelay = maxDelay;
+      this.backoffFactor = backoffFactor;
+    }
+
+    @Override
+    public String toString() {
+      return "NodeBlacklistConf{" +
+          "minDelay=" + minDelay +
+          ", maxDelay=" + maxDelay +
+          ", backoffFactor=" + backoffFactor +
+          '}';
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
new file mode 100644
index 0000000..2c3e53c
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
+
+@InterfaceAudience.Private
+public class LlapTezUtils {
+  public static boolean isSourceOfInterest(String inputClassName) {
+    // MRInput is not of interest since it'll always be ready.
+    return !(inputClassName.equals(MRInputLegacy.class.getName()) || 
inputClassName.equals(
+        MultiMRInput.class.getName()) || 
inputClassName.equals(MRInput.class.getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
new file mode 100644
index 0000000..4102f5b
--- /dev/null
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
+
+public class LlapUmbilicalPolicyProvider extends TezAMPolicyProvider {
+
+  private static Service[] services;
+  private static final Object servicesLock = new Object();
+
+  @Override
+  public Service[] getServices() {
+    if (services != null) return services;
+    synchronized (servicesLock) {
+      if (services != null) return services;
+      Service[] parentSvc = super.getServices();
+      Service[] result = Arrays.copyOf(parentSvc, parentSvc.length + 1);
+      result[parentSvc.length] =  new Service(
+          TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+          LlapTaskUmbilicalProtocol.class);
+      return (services = result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
new file mode 100644
index 0000000..d8f7574
--- /dev/null
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.runtime.api.impl.InputSpec;
+
+public class SourceStateTracker {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceStateTracker.class);
+
+  private final TaskCommunicatorContext taskCommunicatorContext;
+  private final LlapTaskCommunicator taskCommunicator;
+
+  private final QueryIdentifierProto BASE_QUERY_IDENTIFIER;
+
+  // Tracks vertices for which notifications have been registered
+  private final Set<String> notificationRegisteredVertices = new HashSet<>();
+
+  private final Map<String, SourceInfo> sourceInfoMap = new HashMap<>();
+  private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>();
+
+  private volatile QueryIdentifierProto currentQueryIdentifier;
+
+  public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext,
+                            LlapTaskCommunicator taskCommunicator) {
+    this.taskCommunicatorContext = taskCommunicatorContext;
+    this.taskCommunicator = taskCommunicator;
+    BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder()
+        
.setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build();
+  }
+
+  /**
+   * To be invoked after each DAG completes.
+   */
+  public synchronized void resetState(int newDagId) {
+    sourceInfoMap.clear();
+    nodeInfoMap.clear();
+    notificationRegisteredVertices.clear();
+    this.currentQueryIdentifier =
+        
QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build();
+  }
+
+  /**
+   * Used to register a task for state updates. Effectively registers for 
state updates to go to the specific node.
+   * @param host
+   * @param port
+   * @param inputSpecList
+   */
+  public synchronized void registerTaskForStateUpdates(String host, int port,
+                                                       List<InputSpec> 
inputSpecList) {
+
+    // Add tracking information. Check if source state already known and send 
out an update if it is.
+
+    List<String> sourcesOfInterest = getSourceInterestList(inputSpecList);
+    if (sourcesOfInterest != null && !sourcesOfInterest.isEmpty()) {
+      LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+      NodeInfo nodeInfo = getNodeInfo(nodeId);
+
+      // Set up the data structures, before any notifications come in.
+      for (String src : sourcesOfInterest) {
+        VertexState oldStateForNode = nodeInfo.getLastKnownStateForSource(src);
+        if (oldStateForNode == null) {
+          // Not registered for this node.
+          // Register and send state if it is successful.
+          SourceInfo srcInfo = getSourceInfo(src);
+          srcInfo.addNode(nodeId);
+
+          nodeInfo.addSource(src, srcInfo.lastKnownState);
+          if (srcInfo.lastKnownState == VertexState.SUCCEEDED) {
+            sendStateUpdateToNode(nodeId, src, srcInfo.lastKnownState);
+          }
+
+        } else {
+          // Already registered to send updates to this node for the specific 
source.
+          // Nothing to do for now, unless tracking tasks at a later point.
+        }
+
+        // Setup for actual notifications, if not already done for a previous 
task.
+        maybeRegisterForVertexUpdates(src);
+      }
+    } else {
+      // Don't need to track anything for this task. No new notifications, etc.
+    }
+  }
+
+  /**
+   * Handled notifications on state updates for sources
+   * @param sourceName
+   * @param sourceState
+   */
+  public synchronized void sourceStateUpdated(String sourceName, VertexState 
sourceState) {
+    SourceInfo sourceInfo = getSourceInfo(sourceName);
+    // Update source info if the state is SUCCEEDED
+    if (sourceState == VertexState.SUCCEEDED) {
+      sourceInfo.numCompletedTasks = 
taskCommunicatorContext.getVertexCompletedTaskCount(sourceName);
+      sourceInfo.numTasks = 
taskCommunicatorContext.getVertexTotalTaskCount(sourceName);
+    }
+    sourceInfo.lastKnownState = sourceState;
+    // Checking state per node for future failure handling scenarios, where an 
update
+    // to a single node may fail.
+    for (LlapNodeId nodeId : sourceInfo.getInterestedNodes()) {
+      NodeInfo nodeInfo = nodeInfoMap.get(nodeId);
+      VertexState lastStateForNode = 
nodeInfo.getLastKnownStateForSource(sourceName);
+      // Send only if the state has changed.
+      if (lastStateForNode != sourceState) {
+        nodeInfo.setLastKnownStateForSource(sourceName, sourceState);
+        sendStateUpdateToNode(nodeId, sourceName, sourceState);
+      }
+    }
+  }
+
+
+  // Assumes serialized DAGs within an AM, and a reset of structures after 
each DAG completes.
+  /**
+   * Constructs FragmentRuntimeInfo for scheduling within LLAP daemons.
+   * Also caches state based on state updates.
+   * @param vertexName
+   * @param fragmentNumber
+   * @param priority
+   * @return
+   */
+  public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String 
vertexName, int fragmentNumber,
+                                                                 int priority) 
{
+    FragmentRuntimeInfo.Builder builder = FragmentRuntimeInfo.newBuilder();
+    maybeRegisterForVertexUpdates(vertexName);
+
+    MutableInt totalTaskCount = new MutableInt(0);
+    MutableInt completedTaskCount = new MutableInt(0);
+    computeUpstreamTaskCounts(completedTaskCount, totalTaskCount, vertexName);
+
+    builder.setNumSelfAndUpstreamCompletedTasks(completedTaskCount.intValue());
+    builder.setNumSelfAndUpstreamTasks(totalTaskCount.intValue());
+    builder.setDagStartTime(taskCommunicatorContext.getDagStartTime());
+    builder.setWithinDagPriority(priority);
+    
builder.setFirstAttemptStartTime(taskCommunicatorContext.getFirstAttemptStartTime(vertexName,
 fragmentNumber));
+    builder.setCurrentAttemptStartTime(System.currentTimeMillis());
+    return builder.build();
+  }
+
+  private void computeUpstreamTaskCounts(MutableInt completedTaskCount, 
MutableInt totalTaskCount, String sourceName) {
+    SourceInfo sourceInfo = getSourceInfo(sourceName);
+    if (sourceInfo.lastKnownState == VertexState.SUCCEEDED) {
+      // Some of the information in the source is complete. Don't need to 
fetch it from the context.
+      completedTaskCount.add(sourceInfo.numCompletedTasks);
+      totalTaskCount.add(sourceInfo.numTasks);
+    } else {
+      
completedTaskCount.add(taskCommunicatorContext.getVertexCompletedTaskCount(sourceName));
+      int totalCount 
=taskCommunicatorContext.getVertexTotalTaskCount(sourceName);
+      // Uninitialized vertices will report count as 0.
+      totalCount = totalCount == -1 ? 0 : totalCount;
+      totalTaskCount.add(totalCount);
+    }
+
+    // Walk through all the source vertices
+    for (String up : taskCommunicatorContext.getInputVertexNames(sourceName)) {
+      computeUpstreamTaskCounts(completedTaskCount, totalTaskCount, up);
+    }
+  }
+
+  private static class SourceInfo {
+
+    // Always start in the running state. Requests for state updates will be 
sent out after registration.
+    private VertexState lastKnownState = VertexState.RUNNING;
+
+    // Used for sending notifications about a vertex completed. For canFinish
+    // Can be converted to a Tez event, if this is sufficient to decide on 
pre-emption
+    private final List<LlapNodeId> interestedNodes = new LinkedList<>();
+
+    // Used for sending information for scheduling priority.
+    private int numTasks;
+    private int numCompletedTasks;
+
+    void addNode(LlapNodeId nodeId) {
+      interestedNodes.add(nodeId);
+    }
+
+    List<LlapNodeId> getInterestedNodes() {
+      return this.interestedNodes;
+    }
+
+
+
+  }
+
+  private synchronized SourceInfo getSourceInfo(String srcName) {
+    SourceInfo sourceInfo = sourceInfoMap.get(srcName);
+    if (sourceInfo == null) {
+      sourceInfo = new SourceInfo();
+      sourceInfoMap.put(srcName, sourceInfo);
+    }
+    return sourceInfo;
+  }
+
+
+  private static class NodeInfo {
+    private final Map<String, VertexState> sourcesOfInterest = new HashMap<>();
+
+    void addSource(String srcName, VertexState sourceState) {
+      sourcesOfInterest.put(srcName, sourceState);
+    }
+
+    VertexState getLastKnownStateForSource(String src) {
+      return sourcesOfInterest.get(src);
+    }
+
+    void setLastKnownStateForSource(String src, VertexState state) {
+      sourcesOfInterest.put(src, state);
+    }
+  }
+
+  private synchronized NodeInfo getNodeInfo(LlapNodeId llapNodeId) {
+    NodeInfo nodeInfo = nodeInfoMap.get(llapNodeId);
+    if (nodeInfo == null) {
+      nodeInfo = new NodeInfo();
+      nodeInfoMap.put(llapNodeId, nodeInfo);
+    }
+    return nodeInfo;
+  }
+
+
+  private List<String> getSourceInterestList(List<InputSpec> inputSpecList) {
+    List<String> sourcesOfInterest = Collections.emptyList();
+    if (inputSpecList != null) {
+      boolean alreadyFound = false;
+      for (InputSpec inputSpec : inputSpecList) {
+        if 
(LlapTezUtils.isSourceOfInterest(inputSpec.getInputDescriptor().getClassName()))
 {
+          if (!alreadyFound) {
+            alreadyFound = true;
+            sourcesOfInterest = new LinkedList<>();
+          }
+          sourcesOfInterest.add(inputSpec.getSourceVertexName());
+        }
+      }
+    }
+    return sourcesOfInterest;
+  }
+
+
+  private void maybeRegisterForVertexUpdates(String sourceName) {
+    if (!notificationRegisteredVertices.contains(sourceName)) {
+      notificationRegisteredVertices.add(sourceName);
+      taskCommunicatorContext.registerForVertexStateUpdates(sourceName, 
EnumSet.of(
+          VertexState.RUNNING, VertexState.SUCCEEDED));
+    }
+  }
+
+
+
+  void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState 
state) {
+    taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
+        
SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier)
+            
.setSrcName(sourceName).setState(Converters.fromVertexState(state)).build());
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
new file mode 100644
index 0000000..8f3d104
--- /dev/null
+++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.junit.Test;
+
+public class TestLlapTaskCommunicator {
+
+  @Test (timeout = 5000)
+  public void testEntityTracker1() {
+    LlapTaskCommunicator.EntityTracker entityTracker = new 
LlapTaskCommunicator.EntityTracker();
+
+    String host1 = "host1";
+    String host2 = "host2";
+    String host3 = "host3";
+    int port = 1451;
+
+
+    // Simple container registration and un-registration without any task 
attempt being involved.
+    ContainerId containerId101 = constructContainerId(101);
+    entityTracker.registerContainer(containerId101, host1, port);
+    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForContainer(containerId101));
+
+    entityTracker.unregisterContainer(containerId101);
+    
assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1,
 port)));
+    assertNull(entityTracker.getNodeIdForContainer(containerId101));
+    assertEquals(0, entityTracker.nodeMap.size());
+    assertEquals(0, entityTracker.attemptToNodeMap.size());
+    assertEquals(0, entityTracker.containerToNodeMap.size());
+
+
+    // Simple task registration and un-registration.
+    ContainerId containerId1 = constructContainerId(1);
+    TezTaskAttemptID taskAttemptId1 = constructTaskAttemptId(1);
+    entityTracker.registerTaskAttempt(containerId1, taskAttemptId1, host1, 
port);
+    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForContainer(containerId1));
+    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
+
+    entityTracker.unregisterTaskAttempt(taskAttemptId1);
+    
assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1,
 port)));
+    assertNull(entityTracker.getNodeIdForContainer(containerId1));
+    assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
+    assertEquals(0, entityTracker.nodeMap.size());
+    assertEquals(0, entityTracker.attemptToNodeMap.size());
+    assertEquals(0, entityTracker.containerToNodeMap.size());
+
+    // Register taskAttempt, unregister container. TaskAttempt should also be 
unregistered
+    ContainerId containerId201 = constructContainerId(201);
+    TezTaskAttemptID taskAttemptId201 = constructTaskAttemptId(201);
+    entityTracker.registerTaskAttempt(containerId201, taskAttemptId201, host1, 
port);
+    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForContainer(containerId201));
+    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
+
+    entityTracker.unregisterContainer(containerId201);
+    
assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1,
 port)));
+    assertNull(entityTracker.getNodeIdForContainer(containerId201));
+    assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
+    assertEquals(0, entityTracker.nodeMap.size());
+    assertEquals(0, entityTracker.attemptToNodeMap.size());
+    assertEquals(0, entityTracker.containerToNodeMap.size());
+
+    entityTracker.unregisterTaskAttempt(taskAttemptId201); // No errors
+  }
+
+
+  private ContainerId constructContainerId(int id) {
+    ContainerId containerId = mock(ContainerId.class);
+    doReturn(id).when(containerId).getId();
+    doReturn((long)id).when(containerId).getContainerId();
+    return containerId;
+  }
+
+  private TezTaskAttemptID constructTaskAttemptId(int id) {
+    TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
+    doReturn(id).when(taskAttemptId).getId();
+    return taskAttemptId;
+  }
+
+}

Reply via email to