Repository: hive
Updated Branches:
  refs/heads/llap 2024c962d -> b8b94f297


http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index d355029..5a2b77d 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -26,9 +26,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -96,6 +99,8 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
 
   // Tracks running and queued tasks. Cleared after a task completes.
   private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
+  private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>(); // priority -> assigned tasks, newest first
+  private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
 
   // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
   @VisibleForTesting
@@ -119,6 +124,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
   private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  private final AtomicInteger pendingPreemptions = new AtomicInteger(0); // preempts awaiting deallocateTask
 
   private final NodeBlacklistConf nodeBlacklistConf;
 
@@ -134,7 +140,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
 
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
-
   private volatile ListenableFuture<Void> nodeEnablerFuture;
   private volatile ListenableFuture<Void> schedulerFuture;
 
@@ -385,6 +390,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     trySchedulingPendingTasks();
   }
 
+
   // This may be invoked before a container is ever assigned to a task. allocateTask... app decides
   // the task is no longer required, and asks for a de-allocation.
   @Override
@@ -392,7 +398,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     writeLock.lock(); // Updating several local structures
     TaskInfo taskInfo;
     try {
-      taskInfo = knownTasks.remove(task);
+      taskInfo = unregisterTask(task);
       if (taskInfo == null) {
         LOG.error("Could not determine ContainerId for task: "
             + task
@@ -418,12 +424,12 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
 
       NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
       assert nodeInfo != null;
-      if (taskSucceeded) {
-        // The node may have been blacklisted at this point - which means it may not be in the
-        // activeNodeList.
-
-        nodeInfo.registerTaskSuccess();
 
+      // Preempted by this scheduler: clear the pending preemption, re-enable the node, reschedule.
+      if (taskInfo.preempted) {
+        LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
+        pendingPreemptions.decrementAndGet();
+        nodeInfo.registerUnsuccessfulTaskEnd(true);
         if (nodeInfo.isDisabled()) {
           // Re-enable the node. If a task succeeded, a slot may have become available.
           // Also reset commFailures since a task was able to communicate back and indicate success.
@@ -435,20 +441,40 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
         }
         // In case of success, trigger a scheduling run for pending tasks.
         trySchedulingPendingTasks();
-
-      } else if (!taskSucceeded) {
-        nodeInfo.registerUnsuccessfulTaskEnd();
-        if (endReason != null && EnumSet
-            .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
-            .contains(endReason)) {
-          if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
-            dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
-          } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
-            dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+      } else {
+        if (taskSucceeded) {
+          // The node may have been blacklisted at this point - which means it may not be in the
+          // activeNodeList.
+
+          nodeInfo.registerTaskSuccess();
+
+          if (nodeInfo.isDisabled()) {
+            // Re-enable the node. If a task succeeded, a slot may have become available.
+            // Also reset commFailures since a task was able to communicate back and indicate success.
+            nodeInfo.enableNode();
+            // Re-insert into the queue to force the poll thread to remove the element.
+            if (disabledNodesQueue.remove(nodeInfo)) {
+              disabledNodesQueue.add(nodeInfo);
+            }
           }
+          // In case of success, trigger a scheduling run for pending tasks.
+          trySchedulingPendingTasks();
+
+        } else if (!taskSucceeded) {
+          nodeInfo.registerUnsuccessfulTaskEnd(false);
+          if (endReason != null && EnumSet
+              .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
+              .contains(endReason)) {
+            if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
+              dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
+            } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
+              dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+            }
+          }
+          boolean commFailure =
+              endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
+          disableInstance(assignedInstance, commFailure);
         }
-        boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
-        disableInstance(assignedInstance, commFailure);
       }
     } finally {
       writeLock.unlock();
@@ -461,7 +487,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   public Object deallocateContainer(ContainerId containerId) {
     LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
     // Containers are not being tracked for re-use.
-    // This is safe to ignore since a deallocate task should have come in earlier.
+    // This is safe to ignore since a deallocate task will come in.
     return null;
   }
 
@@ -635,6 +661,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
+  /* Remove a task from the pending list */
   private void removePendingTask(TaskInfo taskInfo) {
     writeLock.lock();
     try {
@@ -649,6 +676,48 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
+  /* Add an assigned task to runningTasks, bucketed by its priority value (newest first). */
+  private void registerRunningTask(TaskInfo taskInfo) {
+    writeLock.lock();
+    try {
+      int priority = taskInfo.priority.getPriority();
+      TreeSet<TaskInfo> tasksAtPriority = runningTasks.get(priority);
+      if (tasksAtPriority == null) {
+        tasksAtPriority = new TreeSet<>(TASK_INFO_COMPARATOR);
+        runningTasks.put(priority, tasksAtPriority);
+      }
+      tasksAtPriority.add(taskInfo);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /* Remove a task from knownTasks and, if still assigned, from runningTasks. Null if unknown. */
+  private TaskInfo unregisterTask(Object task) {
+    writeLock.lock();
+    try {
+      TaskInfo taskInfo = knownTasks.remove(task);
+      if (taskInfo != null) {
+        if (taskInfo.assigned) {
+          // Still running; preempted tasks (assigned == false) were already removed by preemptTasks.
+          int priority = taskInfo.priority.getPriority();
+          Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
+          Preconditions.checkState(tasksAtPriority != null,
+              "runningTasks should contain an entry if the task was in running state. Caused by task: %s", task);
+          tasksAtPriority.remove(taskInfo);
+          if (tasksAtPriority.isEmpty()) {
+            runningTasks.remove(priority);
+          }
+        }
+      } else {
+        LOG.warn("Could not find TaskInfo for task: {}. Not removing it from the running set", task);
+      }
+      return taskInfo;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   @VisibleForTesting
   protected void schedulePendingTasks() {
     writeLock.lock();
@@ -674,6 +743,13 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
           if (scheduled) {
             taskIter.remove();
           } else {
+            // Try pre-empting a task so that a higher priority task can take its place.
+            // Preempt only if there are no pending preemptions, to avoid preempting twice for a task.
+            LOG.info("Attempting to preempt for {}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get());
+            if (pendingPreemptions.get() == 0) {
+              preemptTasks(entry.getKey().getPriority(), 1);
+            }
+
             scheduledAllAtPriority = false;
             // Don't try assigning tasks at the next priority.
             break;
@@ -705,10 +781,11 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
               nsPair.getServiceInstance().getRpcPort());
       writeLock.lock(); // While updating local structures
       try {
+        LOG.info("Assigned task {} to container {}", taskInfo, container.getId());
         dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
             nsPair.getServiceInstance().getHost());
-        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId());
-        knownTasks.putIfAbsent(taskInfo.task, taskInfo);
+        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime());
+        registerRunningTask(taskInfo);
         nsPair.getNodeInfo().registerTaskScheduled();
       } finally {
         writeLock.unlock();
@@ -719,6 +796,59 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     }
   }
 
+  // Removes tasks from the runningList and sends out a preempt request to the system.
+  // Subsequent tasks will be scheduled again once the de-allocate request for the preempted
+  // task is processed.
+  private void preemptTasks(int forPriority, int numTasksToPreempt) {
+    writeLock.lock();
+    List<TaskInfo> preemptedTaskList = null;
+    try {
+      NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = runningTasks.descendingMap();
+      Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = orderedMap.entrySet().iterator();
+      int preemptedCount = 0;
+      while (iterator.hasNext() && preemptedCount < numTasksToPreempt) {
+        Entry<Integer, TreeSet<TaskInfo>> entryAtPriority = iterator.next();
+        if (entryAtPriority.getKey() > forPriority) {
+          Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
+          while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
+            TaskInfo taskInfo = taskInfoIterator.next();
+            preemptedCount++;
+            LOG.info("preempting {} for task at priority {}", taskInfo, forPriority);
+            taskInfo.setPreemptedInfo(clock.getTime());
+            if (preemptedTaskList == null) {
+              preemptedTaskList = new LinkedList<>();
+            }
+            dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
+            preemptedTaskList.add(taskInfo);
+            pendingPreemptions.incrementAndGet();
+            // Remove from the runningTaskList
+            taskInfoIterator.remove();
+          }
+
+          // Remove entire priority level if it's been emptied.
+          if (entryAtPriority.getValue().isEmpty()) {
+            iterator.remove();
+          }
+        } else {
+          // No tasks qualify as preemptable
+          LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority);
+          break;
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    // NOTE(review): this lock level is released, but schedulePendingTasks calls this while still holding writeLock.
+    if (preemptedTaskList != null) {
+      for (TaskInfo taskInfo : preemptedTaskList) {
+        LOG.info("DBG: Preempting task {}", taskInfo);
+        appClientDelegate.preemptContainer(taskInfo.containerId);
+      }
+    }
+    // The schedule loop will be triggered again when the deallocateTask request comes in for the
+    // preempted task.
+  }
+
   private class NodeEnablerCallable implements Callable<Void> {
 
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
@@ -832,6 +962,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
     private boolean disabled = false;
 
+    private int numPreemptedTasks = 0; // incremented by registerUnsuccessfulTaskEnd(true)
     private int numScheduledTasks = 0;
     private final int numSchedulableTasks;
 
@@ -917,8 +1048,11 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
       numScheduledTasks--;
     }
 
-    void registerUnsuccessfulTaskEnd() {
+    void registerUnsuccessfulTaskEnd(boolean wasPreempted) {
       numScheduledTasks--;
+      if (wasPreempted) {
+        numPreemptedTasks++;
+      }
     }
 
     public boolean isDisabled() {
@@ -980,12 +1114,14 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     int numRejectedTasks = 0;
     int numCommFailures = 0;
     int numDelayedAllocations = 0;
+    int numPreemptedTasks = 0;
     Map<String, AtomicInteger> localityBasedNumAllocationsPerHost = new HashMap<>();
     Map<String, AtomicInteger> numAllocationsPerHost = new HashMap<>();
 
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
+      sb.append("NumPreemptedTasks=").append(numPreemptedTasks).append(", ");
       sb.append("NumRequestedAllocations=").append(numRequestedAllocations).append(", ");
       sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", ");
       sb.append("NumLocalAllocations=").append(numLocalAllocations).append(",");
@@ -1027,6 +1163,10 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
       _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost);
     }
 
+    void registerTaskPreempted(String host) { // host currently unused, as in registerCommFailure
+      numPreemptedTasks++;
+    }
+
     void registerCommFailure(String host) {
       numCommFailures++;
     }
@@ -1050,6 +1190,10 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
   }
 
   private static class TaskInfo {
+    // IDs used to ensure two TaskInfos are different without using the underlying task instance.
+    // Required as a tie-breaker for insertion into a TreeSet (see TaskStartComparator)
+    static final AtomicLong ID_GEN = new AtomicLong(0);
+    final long uniqueId;
     final Object task;
     final Object clientCookie;
     final Priority priority;
@@ -1057,11 +1201,17 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     final String[] requestedHosts;
     final String[] requestedRacks;
     final long requestTime;
+    long startTime;
+    long preemptTime;
     ContainerId containerId;
     ServiceInstance assignedInstance;
     private boolean assigned = false;
+    private boolean preempted = false;
+
     private int numAssignAttempts = 0;
 
+    // TaskInfo instances for two different tasks will not be the same. Only a single instance should
+    // ever be created for a taskAttempt
     public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
         String[] hosts, String[] racks, long requestTime) {
       this.task = task;
@@ -1071,14 +1221,22 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
       this.requestedHosts = hosts;
       this.requestedRacks = racks;
       this.requestTime = requestTime;
+      this.uniqueId = ID_GEN.getAndIncrement();
     }
 
-    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId) {
+    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
       this.assignedInstance = instance;
       this.containerId = containerId;
+      this.startTime = startTime;
       assigned = true;
     }
 
+    void setPreemptedInfo(long preemptTime) {
+      this.preempted = true;
+      this.assigned = false;
+      this.preemptTime = preemptTime;
+    }
+
     void triedAssigningTask() {
       numAssignAttempts++;
     }
@@ -1086,6 +1244,66 @@ public class LlapTaskSchedulerService extends TaskSchedulerService {
     int getNumPreviousAssignAttempts() {
       return numAssignAttempts;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TaskInfo taskInfo = (TaskInfo) o;
+
+      if (uniqueId != taskInfo.uniqueId) {
+        return false;
+      }
+      return task.equals(taskInfo.task);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (uniqueId ^ (uniqueId >>> 32));
+      result = 31 * result + task.hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "TaskInfo{" +
+          "task=" + task +
+          ", priority=" + priority +
+          ", startTime=" + startTime +
+          ", containerId=" + containerId +
+          ", assignedInstance=" + assignedInstance +
+          ", uniqueId=" + uniqueId +
+          '}';
+    }
+  }
+
+  // Newer tasks first.
+  private static class TaskStartComparator implements Comparator<TaskInfo> {
+
+    @Override
+    public int compare(TaskInfo o1, TaskInfo o2) {
+      if (o1.startTime > o2.startTime) {
+        return -1;
+      } else if (o1.startTime < o2.startTime) {
+        return 1;
+      } else {
+        // Comparing on time is not sufficient since two tasks may start in the same millisecond,
+        // in which case a TreeSet would treat them as duplicates; break ties on uniqueId.
+        if (o1.uniqueId > o2.uniqueId) {
+          return -1;
+        } else if (o1.uniqueId < o2.uniqueId) {
+          return 1;
+        } else {
+          return 0;
+        }
+      }
+    }
   }
 
   private static class NodeServiceInstancePair {

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index d8fd882..0ba6acf 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -48,7 +48,7 @@ message GroupInputSpecProto {
 
 
 message FragmentSpecProto {
-  optional string task_attempt_id_string = 1;
+  optional string fragment_identifier_string = 1; // renamed from task_attempt_id_string; same field number
   optional string dag_name = 2;
   optional string vertex_name = 3;
   optional EntityDescriptorProto processor_descriptor = 4;
@@ -111,10 +111,7 @@ message QueryCompleteResponseProto {
 message TerminateFragmentRequestProto {
   optional string query_id = 1;
   optional string dag_name = 2;
-  optional int32 dag_attempt_number = 3;
-  optional string vertex_name = 4;
-  optional int32 fragment_number = 5;
-  optional int32 attempt_number = 6;
+  optional string fragment_identifier_string = 7; // replaces removed fields 3-6; do not reuse numbers 3-6
 }
 
 message TerminateFragmentResponseProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index a2e9501..6b6fac0 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
@@ -101,7 +102,7 @@ public class TestTaskExecutorService {
                 .setVertexParallelism(parallelism)
                 .setProcessorDescriptor(
                     EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
-                .setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost")
+                .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
         .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
         .setContainerIdString("MockContainer_1").setUser("MockUser")
         .setTokenIdentifier("MockToken_1")
@@ -115,12 +116,12 @@ public class TestTaskExecutorService {
  
   @Test
   public void testWaitQueueComparator() throws InterruptedException {
-    MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
-    MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
-    MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
-    MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
-    MockRequest r5 = new MockRequest(createRequest(5, 10, 500), false, 1000000);
-    EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue(
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
+    TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 500), false, 1000000);
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
     assertEquals(r1, queue.peek());
@@ -137,11 +138,11 @@ public class TestTaskExecutorService {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
-    r2 = new MockRequest(createRequest(2, 4, 200), true, 100000);
-    r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000);
-    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
+    queue = new EvictingPriorityBlockingQueue<>(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -159,11 +160,11 @@ public class TestTaskExecutorService {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 1, 100), true, 100000);
-    r2 = new MockRequest(createRequest(2, 1, 200), false, 100000);
-    r3 = new MockRequest(createRequest(3, 1, 300), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 1, 400), false, 1000000);
-    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createRequest(1, 1, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 1, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 1, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 1, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
+    queue = new EvictingPriorityBlockingQueue<>(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -181,11 +182,11 @@ public class TestTaskExecutorService {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
-    r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
-    r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
-    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
+    queue = new EvictingPriorityBlockingQueue<>(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -203,11 +204,11 @@ public class TestTaskExecutorService {
     assertEquals(r5, queue.take());
     assertEquals(r2, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2, 100), true, 100000);
-    r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
-    r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
-    r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
-    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
+    queue = new EvictingPriorityBlockingQueue<>(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -225,11 +226,11 @@ public class TestTaskExecutorService {
     assertEquals(r2, queue.take());
     assertEquals(r3, queue.take());
 
-    r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
-    r2 = new MockRequest(createRequest(2, 4, 200), true, 100000);
-    r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000);
-    r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000);
-    r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000);
+    r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000);
+    r2 = createTaskWrapper(createRequest(2, 4, 200), true, 100000);
+    r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000);
+    r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000);
+    r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000);
-    queue = new EvictingPriorityBlockingQueue(
+    queue = new EvictingPriorityBlockingQueue<>(
         new TaskExecutorService.WaitQueueComparator(), 4);
     assertNull(queue.offer(r1));
@@ -250,12 +251,13 @@ public class TestTaskExecutorService {
 
   @Test
   public void testPreemptionQueueComparator() throws InterruptedException {
-    MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000);
-    MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000);
-    MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000);
-    MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000);
-    BlockingQueue queue = new PriorityBlockingQueue(4,
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000);
+    TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000);
+    BlockingQueue<TaskWrapper> queue = new PriorityBlockingQueue<>(4,
         new TaskExecutorService.PreemptionQueueComparator());
+
     queue.offer(r1);
     assertEquals(r1, queue.peek());
     queue.offer(r2);
@@ -268,4 +270,10 @@ public class TestTaskExecutorService {
     assertEquals(r3, queue.take());
     assertEquals(r4, queue.take());
   }
+  // Wraps a MockRequest(request, canFinish, workTime) in a TaskWrapper; second ctor arg is null.
+  private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
+    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+    TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
+    return taskWrapper;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index a0399da..b9b89e3 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -49,6 +50,8 @@ import org.mockito.ArgumentCaptor;
 
 public class TestLlapTaskSchedulerService {
 
+  // TODO Fix the races and the broken scheduler control in the tests
+
   private static final String HOST1 = "host1";
   private static final String HOST2 = "host2";
   private static final String HOST3 = "host3";
@@ -94,6 +97,63 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
+  // TODO Add a test to ensure the correct task is being preempted, and the completion for the specific
+  // task triggers the next task to be scheduled.
+
+  @Test(timeout=5000)
+  public void testPreemption() throws InterruptedException {
+
+    Priority priority1 = Priority.newInstance(1);
+    Priority priority2 = Priority.newInstance(2);
+    String [] hosts = new String[] {HOST1};
+    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1);
+    try {
+
+      Object task1 = new String("task1");
+      Object clientCookie1 = new String("cookie1");
+      Object task2 = new String("task2");
+      Object clientCookie2 = new String("cookie2");
+      Object task3 = new String("task3");
+      Object clientCookie3 = new String("cookie3");
+      Object task4 = new String("task4");
+      Object clientCookie4 = new String("cookie4");
+
+      tsWrapper.controlScheduler(true);
+      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+      tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1);
+      tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2);
+      tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3);
+      tsWrapper.signalScheduler();
+      tsWrapper.controlScheduler(false);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class),
+          any(Object.class), any(Container.class));
+      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+
+      reset(tsWrapper.mockAppCallback);
+
+      tsWrapper.controlScheduler(true);
+      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+      tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4);
+      tsWrapper.controlScheduler(false);
+      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
+
+      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+      tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
+      tsWrapper.signalScheduler();
+      Thread.sleep(2000l);
+
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
+          eq(clientCookie4), any(Container.class));
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+
+  }
+
   @Test(timeout=5000)
   public void testNodeDisabled() {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
@@ -225,9 +285,15 @@ public class TestLlapTaskSchedulerService {
     }
 
     TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) {
+      this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
+          LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
+    }
+    // Lets a test choose the hosts, executors per node and wait-queue size.
+    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) {
       conf = new Configuration();
-      conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, HOST1, HOST2, HOST3);
-      conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, 4);
+      conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts);
+      conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors);
+      conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize);
       conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
           disableTimeoutMillis);
       conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true);
@@ -270,6 +336,10 @@ public class TestLlapTaskSchedulerService {
       ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
     }
 
+    void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) {
+      ts.deallocateTask(task, succeeded, endReason);
+    }
+
     void rejectExecution(Object task) {
       ts.deallocateTask(task, false, TaskAttemptEndReason.SERVICE_BUSY);
     }

Reply via email to