Repository: hive Updated Branches: refs/heads/llap 2024c962d -> b8b94f297
http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index d355029..5a2b77d 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -26,9 +26,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -96,6 +99,8 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { // Tracks running and queued tasks. Cleared after a task completes. private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>(); + private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>(); + private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit. @VisibleForTesting @@ -119,6 +124,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { private final SchedulerCallable schedulerCallable = new SchedulerCallable(); private final AtomicBoolean isStopped = new AtomicBoolean(false); + private final AtomicInteger pendingPreemptions = new AtomicInteger(0); private final NodeBlacklistConf nodeBlacklistConf; @@ -134,7 +140,6 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { private final LlapRegistryService registry = new LlapRegistryService(false); - private volatile ListenableFuture<Void> nodeEnablerFuture; private volatile ListenableFuture<Void> schedulerFuture; @@ -385,6 +390,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { trySchedulingPendingTasks(); } + // This may be invoked before a container is ever assigned to a task. allocateTask... app decides // the task is no longer required, and asks for a de-allocation. @Override @@ -392,7 +398,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { writeLock.lock(); // Updating several local structures TaskInfo taskInfo; try { - taskInfo = knownTasks.remove(task); + taskInfo = unregisterTask(task); if (taskInfo == null) { LOG.error("Could not determine ContainerId for task: " + task @@ -418,12 +424,12 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); assert nodeInfo != null; - if (taskSucceeded) { - // The node may have been blacklisted at this point - which means it may not be in the - // activeNodeList. - - nodeInfo.registerTaskSuccess(); + // Re-enable the node if preempted + if (taskInfo.preempted) { + LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); + pendingPreemptions.decrementAndGet(); + nodeInfo.registerUnsuccessfulTaskEnd(true); if (nodeInfo.isDisabled()) { // Re-enable the node. If a task succeeded, a slot may have become available. // Also reset commFailures since a task was able to communicate back and indicate success. @@ -435,20 +441,40 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); - - } else if (!taskSucceeded) { - nodeInfo.registerUnsuccessfulTaskEnd(); - if (endReason != null && EnumSet - .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) - .contains(endReason)) { - if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { - dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); - } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { - dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); + } else { + if (taskSucceeded) { + // The node may have been blacklisted at this point - which means it may not be in the + // activeNodeList. + + nodeInfo.registerTaskSuccess(); + + if (nodeInfo.isDisabled()) { + // Re-enable the node. If a task succeeded, a slot may have become available. + // Also reset commFailures since a task was able to communicate back and indicate success. + nodeInfo.enableNode(); + // Re-insert into the queue to force the poll thread to remove the element. + if (disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } } + // In case of success, trigger a scheduling run for pending tasks. + trySchedulingPendingTasks(); + + } else if (!taskSucceeded) { + nodeInfo.registerUnsuccessfulTaskEnd(false); + if (endReason != null && EnumSet + .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) + .contains(endReason)) { + if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { + dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); + } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { + dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); + } + } + boolean commFailure = + endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; + disableInstance(assignedInstance, commFailure); } - boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; - disableInstance(assignedInstance, commFailure); } } finally { writeLock.unlock(); @@ -461,7 +487,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { public Object deallocateContainer(ContainerId containerId) { LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId); // Containers are not being tracked for re-use. - // This is safe to ignore since a deallocate task should have come in earlier. + // This is safe to ignore since a deallocate task will come in. return null; } @@ -635,6 +661,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { } } + /* Remove a task from the pending list */ private void removePendingTask(TaskInfo taskInfo) { writeLock.lock(); try { @@ -649,6 +676,48 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { } } + /* Register a running task into the runningTasks structure */ + private void registerRunningTask(TaskInfo taskInfo) { + writeLock.lock(); + try { + int priority = taskInfo.priority.getPriority(); + TreeSet<TaskInfo> tasksAtpriority = runningTasks.get(priority); + if (tasksAtpriority == null) { + tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR); + runningTasks.put(priority, tasksAtpriority); + } + tasksAtpriority.add(taskInfo); + } finally { + writeLock.unlock(); + } + } + + /* Unregister a task from the known and running structures */ + private TaskInfo unregisterTask(Object task) { + writeLock.lock(); + try { + TaskInfo taskInfo = knownTasks.remove(task); + if (taskInfo != null) { + if (taskInfo.assigned) { + // Remove from the running list. + int priority = taskInfo.priority.getPriority(); + Set<TaskInfo> tasksAtPriority = runningTasks.get(priority); + Preconditions.checkState(tasksAtPriority != null, + "runningTasks should contain an entry if the task was in running state. Caused by task: {}", task); + tasksAtPriority.remove(taskInfo); + if (tasksAtPriority.isEmpty()) { + runningTasks.remove(priority); + } + } + } else { + LOG.warn("Could not find TaskInfo for task: {}. Not removing it from the running set", task); + } + return taskInfo; + } finally { + writeLock.unlock(); + } + } + @VisibleForTesting protected void schedulePendingTasks() { writeLock.lock(); @@ -674,6 +743,13 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { if (scheduled) { taskIter.remove(); } else { + // Try pre-empting a task so that a higher priority task can take it's place. + // Preempt only if there's not pending preemptions to avoid preempting twice for a task. + LOG.info("Attempting to preempt for {}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get()); + if (pendingPreemptions.get() == 0) { + preemptTasks(entry.getKey().getPriority(), 1); + } + scheduledAllAtPriority = false; // Don't try assigning tasks at the next priority. break; @@ -705,10 +781,11 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { nsPair.getServiceInstance().getRpcPort()); writeLock.lock(); // While updating local structures try { + LOG.info("Assigned task {} to container {}", taskInfo, container.getId()); dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nsPair.getServiceInstance().getHost()); - taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId()); - knownTasks.putIfAbsent(taskInfo.task, taskInfo); + taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime()); + registerRunningTask(taskInfo); nsPair.getNodeInfo().registerTaskScheduled(); } finally { writeLock.unlock(); @@ -719,6 +796,59 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { } } + // Removes tasks from the runningList and sends out a preempt request to the system. + // Subsequent tasks will be scheduled again once the de-allocate request for the preempted + // task is processed. + private void preemptTasks(int forPriority, int numTasksToPreempt) { + writeLock.lock(); + List<TaskInfo> preemptedTaskList = null; + try { + NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = runningTasks.descendingMap(); + Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = orderedMap.entrySet().iterator(); + int preemptedCount = 0; + while (iterator.hasNext() && preemptedCount < numTasksToPreempt) { + Entry<Integer, TreeSet<TaskInfo>> entryAtPriority = iterator.next(); + if (entryAtPriority.getKey() > forPriority) { + Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator(); + while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { + TaskInfo taskInfo = taskInfoIterator.next(); + preemptedCount++; + LOG.info("preempting {} for task at priority {}", taskInfo, forPriority); + taskInfo.setPreemptedInfo(clock.getTime()); + if (preemptedTaskList == null) { + preemptedTaskList = new LinkedList<>(); + } + dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost()); + preemptedTaskList.add(taskInfo); + pendingPreemptions.incrementAndGet(); + // Remove from the runningTaskList + taskInfoIterator.remove(); + } + + // Remove entire priority level if it's been emptied. + if (entryAtPriority.getValue().isEmpty()) { + iterator.remove(); + } + } else { + // No tasks qualify as preemptable + LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority); + break; + } + } + } finally { + writeLock.unlock(); + } + // Send out the preempted request outside of the lock. + if (preemptedTaskList != null) { + for (TaskInfo taskInfo : preemptedTaskList) { + LOG.info("DBG: Preempting task {}", taskInfo); + appClientDelegate.preemptContainer(taskInfo.containerId); + } + } + // The schedule loop will be triggered again when the deallocateTask request comes in for the + // preempted task. + } + private class NodeEnablerCallable implements Callable<Void> { private AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -832,6 +962,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc. private boolean disabled = false; + private int numPreemptedTasks = 0; private int numScheduledTasks = 0; private final int numSchedulableTasks; @@ -917,8 +1048,11 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { numScheduledTasks--; } - void registerUnsuccessfulTaskEnd() { + void registerUnsuccessfulTaskEnd(boolean wasPreempted) { numScheduledTasks--; + if (wasPreempted) { + numPreemptedTasks++; + } } public boolean isDisabled() { @@ -980,12 +1114,14 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { int numRejectedTasks = 0; int numCommFailures = 0; int numDelayedAllocations = 0; + int numPreemptedTasks = 0; Map<String, AtomicInteger> localityBasedNumAllocationsPerHost = new HashMap<>(); Map<String, AtomicInteger> numAllocationsPerHost = new HashMap<>(); @Override public String toString() { StringBuilder sb = new StringBuilder(); + sb.append("NumPreemptedTasks=").append(numPreemptedTasks).append(", "); sb.append("NumRequestedAllocations=").append(numRequestedAllocations).append(", "); sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", "); sb.append("NumLocalAllocations=").append(numLocalAllocations).append(","); @@ -1027,6 +1163,10 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost); } + void registerTaskPreempted(String host) { + numPreemptedTasks++; + } + void registerCommFailure(String host) { numCommFailures++; } @@ -1050,6 +1190,10 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { } private static class TaskInfo { + // IDs used to ensure two TaskInfos are different without using the underlying task instance. + // Required for insertion into a TreeMap + static final AtomicLong ID_GEN = new AtomicLong(0); + final long uniqueId; final Object task; final Object clientCookie; final Priority priority; @@ -1057,11 +1201,17 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { final String[] requestedHosts; final String[] requestedRacks; final long requestTime; + long startTime; + long preemptTime; ContainerId containerId; ServiceInstance assignedInstance; private boolean assigned = false; + private boolean preempted = false; + private int numAssignAttempts = 0; + // TaskInfo instances for two different tasks will not be the same. Only a single instance should + // ever be created for a taskAttempt public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability, String[] hosts, String[] racks, long requestTime) { this.task = task; @@ -1071,14 +1221,22 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { this.requestedHosts = hosts; this.requestedRacks = racks; this.requestTime = requestTime; + this.uniqueId = ID_GEN.getAndIncrement(); } - void setAssignmentInfo(ServiceInstance instance, ContainerId containerId) { + void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) { this.assignedInstance = instance; - this.containerId = containerId; + this.containerId = containerId; + this.startTime = startTime; assigned = true; } + void setPreemptedInfo(long preemptTime) { + this.preempted = true; + this.assigned = false; + this.preemptTime = preemptTime; + } + void triedAssigningTask() { numAssignAttempts++; } @@ -1086,6 +1244,66 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { int getNumPreviousAssignAttempts() { return numAssignAttempts; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TaskInfo taskInfo = (TaskInfo) o; + + if (uniqueId != taskInfo.uniqueId) { + return false; + } + return task.equals(taskInfo.task); + + } + + @Override + public int hashCode() { + int result = (int) (uniqueId ^ (uniqueId >>> 32)); + result = 31 * result + task.hashCode(); + return result; + } + + @Override + public String toString() { + return "TaskInfo{" + + "task=" + task + + ", priority=" + priority + + ", startTime=" + startTime + + ", containerId=" + containerId + + ", assignedInstance=" + assignedInstance + + ", uniqueId=" + uniqueId + + '}'; + } + } + + // Newer tasks first. + private static class TaskStartComparator implements Comparator<TaskInfo> { + + @Override + public int compare(TaskInfo o1, TaskInfo o2) { + if (o1.startTime > o2.startTime) { + return -1; + } else if (o1.startTime < o2.startTime) { + return 1; + } else { + // Comparing on time is not sufficient since two may be created at the same time, + // in which case inserting into a TreeSet/Map would break + if (o1.uniqueId > o2.uniqueId) { + return -1; + } else if (o1.uniqueId < o2.uniqueId) { + return 1; + } else { + return 0; + } + } + } } private static class NodeServiceInstancePair { http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto index d8fd882..0ba6acf 100644 --- a/llap-server/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -48,7 +48,7 @@ message GroupInputSpecProto { message FragmentSpecProto { - optional string task_attempt_id_string = 1; + optional string fragment_identifier_string = 1; optional string dag_name = 2; optional string vertex_name = 3; optional EntityDescriptorProto processor_descriptor = 4; @@ -111,10 +111,7 @@ message QueryCompleteResponseProto { message TerminateFragmentRequestProto { optional string query_id = 1; optional string dag_name = 2; - optional int32 dag_attempt_number = 3; - optional string vertex_name = 4; - optional int32 fragment_number = 5; - optional int32 attempt_number = 6; + optional string fragment_identifier_string = 7; } message TerminateFragmentResponseProto { http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index a2e9501..6b6fac0 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -27,6 +27,7 @@ import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; @@ -101,7 +102,7 @@ public class TestTaskExecutorService { .setVertexParallelism(parallelism) .setProcessorDescriptor( EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost") + .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") .setContainerIdString("MockContainer_1").setUser("MockUser") .setTokenIdentifier("MockToken_1") @@ -115,12 +116,12 @@ public class TestTaskExecutorService { @Test public void testWaitQueueComparator() throws InterruptedException { - MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); - MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); - MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - MockRequest r5 = new MockRequest(createRequest(5, 10, 500), false, 1000000); - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue( + TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 500), false, 1000000); + EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); @@ -137,11 +138,11 @@ public class TestTaskExecutorService { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), true, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -159,11 +160,11 @@ public class TestTaskExecutorService { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = new MockRequest(createRequest(1, 1, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 1, 200), false, 100000); - r3 = new MockRequest(createRequest(3, 1, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 1, 400), false, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 1, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 1, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 1, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 1, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -181,11 +182,11 @@ public class TestTaskExecutorService { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -203,11 +204,11 @@ public class TestTaskExecutorService { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -225,11 +226,11 @@ public class TestTaskExecutorService { assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); - r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); - r2 = new MockRequest(createRequest(2, 4, 200), true, 100000); - r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); - r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000); - r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -250,12 +251,13 @@ public class TestTaskExecutorService { @Test public void testPreemptionQueueComparator() throws InterruptedException { - MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); - MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); - MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); - MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); - BlockingQueue queue = new PriorityBlockingQueue(4, + TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 200), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); + BlockingQueue<TaskWrapper> queue = new PriorityBlockingQueue<>(4, new TaskExecutorService.PreemptionQueueComparator()); + queue.offer(r1); assertEquals(r1, queue.peek()); queue.offer(r2); @@ -268,4 +270,10 @@ public class TestTaskExecutorService { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); } + + private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) { + MockRequest mockRequest = new MockRequest(request, canFinish, workTime); + TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null); + return taskWrapper; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index a0399da..b9b89e3 100644 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.SystemClock; @@ -49,6 +50,8 @@ import org.mockito.ArgumentCaptor; public class TestLlapTaskSchedulerService { + // TODO Fix the races and the broken scheduler control in the tests + private static final String HOST1 = "host1"; private static final String HOST2 = "host2"; private static final String HOST3 = "host3"; @@ -94,6 +97,63 @@ public class TestLlapTaskSchedulerService { } } + // TODO Add a test to ensure the correct task is being preempted, and the completion for the specific + // task triggers the next task to be scheduled. + + @Test(timeout=5000) + public void testPreemption() throws InterruptedException { + + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + String [] hosts = new String[] {HOST1}; + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1); + try { + + Object task1 = new String("task1"); + Object clientCookie1 = new String("cookie1"); + Object task2 = new String("task2"); + Object clientCookie2 = new String("cookie1"); + Object task3 = new String("task3"); + Object clientCookie3 = new String("cookie1"); + Object task4 = new String("task4"); + Object clientCookie4 = new String("cookie1"); + + tsWrapper.controlScheduler(true); + int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2); + tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3); + tsWrapper.signalScheduler(); + tsWrapper.controlScheduler(false); + tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), + any(Object.class), any(Container.class)); + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + + reset(tsWrapper.mockAppCallback); + + tsWrapper.controlScheduler(true); + schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); + tsWrapper.controlScheduler(false); + tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); + + schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + tsWrapper.signalScheduler(); + Thread.sleep(2000l); + + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + } finally { + tsWrapper.shutdown(); + } + + } + @Test(timeout=5000) public void testNodeDisabled() { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); @@ -225,9 +285,15 @@ public class TestLlapTaskSchedulerService { } TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) { + this(2000l, new String[]{HOST1, HOST2, HOST3}, 4, + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); + } + + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) { conf = new Configuration(); - conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, HOST1, HOST2, HOST3); - conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, 4); + conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts); + conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors); + conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize); conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS, disableTimeoutMillis); conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true); @@ -270,6 +336,10 @@ public class TestLlapTaskSchedulerService { ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie); } + void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { + ts.deallocateTask(task, succeeded, endReason); + } + void rejectExecution(Object task) { ts.deallocateTask(task, false, TaskAttemptEndReason.SERVICE_BUSY); }