mustafaiman commented on a change in pull request #2123:
URL: https://github.com/apache/hive/pull/2123#discussion_r611864768
##########
File path:
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -429,6 +437,11 @@ public LlapTaskSchedulerService(TaskSchedulerContext
taskSchedulerContext, Clock
delayedTaskSchedulerExecutor =
MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
+ ExecutorService preemptTaskSchedulerExecutorRaw =
Executors.newFixedThreadPool(1,
Review comment:
If preemption is turned off, we won't need this executor.
##########
File path:
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -1954,6 +1911,37 @@ protected void schedulePendingTasks() throws
InterruptedException {
break;
}
}
+ // Finally take care of preemption requests that can unblock higher-pri
tasks.
+ // This removes preemptable tasks from the runningList and sends out a
preempt request to the system.
+ // Subsequent tasks will be scheduled once the de-allocate request for
the preempted task is processed.
+ while (!preemptionCandidates.isEmpty()) {
+ TaskInfo toPreempt = preemptionCandidates.take();
+ // 1. task has not terminated
+ if (toPreempt.isGuaranteed != null) {
+ String host = toPreempt.getAssignedNode().getHost();
+ // 2. is currently assigned 3. no preemption pending on that Host
+ if (toPreempt.getState() == TaskInfo.State.ASSIGNED &&
+ (pendingPreemptionsPerHost.get(host) == null ||
pendingPreemptionsPerHost.get(host).intValue() == 0)) {
+ LOG.debug("Preempting task took {} ms {}", (clock.getTime() -
toPreempt.getPreemptedTime()), toPreempt);
Review comment:
This looks like a leftover from a debugging session.
##########
File path:
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -2324,7 +2278,114 @@ private void maybeAddToDelayedTaskQueue(TaskInfo
taskInfo) {
}
}
+ private void maybeAddToHighPriorityTaskQueue(TaskInfo taskInfo) {
+ // Only add task if its not already in the Queue AND there no mores than
HOSTS tasks there already
+ // as we are performing up to HOSTS preemptions at a time
+ if (!taskInfo.isInHighPriorityQueue() && highPriorityTaskQueue.size() <
activeInstances.size()) {
+ taskInfo.setInHighPriorityQueue(true);
+ highPriorityTaskQueue.add(taskInfo);
+ }
+ }
+
// ------ Inner classes defined after this point ------
+ class PreemptionSchedulerCallable implements Callable<Void> {
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ @Override
+ public Void call() {
+ while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ TaskInfo taskInfo = getNextTask();
+ // Tasks can exist in the queue even after they have been scheduled.
+ // Process task Preemption only if the task is still in PENDING
state.
+ processTaskPreemption(taskInfo);
+
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info("PreemptTaskScheduler thread interrupted after shutdown");
+ break;
+ } else {
+ LOG.warn("PreemptTaskScheduler thread interrupted before being
shutdown");
+ throw new RuntimeException("PreemptTaskScheduler thread
interrupted without being shutdown", e);
+ }
+ }
+ }
+ return null;
+ }
+
+ private void processTaskPreemption(TaskInfo taskInfo) {
+ if (shouldAttemptTask(taskInfo) && tryTaskPreemption(taskInfo)) {
+ trySchedulingPendingTasks();
+ }
+ // Enables scheduler to reAdd task in Queue if needed
+ taskInfo.setInHighPriorityQueue(false);
+ }
+
+ private boolean tryTaskPreemption(TaskInfo taskInfo) {
+ // Find a lower priority task that can be preempted on a particular host.
+ // ONLY if there's no pending preemptions on that host to avoid
preempting twice for a task.
+ Set<String> potentialHosts = null; // null => preempt on any host.
+ readLock.lock();
+ try {
+ // Protect against a bad location being requested.
+ if (taskInfo.requestedHosts != null && taskInfo.requestedHosts.length
!= 0) {
+ potentialHosts = Sets.newHashSet(taskInfo.requestedHosts);
+ }
+ if (potentialHosts != null) {
+ // Preempt on specific host
+ boolean shouldPreempt = true;
+ for (String host : potentialHosts) {
+ // Preempt only if there are no pending preemptions on the same
host
+ // When the preemption registers, the request at the highest
priority will be given the slot,
+ // even if the initial preemption was caused by some other task.
+ // TODO Maybe register which task the preemption was for, to avoid
a bad non-local allocation.
+ MutableInt pendingHostPreemptions =
pendingPreemptionsPerHost.get(host);
+ if (pendingHostPreemptions != null &&
pendingHostPreemptions.intValue() > 0) {
+ shouldPreempt = false;
+ LOG.debug("No preempt candidate for task={}. Found an existing
preemption request on host={}, pendingPreemptionCount={}",
+ taskInfo.task, host, pendingHostPreemptions.intValue());
+ break;
+ }
+ }
+
+ if (!shouldPreempt) {
+ LOG.debug("No preempt candidate for {} on potential hosts={}. An
existing preemption request exists",
+ taskInfo.task, potentialHosts);
+ return false;
+ }
+ } else {
+ // Unknown requested host -- Request for a preemption if there's
none pending. If a single preemption is pending,
+ // and this is the next task to be assigned, it will be assigned
once that slot becomes available.
+ if (pendingPreemptions.get() != 0) {
+ LOG.debug("Skipping preempt candidate since there are {} pending
preemption request. For task={}",
+ pendingPreemptions.get(), taskInfo);
+ return false;
+ }
+ }
+
+ LOG.debug("Attempting preempt candidate for task={}, priority={} on
potential hosts={}. pendingPreemptions={}",
+ taskInfo.task, taskInfo.priority, potentialHosts == null ? "ANY" :
potentialHosts, pendingPreemptions.get());
+ return addTaskPreemptionCandidate(speculativeTasks, taskInfo,
potentialHosts) ||
+ addTaskPreemptionCandidate(guaranteedTasks, taskInfo,
potentialHosts);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void shutdown() {
+ isShutdown.set(true);
+ }
+
+ public TaskInfo getNextTask() throws InterruptedException {
Review comment:
getNextTask does not add any value. Can you remove it and just call
highPriorityTaskQueue.take() directly instead? It is called from only
one place.
##########
File path:
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
##########
@@ -3049,7 +3131,7 @@ boolean isUpdateInProgress() {
return isPendingUpdate;
}
- TezTaskAttemptID getAttemptId() {
+ synchronized TezTaskAttemptID getAttemptId() {
Review comment:
attemptId is final. Why would we need synchronization to access it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]