jihoonson commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356350247
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##########
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
             HttpRemoteTaskRunnerWorkItem.State.PENDING
         );
         tasks.put(task.getId(), taskRunnerWorkItem);
-        addPendingTaskToExecutor(task.getId());
+        pendingTaskIds.add(task.getId());
+
+        statusLock.notifyAll();
+
         return taskRunnerWorkItem.getResult();
       }
     }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-    pendingTasksExec.execute(
-        () -> {
-          while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-            ImmutableWorkerInfo immutableWorker;
-            HttpRemoteTaskRunnerWorkItem taskItem = null;
+    for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+      pendingTasksExec.submit(
+          () -> {
             try {
-              synchronized (statusLock) {
-                taskItem = tasks.get(taskId);
+              if (!lifecycleLock.awaitStarted()) {
+                log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+                return;
+              }
 
-                if (taskItem == null) {
-                  log.info(
-                      "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-                      taskId
-                  );
-                  return;
-                }
+              pendingTasksExecutionLoop();
+            }
+            catch (Throwable t) {
+              log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+                 .emit();
+            }
+            finally {
+              log.info("PendingTaskExecution loop exited.");
+            }
+          }
+      );
+    }
+  }
 
-                if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-                  log.info(
-                      "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-                      taskId,
-                      taskItem.getState()
-                  );
-                  return;
-                }
+  private void pendingTasksExecutionLoop()
+  {
+    while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+      try {
+        // Find one pending task to run and a worker to run on
+        HttpRemoteTaskRunnerWorkItem taskItem = null;
+        ImmutableWorkerInfo immutableWorker = null;
 
-                if (taskItem.getTask() == null) {
-                  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-                }
-                immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-                if (immutableWorker == null) {
-                  // no free worker, wait for some worker to become free
-                  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-                  continue;
-                } else if (workersWithUnacknowledgedTask.putIfAbsent(
-                    immutableWorker.getWorker().getHost(),
-                    taskId
-                ) != null) {
-                  // there was a race and someone else took this worker slot, 
try again
-                  continue;
-                }
-              }
+        synchronized (statusLock) {
+          Iterator<String> iter = pendingTaskIds.iterator();
+          while (iter.hasNext()) {
+            String taskId = iter.next();
+            HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+            if (ti == null || !ti.getState().isPending()) {
+              // happens if the task was shutdown or was picked up earlier and 
no more pending
+              iter.remove();
 
 Review comment:
   Looks like task IDs are removed from `pendingTaskIds` only here. This will 
lead to processing the same taskId multiple times: HRTR will assign the task 
to some worker in the first loop, skip it in the `if` clause below until the 
worker starts processing that task, and only then remove the taskId here. Can 
we remove the taskId from `pendingTaskIds` as soon as the task is successfully 
assigned?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to