[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357323580 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1138,6 +1139,12 @@ private void pendingTasksExecutionLoop() // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked // up by another task execution thread. +// note that we can't simply delete this task item from pendingTaskIds or else we would have to add it +// back if this thread couldn't run this task for any reason, which we will know at some later time +// and also we will need to add it back to its old position in the list. that becomes complex quickly. +// Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up. +// And, it is automatically removed by any of the task exeuction threads when they notice that Review comment: typo: exeuction -> execution This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357275275 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); Review comment: Oh I believe this would work. I was just curious whether it was intended or not. I'm ok with it if this was intended. Would you please add a comment to avoid confusion in the future? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356919309 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); Review comment: Hmm, sorry I don't think I fully understand this logic yet. > before leaving the lock , ti state is updated to PENDING_WORKER_ASSIGN so no other task executor thread can pick up. Do you mean the other threads can pick up even after the ti state is set to `PENDING_WORKER_ASSIGN` but will ignore it in the [below if clause](https://github.com/apache/incubator-druid/pull/8697/files#diff-b994d2f0b67b07608a060f52a17fbd01R1107)? Or, do they really not pick up because somehow `pendingTaskIds` is updated before other threads start picking up? > `pendingTaskIds` variable declaration has a comment that this variable is exclusively manipulated by only external task submitter threads or task executor threads which I preferred. Yes, I
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356919345 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); + continue; +} - try { -// this will send HTTP request to worker for assigning task and hence kept -// outside the synchronized block. -if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - return; -} - } - finally { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); -synchronized (statusLock) { - statusLock.notifyAll(); -} - } +if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + // picked up by another pending task executor thread. + continue;
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356819299 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk() ); } + /** + * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} + */ + @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe + public Map getWorkersEligibleToRunTasks() + { +return Maps.transformEntries( +Maps.filterEntries( +workers, +input -> !lazyWorkers.containsKey(input.getKey()) && + !workersWithUnacknowledgedTask.containsKey(input.getKey()) && + !blackListedWorkers.containsKey(input.getKey()) && Review comment: Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356344644 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); + continue; +} - try { -// this will send HTTP request to worker for assigning task and hence kept -// outside the synchronized block. -if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - return; -} - } - finally { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); -synchronized (statusLock) { - statusLock.notifyAll(); -} - } +if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + // picked up by another pending task executor thread. + continue;
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356350247 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); Review comment: Looks like `pendingTaskIds` is updated only here. This will lead to processing the same taskId multiple times. HRTR will assign the task to some worker in the first loop, ignore it until the worker starts processing that task in the below `if` clause, and then finally remove the taskId here. Can we update `pendingTaskIds` when the task is successfully assigned? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356339972 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1501,6 +1623,23 @@ public void setState(State state) state ); + setStateUnconditionally(state); +} + +public void revertStateFromPendingWorkerAssignToPending() +{ + Preconditions.checkState( + this.state == State.PENDING_WORKER_ASSIGN, + "Can't move state from [%s] to [%s]", + this.state, + State.PENDING + ); + + setStateUnconditionally(State.PENDING); +} + +private void setStateUnconditionally(State state) +{ if (log.isDebugEnabled()) { log.debug( new RuntimeException("Stacktrace..."), Review comment: Is this to print the stack trace so that you can see where this method was called from when the log was printed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356339621 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1432,15 +1533,35 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder { enum State { - PENDING(0), - RUNNING(1), - COMPLETE(2); + // Task has been given to HRTR, but a worker to run this task hasn't been identified yet. + PENDING(0, true, RunnerTaskState.PENDING), + + // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet + // or worker hasn't acknowledged the task yet. + PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING), - int index; + RUNNING(2, false, RunnerTaskState.RUNNING), + COMPLETE(3, false, RunnerTaskState.NONE); - State(int index) + private int index; + private boolean isPending; + private RunnerTaskState runnerTaskState; Review comment: These variables can be final. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356344661 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); + continue; +} - try { -// this will send HTTP request to worker for assigning task and hence kept -// outside the synchronized block. -if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - return; -} - } - finally { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); -synchronized (statusLock) { - statusLock.notifyAll(); -} - } +if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + // picked up by another pending task executor thread. + continue;
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356331904 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk() ); } + /** + * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} + */ + @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe + public Map getWorkersEligibleToRunTasks() Review comment: Looks better to be package-private if it's not supposed to be used in general. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356338730 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk() ); } + /** + * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} + */ + @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe + public Map getWorkersEligibleToRunTasks() + { +return Maps.transformEntries( +Maps.filterEntries( +workers, +input -> !lazyWorkers.containsKey(input.getKey()) && + !workersWithUnacknowledgedTask.containsKey(input.getKey()) && + !blackListedWorkers.containsKey(input.getKey()) && Review comment: Hmm, this looks racy because each concurrent map is updated under different locks. But is this ok because this method is called periodically? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org