Repository: hive
Updated Branches:
  refs/heads/llap 598e39128 -> e36f1fc4c


HIVE-10424. LLAP: Factor known capacity into scheduling decisions. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e36f1fc4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e36f1fc4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e36f1fc4

Branch: refs/heads/llap
Commit: e36f1fc4ca5d270019040b4ecff817c09908f717
Parents: 598e391
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 22 18:20:09 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 22 18:20:09 2015 -0700

----------------------------------------------------------------------
 .../llap/configuration/LlapConfiguration.java   |  38 +-
 .../registry/impl/LlapFixedRegistryImpl.java    |  11 +-
 .../dag/app/rm/LlapTaskSchedulerService.java    | 484 +++++++++++++------
 .../app/rm/TestLlapTaskSchedulerService.java    | 160 +++++-
 4 files changed, 531 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e36f1fc4/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index 8c5c3e4..f03c807 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -29,6 +29,8 @@ public class LlapConfiguration extends Configuration {
   }
 
 
+  public static final String LLAP_PREFIX = "llap.";
+
   public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
   private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml";
 
@@ -74,12 +76,38 @@ public class LlapConfiguration extends Configuration {
   public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5;
 
   /**
-   * Time after which a previously disabled node will be re-enabled for scheduling. This may be
-   * modified by an exponential back-off if failures persist
+   * Minimum time after which a previously disabled node will be re-enabled for scheduling. This may
+   * be modified by an exponential back-off if failures persist
+   */
+  public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS =
+      LLAP_PREFIX + "task.scheduler.node.re-enable.min.timeout.ms";
+  public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT = 200l;
+
+  /**
+   * Maximum time after which a previously disabled node will be re-enabled for scheduling. This may
+   * be modified by an exponential back-off if failures persist
+   */
+  public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS =
+      LLAP_PREFIX + "task.scheduler.node.re-enable.max.timeout.ms";
+  public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT = 10000l;
+
+  /**
+   * Backoff factor on successive blacklists of a node. Blacklist timeouts start at the min timeout
+   * and go up to the max timeout based on this backoff factor
+   */
+  public static final String LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR =
+      LLAP_PREFIX + "task.scheduler.node.disable.backoff.factor";
+  public static final float LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT = 1.5f;
+
+  /**
+   * The number of tasks the AM TaskScheduler will try allocating per node.
+   * 0 indicates that this should be picked up from the Registry.
+   * -1 indicates unlimited capacity
+   * >0 indicates a specific bound
    */
-  public static final String LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS =
-      LLAP_DAEMON_PREFIX + "task.scheduler.node.re-enable.timeout.ms";
-  public static final long LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT = 2000l;
+  public static final String LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE =
+      LLAP_PREFIX + "task.scheduler.num.schedulable.tasks.per.node";
+  public static final int LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT = 0;
 
   public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE =
       LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
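
For orientation, the three re-enable knobs interact multiplicatively: each consecutive blacklist of a node without an intervening task success scales the disable duration by the backoff factor, clamped to the max timeout. A minimal sketch of that progression with the defaults above (the class and method names here are illustrative, not part of the patch):

    // Illustrative only: mirrors the delay computation NodeInfo.disableNode performs,
    // using the defaults above (min=200ms, factor=1.5, max=10000ms).
    public class BackoffDemo {
      public static void main(String[] args) {
        long minDelay = 200L;
        long maxDelay = 10000L;
        float factor = 1.5f;
        float cumulative = 1.0f;
        // Consecutive blacklists without a task success in between keep growing the delay.
        for (int attempt = 1; attempt <= 12; attempt++) {
          long delay = Math.min((long) (minDelay * cumulative), maxDelay);
          System.out.println("attempt " + attempt + " -> disabled for " + delay + " ms");
          cumulative *= factor;
        }
      }
    }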

http://git-wip-us.apache.org/repos/asf/hive/blob/e36f1fc4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
index c600e74..cdc3930 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
@@ -55,7 +55,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
 
     for (Map.Entry<String, String> kv : conf) {
       if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
-          || kv.getKey().startsWith("hive.llap.")) {
+          || kv.getKey().startsWith("hive.llap.")
+          || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) {
         // TODO: read this somewhere useful, like the task scheduler
         srv.put(kv.getKey(), kv.getValue());
       }
@@ -152,6 +153,14 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
       return Resource.newInstance(memory, vcores);
     }
 
+    @Override
+    public String toString() {
+      return "FixedServiceInstance{" +
+          "host=" + host +
+          ", memory=" + memory +
+          ", vcores=" + vcores +
+          '}';
+    }
   }
 
   private final class FixedServiceInstanceSet implements ServiceInstanceSet {
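
The extra LLAP_PREFIX filter matters because whatever a daemon publishes into the registry is what the AM-side scheduler can later read back per node; the NodeInfo changes below read the daemon's wait-queue size this way to auto-detect capacity. A sketch of that resolution, under a hypothetical helper of my own (only the wait-queue-size key is taken from the patch):

    import java.util.Map;

    // Hypothetical helper, not part of the patch: resolves a node's schedulable-task
    // capacity the way the new NodeInfo constructor does. The real code keeps -1 as a
    // sentinel for "unlimited"; MAX_VALUE is used here to keep the sketch self-contained.
    final class CapacityResolver {
      static final String WAIT_QUEUE_KEY = "llap.daemon.task.scheduler.wait.queue.size";

      static int resolve(int configured, int vcores, Map<String, String> registryProps) {
        if (configured == -1) {
          return Integer.MAX_VALUE; // unlimited capacity
        }
        if (configured > 0) {
          return configured; // explicit bound from configuration
        }
        // configured == 0: auto-detect as executor slots plus the daemon's wait queue size.
        String waitQueue = registryProps.get(WAIT_QUEUE_KEY);
        int pending = (waitQueue != null) ? Integer.parseInt(waitQueue) : 0;
        return vcores + pending;
      }
    }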

http://git-wip-us.apache.org/repos/asf/hive/blob/e36f1fc4/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 39ad552..3a827c3 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -39,6 +39,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -68,8 +71,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
 
   private static final Log LOG = LogFactory.getLog(LlapTaskSchedulerService.class);
 
-  private static final float BACKOFF_FACTOR = 1.2f;
-
   private final ExecutorService appCallbackExecutor;
   private final TaskSchedulerAppCallback appClientDelegate;
 
@@ -101,36 +102,39 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
   private final Clock clock;
-  private final ListeningExecutorService executor;
+
+  private final ListeningExecutorService nodeEnabledExecutor;
+  private final NodeEnablerCallable nodeEnablerCallable =
+      new NodeEnablerCallable();
+
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
-  // TODO Track resources used by this application on specific hosts, and make scheduling decisions
-  // accordingly.
-  // Ideally implement in a way where updates from ZK, if they do come, can just be plugged in.
-  // A heap based on available capacity - which is updated each time stats are updated,
-  // or anytime assignment numbers are changed. Especially for random allocations (no host request).
-  // For non-random allocations - Walk through all pending tasks to get local assignments, then
-  // start assigning them to non local hosts.
-  // Also setup a max over-subscribe limit as part of this.
+  private final Lock scheduleLock = new ReentrantLock();
+  private final Condition scheduleCondition = scheduleLock.newCondition();
+  private final ListeningExecutorService schedulerExecutor;
+  private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
   private final AtomicBoolean isStopped = new AtomicBoolean(false);
 
-  private final long nodeReEnableTimeout;
+  private final NodeBlacklistConf nodeBlacklistConf;
 
   // Per daemon
   private final int memoryPerInstance;
   private final int coresPerInstance;
   private final int executorsPerInstance;
 
+  private final int numSchedulableTasksPerNode;
+
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
   private final LlapRegistryService registry = new LlapRegistryService();
-  private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable =
-      new PendingTaskSchedulerCallable();
-  private ListenableFuture<Void> pendingTaskSchedulerFuture;
+
+
+  private volatile ListenableFuture<Void> nodeEnablerFuture;
+  private volatile ListenableFuture<Void> schedulerFuture;
 
   @VisibleForTesting
   private final AtomicInteger dagCounter = new AtomicInteger(1);
@@ -158,9 +162,17 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     this.executorsPerInstance =
         conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
             LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
-    this.nodeReEnableTimeout =
-        conf.getLong(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS,
-            LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT);
+    this.nodeBlacklistConf = new NodeBlacklistConf(
+        conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
+            LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT),
+        conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS,
+            LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT),
+        conf.getFloat(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR,
+            LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT));
+
+    this.numSchedulableTasksPerNode = conf.getInt(
+        LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE,
+        LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT);
 
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
@@ -171,16 +183,19 @@ public class LlapTaskSchedulerService extends 
TaskSchedulerService {
     Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS
         + " must be defined");
 
-    ExecutorService executorService =
+    ExecutorService executorServiceRaw =
         Executors.newFixedThreadPool(1,
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
-    executor = MoreExecutors.listeningDecorator(executorService);
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
+    nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
+
+    ExecutorService schedulerExecutorServiceRaw = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
+    schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
 
     LOG.info("Running with configuration: " + "memoryPerInstance=" + 
memoryPerInstance
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
         + executorsPerInstance + ", resourcePerInstanceInferred=" + 
resourcePerExecutor
-        + ", nodeReEnableTimeout=" + nodeReEnableTimeout + ", 
nodeReEnableBackOffFactor="
-        + BACKOFF_FACTOR);
+        + ", nodeBlacklistConf=" + nodeBlacklistConf);
   }
 
   @Override
@@ -192,11 +207,12 @@ public class LlapTaskSchedulerService extends 
TaskSchedulerService {
   public void serviceStart() throws IOException {
     writeLock.lock();
     try {
-      pendingTaskSchedulerFuture = executor.submit(pendingTaskSchedulerCallable);
+      nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
+      schedulerFuture = schedulerExecutor.submit(schedulerCallable);
+      nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
+      schedulerFuture = schedulerExecutor.submit(schedulerCallable);
       registry.start();
       activeInstances = registry.getInstances();
       for (ServiceInstance inst : activeInstances.getAll().values()) {
-        addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
+        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode));
       }
     } finally {
       writeLock.unlock();
@@ -208,11 +224,18 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     writeLock.lock();
     try {
       if (!this.isStopped.getAndSet(true)) {
-        pendingTaskSchedulerCallable.shutdown();
-        if (pendingTaskSchedulerFuture != null) {
-          pendingTaskSchedulerFuture.cancel(true);
+        nodeEnablerCallable.shutdown();
+        if (nodeEnablerFuture != null) {
+          nodeEnablerFuture.cancel(true);
         }
-        executor.shutdownNow();
+        nodeEnabledExecutor.shutdownNow();
+
+        schedulerCallable.shutdown();
+        if (schedulerFuture != null) {
+          schedulerFuture.cancel(true);
+        }
+        schedulerExecutor.shutdownNow();
+
         if (registry != null) {
           registry.stop();
         }
@@ -232,7 +255,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
       for (ServiceInstance inst : activeInstances.getAll().values()) {
         if (inst.isAlive()) {
           Resource r = inst.getResource();
-          LOG.info("Found instance " + inst + " with " + r);
+          LOG.info("Found instance " + inst);
           memory += r.getMemory();
           vcores += r.getVirtualCores();
         } else {
@@ -317,10 +340,8 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     } finally {
       writeLock.unlock();
     }
-    boolean scheduled = scheduleTask(taskInfo);
-    if (!scheduled) {
-      addPendingTask(taskInfo);
-    }
+    addPendingTask(taskInfo);
+    trySchedulingPendingTasks();
   }
 
   @Override
@@ -336,10 +357,8 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     } finally {
       writeLock.unlock();
     }
-    boolean scheduled = scheduleTask(taskInfo);
-    if (!scheduled) {
-      addPendingTask(taskInfo);
-    }
+    addPendingTask(taskInfo);
+    trySchedulingPendingTasks();
   }
 
   // This may be invoked before a container is ever assigned to a task. allocateTask... app decides
@@ -373,25 +392,39 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
       ServiceInstance assignedInstance = taskInfo.assignedInstance;
       assert assignedInstance != null;
 
+      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
+      assert nodeInfo != null;
       if (taskSucceeded) {
         // The node may have been blacklisted at this point - which means it may not be in the
         // activeNodeList.
-        NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
-        assert nodeInfo != null;
+
         nodeInfo.registerTaskSuccess();
-        // TODO Consider un-blacklisting the node since at least 1 slot should have become available
-        // on the node.
-      } else if (!taskSucceeded
-          && endReason != null
-          && EnumSet
-              .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
-              .contains(endReason)) {
-        if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
-          dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
-        } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
-          dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+
+        if (nodeInfo.isDisabled()) {
+          // Re-enable the node. If a task succeeded, a slot may have become available.
+          // Also reset commFailures since a task was able to communicate back and indicate success.
+          nodeInfo.enableNode();
+          // Re-insert into the queue to force the poll thread to remove the element.
+          if (disabledNodesQueue.remove(nodeInfo)) {
+            disabledNodesQueue.add(nodeInfo);
+          }
         }
-        disableInstance(assignedInstance, endReason == TaskAttemptEndReason.SERVICE_BUSY);
+        // In case of success, trigger a scheduling run for pending tasks.
+        trySchedulingPendingTasks();
+
+      } else if (!taskSucceeded) {
+        nodeInfo.registerUnsuccessfulTaskEnd();
+        if (endReason != null && EnumSet
+            .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
+            .contains(endReason)) {
+          if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
+            dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
+          } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
+            dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+          }
+        }
+        boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
+        disableInstance(assignedInstance, commFailure);
       }
     } finally {
       writeLock.unlock();
@@ -433,7 +466,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
    * @param request the list of preferred hosts. null implies any host
    * @return
    */
-  private ServiceInstance selectHost(TaskInfo request) {
+  private NodeServiceInstancePair selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
@@ -459,91 +492,81 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
           if (!instances.isEmpty()) {
             for (ServiceInstance inst : instances) {
               NodeInfo nodeInfo = instanceToNodeMap.get(inst);
-              if (inst.isAlive() && nodeInfo != null && !nodeInfo.isDisabled()) {
-                // TODO Change this to work off of what we think is remaining capacity for an
-                // instance
-                LOG.info(
-                    "Assigning " + inst + " when looking for " + host + ". FirstRequestedHost=" +
-                        (prefHostCount == 0));
-                return inst;
+              if (nodeInfo != null && nodeInfo.canAcceptTask()) {
+                LOG.info("Assigning " + inst + " when looking for " + host + "." +
+                    " FirstRequestedHost=" + (prefHostCount == 0));
+                return new NodeServiceInstancePair(inst, nodeInfo);
               }
             }
           }
         }
       }
       /* fall through - miss in locality (random scheduling) */
-      Entry<ServiceInstance, NodeInfo> [] all = instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]);
+      Entry<ServiceInstance, NodeInfo>[] all =
+          instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]);
       // Check again
       if (all.length > 0) {
         int n = random.nextInt(all.length);
         // start at random offset and iterate whole list
         for (int i = 0; i < all.length; i++) {
           Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
-          if (inst.getKey().isAlive() && !inst.getValue().isDisabled()) {
+          if (inst.getValue().canAcceptTask()) {
             LOG.info("Assigning " + inst + " when looking for any host, from 
#hosts=" + all.length);
-            return inst.getKey();
+            return new NodeServiceInstancePair(inst.getKey(), inst.getValue());
           }
         }
       }
+      return null;
     } finally {
       readLock.unlock();
     }
+  }
 
-    // TODO Ideally, each refresh operation should addNodes if they don't already exist.
-    // Even better would be to get notifications from the service impl when a node gets added or removed.
-    // Instead of having to walk through the entire list. The computation of a node getting added or
-    // removed already exists in the DynamicRegistry implementation.
-
-
-    // This will only happen if no allocations are possible, which means all other nodes have
-    // been blacklisted.
-    // TODO Look for new nodes more often. See comment above.
+  // TODO Each refresh operation should addNodes if they don't already exist.
+  // Even better would be to get notifications from the service impl when a node gets added or removed.
+  // Instead of having to walk through the entire list. The computation of a node getting added or
+  // removed already exists in the DynamicRegistry implementation.
+  private void refreshInstances() {
+    try {
+      activeInstances.refresh(); // handles its own sync
+    } catch (IOException ioe) {
+      LOG.warn("Could not refresh list of active instances", ioe);
+    }
+  }
 
+  private void scanForNodeChanges() {
     /* check again whether nodes are disabled or just missing */
     writeLock.lock();
     try {
       for (ServiceInstance inst : activeInstances.getAll().values()) {
         if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) {
           /* that's a good node, not added to the allocations yet */
-          LOG.info("Found a new node: " + inst + ". Adding to node list and 
disabling to trigger scheduling");
-          addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
-          // mark it as disabled to let the pending tasks go there
-          // TODO If disabling the instance, have it wake up immediately 
instead of waiting.
-          // Ideally get rid of this requirement, by having all tasks 
allocated via a queue.
-          disableInstance(inst, true);
+          LOG.info("Found a new node: " + inst + ".");
+          addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, 
numSchedulableTasksPerNode));
         }
       }
-      /* do not allocate nodes from this process, as then the pending tasks will get starved */
     } finally {
       writeLock.unlock();
     }
-    return null;
-  }
-
-  private void refreshInstances() {
-    try {
-      activeInstances.refresh(); // handles its own sync
-    } catch (IOException ioe) {
-      LOG.warn("Could not refresh list of active instances", ioe);
-    }
   }
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
     LOG.info("Adding node: " + inst);
     instanceToNodeMap.put(inst, node);
-    // TODO Trigger a scheduling run each time a new node is added.
+    // Trigger scheduling since a new node became available.
+    trySchedulingPendingTasks();
   }
 
   private void reenableDisabledNode(NodeInfo nodeInfo) {
     writeLock.lock();
     try {
-      if (!nodeInfo.isBusy()) {
+      if (nodeInfo.hadCommFailure()) {
        // If the node being re-enabled was not marked busy previously, then it was disabled due to
        // some other failure. Refresh the service list to see if it's been removed permanently.
         refreshInstances();
       }
-      LOG.info("Attempting to re-enable node: " + nodeInfo.host);
-      if (nodeInfo.host.isAlive()) {
+      LOG.info("Attempting to re-enable node: " + 
nodeInfo.getServiceInstance());
+      if (nodeInfo.getServiceInstance().isAlive()) {
         nodeInfo.enableNode();
       } else {
         if (LOG.isInfoEnabled()) {
@@ -555,7 +578,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
-  private void disableInstance(ServiceInstance instance, boolean busy) {
+  private void disableInstance(ServiceInstance instance, boolean isCommFailure) {
     writeLock.lock();
     try {
       NodeInfo nodeInfo = instanceToNodeMap.get(instance);
@@ -564,13 +587,9 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
           LOG.debug("Node: " + instance + " already disabled, or invalid. Not 
doing anything.");
         }
       } else {
-        nodeInfo.disableNode(nodeReEnableTimeout);
-        nodeInfo.setBusy(busy); // daemon failure vs daemon busy
+        nodeInfo.disableNode(isCommFailure);
         // TODO: handle task to container map events in case of hard failures
         disabledNodesQueue.add(nodeInfo);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Disabling instance " + instance + " for " + 
nodeReEnableTimeout + " milli-seconds");
-        }
       }
     } finally {
       writeLock.unlock();
@@ -580,7 +599,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   private void addPendingTask(TaskInfo taskInfo) {
     writeLock.lock();
     try {
-      dagStats.registerDelayedAllocation();
       List<TaskInfo> tasksAtPriority = pendingTasks.get(taskInfo.priority);
       if (tasksAtPriority == null) {
         tasksAtPriority = new LinkedList<>();
@@ -607,7 +625,8 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
-  private void schedulePendingTasks() {
+  @VisibleForTesting
+  protected void schedulePendingTasks() {
     writeLock.lock();
     try {
       Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
@@ -618,7 +637,15 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
         Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
         boolean scheduledAllAtPriority = true;
         while (taskIter.hasNext()) {
+
+          // TODO Optimization: Add a check to see if there's any capacity available. No point in
+          // walking through all active nodes, if they don't have potential capacity.
+
           TaskInfo taskInfo = taskIter.next();
+          if (taskInfo.getNumPreviousAssignAttempts() == 1) {
+            dagStats.registerDelayedAllocation();
+          }
+          taskInfo.triedAssigningTask();
           boolean scheduled = scheduleTask(taskInfo);
           if (scheduled) {
             taskIter.remove();
@@ -642,20 +669,23 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
+
   private boolean scheduleTask(TaskInfo taskInfo) {
-    ServiceInstance host = selectHost(taskInfo);
-    if (host == null) {
+    NodeServiceInstancePair nsPair = selectHost(taskInfo);
+    if (nsPair == null) {
       return false;
     } else {
       Container container =
-          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host.getHost(),
-              host.getRpcPort());
+          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
+              nsPair.getServiceInstance().getHost(),
+              nsPair.getServiceInstance().getRpcPort());
       writeLock.lock(); // While updating local structures
       try {
        dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
-            host.getHost());
-        taskInfo.setAssignmentInfo(host, container.getId());
+            nsPair.getServiceInstance().getHost());
+        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId());
         knownTasks.putIfAbsent(taskInfo.task, taskInfo);
+        nsPair.getNodeInfo().registerTaskScheduled();
       } finally {
         writeLock.unlock();
       }
@@ -665,30 +695,92 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
-  private class PendingTaskSchedulerCallable implements Callable<Void> {
+  private class NodeEnablerCallable implements Callable<Void> {
 
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private static final long REFRESH_INTERVAL = 10000l;
+    long nextPollInterval = REFRESH_INTERVAL;
+    long lastRefreshTime;
 
     @Override
     public Void call() {
 
+      lastRefreshTime = System.currentTimeMillis();
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           while (true) {
-            NodeInfo nodeInfo = disabledNodesQueue.take();
-            // A node became available. Enable the node and try scheduling.
-            reenableDisabledNode(nodeInfo);
-            schedulePendingTasks();
+            NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
+            if (nodeInfo != null) {
+              long currentTime = System.currentTimeMillis();
+              // A node became available. Enable the node and try scheduling.
+              reenableDisabledNode(nodeInfo);
+              trySchedulingPendingTasks();
+
+              nextPollInterval -= (currentTime - lastRefreshTime);
+            }
+
+            if (nextPollInterval < 0 || nodeInfo == null) {
+              // timeout expired. Reset the poll interval and refresh nodes.
+              nextPollInterval = REFRESH_INTERVAL;
+              lastRefreshTime = System.currentTimeMillis();
+              // TODO Get rid of this polling once we have notifications from the registry sub-system
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Refreshing instances based on poll interval");
+              }
+              refreshInstances();
+              scanForNodeChanges();
+            }
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("NodeEnabler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("NodeEnabler thread interrupted without being shutdown");
+            throw new RuntimeException("NodeEnabler thread interrupted without 
being shutdown", e);
           }
+        }
+      }
+      return null;
+    }
+
+    // Call this first, then send in an interrupt to the thread.
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+  }
+
+  private void trySchedulingPendingTasks() {
+    scheduleLock.lock();
+    try {
+      scheduleCondition.signal();
+    } finally {
+      scheduleLock.unlock();
+    }
+  }
+
+  private class SchedulerCallable implements Callable<Void> {
+    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        scheduleLock.lock();
+        try {
+          scheduleCondition.await();
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
-            LOG.info("Disabled node wait interrupted after shutdown. Stopping 
the disabled node poll");
+            LOG.info("Scheduler thread interrupted after shutdown");
             break;
           } else {
-            LOG.warn("Interrupted while waiting for disabled nodes.");
-            throw new RuntimeException("Interrupted while waiting for disabled 
nodes", e);
+            LOG.warn("Scheduler thread interrupted without being shutdown");
+            throw new RuntimeException("Scheduler thread interrupted without 
being shutdown", e);
           }
+        } finally {
+          scheduleLock.unlock();
         }
+        // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
+        schedulePendingTasks();
       }
       return null;
     }
@@ -701,68 +793,126 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
 
   @VisibleForTesting
   static class NodeInfo implements Delayed {
-    private final float constBackOffFactor;
-    final ServiceInstance host;
+    private final NodeBlacklistConf blacklistConf;
+    final ServiceInstance serviceInstance;
     private final Clock clock;
 
     long expireTimeMillis = -1;
     private long numSuccessfulTasks = 0;
     private long numSuccessfulTasksAtLastBlacklist = -1;
     float cumulativeBackoffFactor = 1.0f;
-    // A node could be disabled for reasons other than being busy.
-    private boolean disabled = false;
-    // If disabled, the node could be marked as busy.
-    private boolean busy;
 
+    // Indicates whether a node had a recent communication failure.
+    private boolean hadCommFailure = false;
 
-    NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) {
-      this.host = host;
-      constBackOffFactor = backoffFactor;
+    // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
+    private boolean disabled = false;
+
+    private int numScheduledTasks = 0;
+    private final int numSchedulableTasks;
+
+
+    /**
+     * Create a NodeInfo bound to a service instance
+     *
+     * @param serviceInstance         the associated serviceInstance
+     * @param blacklistConf           blacklist configuration
+     * @param clock                   clock to use to obtain timing information
+     * @param numSchedulableTasksConf number of schedulable tasks on the node. 0 represents auto
+     *                                detect based on the serviceInstance, -1 indicates unlimited
+     *                                capacity
+     */
+    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock,
+        int numSchedulableTasksConf) {
+      Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1");
+      this.serviceInstance = serviceInstance;
+      this.blacklistConf = blacklistConf;
       this.clock = clock;
+
+      if (numSchedulableTasksConf == 0) {
+        int pendingQueueCapacity = 0;
+        String pendingQueueCapacityString = serviceInstance.getProperties()
+            .get(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting up node: " + serviceInstance + ", with available capacity=" +
+              serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity=" +
+              pendingQueueCapacityString);
+        }
+
+        if (pendingQueueCapacityString != null) {
+          pendingQueueCapacity = Integer.parseInt(pendingQueueCapacityString);
+        }
+        this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueCapacity;
+      } else {
+        this.numSchedulableTasks = numSchedulableTasksConf;
+      }
+      LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
+    }
+
+    ServiceInstance getServiceInstance() {
+      return serviceInstance;
     }
 
     void enableNode() {
       expireTimeMillis = -1;
       disabled = false;
+      hadCommFailure = false;
     }
 
-    void disableNode(long duration) {
+    void disableNode(boolean commFailure) {
+      long duration = blacklistConf.minDelay;
       long currentTime = clock.getTime();
+      this.hadCommFailure = commFailure;
       disabled = true;
       if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
-        // Blacklisted again, without any progress. Will never kick in for the first run.
-        cumulativeBackoffFactor = cumulativeBackoffFactor * constBackOffFactor;
+        // Relying on a task succeeding to reset the exponent.
+        // There are no notifications on whether a task gets accepted or not. That would be ideal to
+        // reset this.
+        cumulativeBackoffFactor = cumulativeBackoffFactor * blacklistConf.backoffFactor;
       } else {
        // Was able to execute something before the last blacklist. Reset the exponent.
         cumulativeBackoffFactor = 1.0f;
       }
-      expireTimeMillis = currentTime + (long) (duration * cumulativeBackoffFactor);
+
+      long delayTime = (long) (duration * cumulativeBackoffFactor);
+      if (delayTime > blacklistConf.maxDelay) {
+        delayTime = blacklistConf.maxDelay;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Disabling instance " + serviceInstance + " for " + delayTime 
+ " milli-seconds");
+      }
+      expireTimeMillis = currentTime + delayTime;
       numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks;
+    }
 
+    void registerTaskScheduled() {
+      numScheduledTasks++;
     }
 
     void registerTaskSuccess() {
-      // TODO If a task succeeds, we may have free slots. Mark the node as !busy. Ideally take it out
-      // of the queue for more allocations.
-      // For now, not chanigng the busy status,
-
-      // this.busy = false;
-      // this.disabled = false;
       numSuccessfulTasks++;
+      numScheduledTasks--;
     }
 
-    public void setBusy(boolean busy) {
-      this.busy = busy;
-    }
-
-    public boolean isBusy() {
-      return busy;
+    void registerUnsuccessfulTaskEnd() {
+      numScheduledTasks--;
     }
 
     public boolean isDisabled() {
       return disabled;
     }
 
+    public boolean hadCommFailure() {
+      return hadCommFailure;
+    }
+    /* Returning true does not guarantee that the task will run, considering other queries
+    may be running in the system. Also depends upon the capacity usage configuration
+     */
+    public boolean canAcceptTask() {
+      boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
+          && (numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
+      return result;
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
      return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS);
@@ -782,10 +932,15 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
 
     @Override
     public String toString() {
-      return "NodeInfo{" + "constBackOffFactor=" + constBackOffFactor + ", 
host=" + host
+      return "NodeInfo{" + "instance=" + serviceInstance
           + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" 
+ numSuccessfulTasks
           + ", numSuccessfulTasksAtLastBlacklist=" + 
numSuccessfulTasksAtLastBlacklist
-          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor + '}';
+          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor
+          + ", numSchedulableTasks=" + numSchedulableTasks
+          + ", numScheduledTasks=" + numScheduledTasks
+          + ", disabled=" + disabled
+          + ", commFailures=" + hadCommFailure
+          +'}';
     }
   }
 
@@ -880,6 +1035,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     ContainerId containerId;
     ServiceInstance assignedInstance;
     private boolean assigned = false;
+    private int numAssignAttempts = 0;
 
    public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
         String[] hosts, String[] racks, long requestTime) {
@@ -897,6 +1053,52 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
       this.containerId = containerId;
       assigned = true;
     }
+
+    void triedAssigningTask() {
+      numAssignAttempts++;
+    }
+
+    int getNumPreviousAssignAttempts() {
+      return numAssignAttempts;
+    }
   }
 
+  private static class NodeServiceInstancePair {
+    final NodeInfo nodeInfo;
+    final ServiceInstance serviceInstance;
+
+    private NodeServiceInstancePair(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+      this.serviceInstance = serviceInstance;
+      this.nodeInfo = nodeInfo;
+    }
+
+    public ServiceInstance getServiceInstance() {
+      return serviceInstance;
+    }
+
+    public NodeInfo getNodeInfo() {
+      return nodeInfo;
+    }
+  }
+
+  private static final class NodeBlacklistConf {
+    private final long minDelay;
+    private final long maxDelay;
+    private final float backoffFactor;
+
+    public NodeBlacklistConf(long minDelay, long maxDelay, float backoffFactor) {
+      this.minDelay = minDelay;
+      this.maxDelay = maxDelay;
+      this.backoffFactor = backoffFactor;
+    }
+
+    @Override
+    public String toString() {
+      return "NodeBlacklistConf{" +
+          "minDelay=" + minDelay +
+          ", maxDelay=" + maxDelay +
+          ", backoffFactor=" + backoffFactor +
+          '}';
+    }
+  }
 }
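
Stepping back, the scheduler now runs two cooperating loops: a node enabler that polls a delay queue of disabled nodes (falling back to a periodic registry refresh on timeout), and a scheduler thread that waits on a condition which any event of interest signals. A minimal sketch of the delay-queue half of that design, under illustrative names of my own rather than the patch's classes:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    // Sketch of the re-enable pattern: disabled nodes implement Delayed, sit in a
    // DelayQueue until their backoff expires, and a single thread polls with a timeout
    // so it can also refresh the registry periodically. Illustrative, not the patch.
    final class ReenableLoopSketch {
      static final long REFRESH_INTERVAL_MS = 10000L;

      static final class DisabledNode implements Delayed {
        final String host;
        final long expireTimeMillis;
        DisabledNode(String host, long expireTimeMillis) {
          this.host = host;
          this.expireTimeMillis = expireTimeMillis;
        }
        @Override public long getDelay(TimeUnit unit) {
          return unit.convert(expireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override public int compareTo(Delayed o) {
          return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
      }

      final DelayQueue<DisabledNode> disabled = new DelayQueue<>();

      void runLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          // Wait for the next node's backoff to expire, but no longer than the refresh interval.
          DisabledNode node = disabled.poll(REFRESH_INTERVAL_MS, TimeUnit.MILLISECONDS);
          if (node != null) {
            System.out.println("Re-enabling " + node.host + "; trigger a scheduling pass");
          } else {
            System.out.println("Poll timed out; refresh registry and scan for node changes");
          }
        }
      }
    }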

http://git-wip-us.apache.org/repos/asf/hive/blob/e36f1fc4/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index b116af0..a0399da 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -15,10 +15,7 @@
 package org.apache.tez.dag.app.rm;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -28,7 +25,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -61,7 +64,9 @@ public class TestLlapTaskSchedulerService {
 
       Object task1 = new Object();
       Object clientCookie1 = new Object();
+      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
       // TODO Verify this is on host1.
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
@@ -79,7 +84,9 @@ public class TestLlapTaskSchedulerService {
 
       Object task1 = new Object();
       Object clientCookie1 = new Object();
+      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
     } finally {
@@ -95,7 +102,9 @@ public class TestLlapTaskSchedulerService {
       String[] hosts1 = new String[]{HOST1};
       Object task1 = new Object();
       Object clientCookie1 = new Object();
+      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
@@ -107,16 +116,18 @@ public class TestLlapTaskSchedulerService {
 
       // Verify that the node is blacklisted
       assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks);
-      assertEquals(2, tsWrapper.ts.instanceToNodeMap.size());
+      assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
      LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
       assertNotNull(disabledNodeInfo);
-      assertEquals(HOST1, disabledNodeInfo.host.getHost());
-      assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.NANOSECONDS));
+      assertEquals(HOST1, disabledNodeInfo.serviceInstance.getHost());
+      assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS));
       assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis);
 
       Object task2 = new Object();
       Object clientCookie2 = new Object();
+      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), eq(clientCookie2), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
@@ -144,9 +155,15 @@ public class TestLlapTaskSchedulerService {
       Object clientCookie2 = new Object();
       Object task3 = new Object();
       Object clientCookie3 = new Object();
+
+      tsWrapper.controlScheduler(true);
+      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
       tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2);
       tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3);
+      tsWrapper.signalScheduler();
+      tsWrapper.controlScheduler(false);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
      verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
       assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
@@ -159,7 +176,7 @@ public class TestLlapTaskSchedulerService {
 
       // Verify that the node is blacklisted
       assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks);
-      assertEquals(0, tsWrapper.ts.instanceToNodeMap.size());
+      assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
       assertEquals(3, tsWrapper.ts.disabledNodesQueue.size());
 
 
@@ -169,20 +186,24 @@ public class TestLlapTaskSchedulerService {
       Object clientCookie5 = new Object();
       Object task6 = new Object();
       Object clientCookie6 = new Object();
+      tsWrapper.controlScheduler(true);
+      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4);
       tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5);
       tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6);
-
-      // Sleep longer than the re-enable timeout.
-      Thread.sleep(3000l);
+      tsWrapper.signalScheduler();
+      tsWrapper.controlScheduler(false);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 2);
 
      ArgumentCaptor<Container> argumentCaptor = ArgumentCaptor.forClass(Container.class);
      verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), argumentCaptor.capture());
 
-      // Everything should go to host1 since it gets of the list first, and there are no locality delays
-      assertEquals(4, tsWrapper.ts.dagStats.numLocalAllocations);
+      // Limited allocations per node. So better locality when nodes come out of the blacklist
+      // TODO This is flaky, since multiple nodes can get enabled at roughly the same time,
+      // which affects the locality matching
+      assertEquals(6, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-      assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
 
       // TODO Enhance this to verify unblacklisting of the node.
     } finally {
@@ -197,7 +218,7 @@ public class TestLlapTaskSchedulerService {
     AppContext mockAppContext = mock(AppContext.class);
     ControlledClock clock = new ControlledClock(new SystemClock());
    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
-    LlapTaskSchedulerService ts;
+    LlapTaskSchedulerServiceForTest ts;
 
     TestTaskSchedulerServiceWrapper() {
       this(2000l);
@@ -207,7 +228,9 @@ public class TestLlapTaskSchedulerService {
       conf = new Configuration();
      conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, HOST1, HOST2, HOST3);
       conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, 4);
-      conf.setLong(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS, disableTimeoutMillis);
+      conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
+          disableTimeoutMillis);
+      conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true);
 
       doReturn(clock).when(mockAppContext).getClock();
       doReturn(appAttemptId).when(mockAppContext).getApplicationAttemptId();
@@ -216,8 +239,25 @@ public class TestLlapTaskSchedulerService {
 
       ts.init(conf);
       ts.start();
+      // One scheduler pass from the nodes that are added at startup
+      awaitSchedulerRunNumber(1);
+    }
+
+    int getSchedulerRunNumber() {
+      return ts.forTestGetSchedulerRunNumber();
+    }
+
+    void awaitSchedulerRunNumber(int runNumber) {
+      ts.forTestAwaitSchedulingRun(runNumber);
     }
 
+    void controlScheduler(boolean val) {
+      ts.forTestSetupSchedulerStartWait(val);
+    }
+
+    void signalScheduler() {
+      ts.forTestSignalSchedulerStart();
+    }
     void resetAppCallback() {
       reset(mockAppCallback);
     }
@@ -237,6 +277,15 @@ public class TestLlapTaskSchedulerService {
 
  private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
 
+    // For Unit Testing
+    static final String LLAP_TASK_SCHEDULER_IN_TEST = "llap.task.scheduler.in-test";
+    private final boolean inTest;
+    private final Lock forTestSchedulerLock = new ReentrantLock();
+    private final Condition forTestSchedulerRunCondition = forTestSchedulerLock.newCondition();
+    private final Condition forTestSchedulerRunStartCondition = forTestSchedulerLock.newCondition();
+    private final AtomicInteger forTestNumSchedulerRuns = new AtomicInteger(0);
+    private final AtomicBoolean forTestControlledScheduleStart = new AtomicBoolean(false);
+    private boolean forTestSchedulerGoSignal = false;
 
     public LlapTaskSchedulerServiceForTest(
        TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname,
@@ -244,6 +293,7 @@ public class TestLlapTaskSchedulerService {
         Configuration conf) {
      super(appClient, appContext, clientHostname, clientPort, trackingUrl, customAppIdIdentifier,
           conf);
+      this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
     }
 
     @Override
@@ -251,5 +301,85 @@ public class TestLlapTaskSchedulerService {
         TaskSchedulerAppCallback realAppClient) {
       return realAppClient;
     }
+
+    protected void schedulePendingTasks() {
+      try {
+        forTestAwaitSchedulerStartSignal();
+        super.schedulePendingTasks();
+      } finally {
+        forTestMaybeSignalSchedulerRun();
+      }
+    }
+
+
+    private void forTestMaybeSignalSchedulerRun() {
+      if (inTest) {
+        forTestSchedulerLock.lock();
+        try {
+          forTestNumSchedulerRuns.incrementAndGet();
+          forTestSchedulerRunCondition.signal();
+        } finally {
+          forTestSchedulerLock.unlock();
+        }
+      }
+    }
+
+    int forTestGetSchedulerRunNumber() {
+      return forTestNumSchedulerRuns.get();
+    }
+
+    @VisibleForTesting
+    void forTestAwaitSchedulingRun(int runNumber) {
+      if (inTest) {
+        forTestSchedulerLock.lock();
+        try {
+          while (forTestNumSchedulerRuns.get() != runNumber) {
+            forTestSchedulerRunCondition.await();
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        } finally {
+          forTestSchedulerLock.unlock();
+        }
+      }
+    }
+
+    void forTestSetupSchedulerStartWait(boolean val) {
+      if (inTest) {
+        forTestControlledScheduleStart.set(val);
+        forTestSchedulerGoSignal = false;
+      }
+    }
+
+    void forTestSignalSchedulerStart() {
+      if (inTest) {
+        forTestSchedulerLock.lock();
+        try {
+          forTestSchedulerGoSignal = true;
+          forTestSchedulerRunStartCondition.signal();
+        } finally {
+          forTestSchedulerLock.unlock();
+        }
+      }
+    }
+
+    private void forTestAwaitSchedulerStartSignal() {
+      if (inTest) {
+        forTestSchedulerLock.lock();
+        try {
+          if (forTestControlledScheduleStart.get()) {
+            if (forTestSchedulerGoSignal) {
+              forTestSchedulerGoSignal = false;
+              return;
+            }
+            forTestSchedulerRunStartCondition.await();
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        } finally {
+          forTestSchedulerLock.unlock();
+        }
+      }
+    }
   }
 }
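
Finally, the test subclass replaces the old Thread.sleep() waits with deterministic synchronization on a scheduler-run counter. The idiom, reduced to its core under illustrative names (the real subclass additionally gates when a pass may start):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch only: the worker bumps a run counter and signals after every scheduling
    // pass; the test blocks until a target run number is reached instead of sleeping.
    final class RunBarrierSketch {
      private final Lock lock = new ReentrantLock();
      private final Condition runCompleted = lock.newCondition();
      private final AtomicInteger runs = new AtomicInteger(0);

      // Called by the scheduler thread at the end of each pass.
      void signalRunComplete() {
        lock.lock();
        try {
          runs.incrementAndGet();
          runCompleted.signal();
        } finally {
          lock.unlock();
        }
      }

      // Called by the test to wait deterministically for the Nth pass.
      void awaitRun(int target) throws InterruptedException {
        lock.lock();
        try {
          while (runs.get() < target) {
            runCompleted.await();
          }
        } finally {
          lock.unlock();
        }
      }
    }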
