[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356799820

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
       );
     }
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()

Review comment: changed

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards,
Apache Git Services
-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800295

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -1432,15 +1533,35 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
   {
     enum State
     {
-      PENDING(0),
-      RUNNING(1),
-      COMPLETE(2);
+      // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+      PENDING(0, true, RunnerTaskState.PENDING),
+
+      // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+      // or worker hasn't acknowledged the task yet.
+      PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
-      int index;
+      RUNNING(2, false, RunnerTaskState.RUNNING),
+      COMPLETE(3, false, RunnerTaskState.NONE);
-      State(int index)
+      private int index;
+      private boolean isPending;
+      private RunnerTaskState runnerTaskState;

Review comment: changed
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800260

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
       );
     }
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
+  {
+    return Maps.transformEntries(
+        Maps.filterEntries(
+            workers,
+            input -> !lazyWorkers.containsKey(input.getKey()) &&
+                     !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                     !blackListedWorkers.containsKey(input.getKey()) &&

Review comment: added comment to explain it.
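The filter in `getWorkersEligibleToRunTasks()` keeps only workers that meet three visible conditions: they are not lazy, they have no unacknowledged task assignment in flight, and they are not blacklisted. The hunk is cut off after a trailing `&&`, so at least one more condition exists that is not shown here.

The following is a minimal sketch of just those three conditions. It uses plain `java.util` streams rather than Guava's `Maps.filterEntries`/`Maps.transformEntries`. The class name `EligibleWorkers` is hypothetical, and the exclusion collections are modeled as `Set`s instead of the maps used in the PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper mirroring the three visible conditions of the filter.
final class EligibleWorkers
{
  static <W> Map<String, W> eligible(
      Map<String, W> workers,
      Set<String> lazyWorkers,
      Set<String> workersWithUnacknowledgedTask,
      Set<String> blackListedWorkers
  )
  {
    return workers.entrySet()
                  .stream()
                  // Exclude lazy, busy-being-assigned, and blacklisted workers.
                  .filter(e -> !lazyWorkers.contains(e.getKey())
                               && !workersWithUnacknowledgedTask.contains(e.getKey())
                               && !blackListedWorkers.contains(e.getKey()))
                  // Copy into an insertion-ordered map.
                  .collect(Collectors.toMap(
                      Map.Entry::getKey,
                      Map.Entry::getValue,
                      (a, b) -> a,
                      LinkedHashMap::new
                  ));
  }
}
```

Unlike Guava's `Maps.filterEntries`, which returns a live filtered view of `workers`, this sketch copies the matching entries into a new map. Later changes to the worker collections are therefore not reflected in the result.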
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356799820

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
       );
     }
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()

Review comment: changed, here and in a few other similar places.
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800295

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -1432,15 +1533,35 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
   {
     enum State
     {
-      PENDING(0),
-      RUNNING(1),
-      COMPLETE(2);
+      // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+      PENDING(0, true, RunnerTaskState.PENDING),
+
+      // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+      // or worker hasn't acknowledged the task yet.
+      PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
-      int index;
+      RUNNING(2, false, RunnerTaskState.RUNNING),
+      COMPLETE(3, false, RunnerTaskState.NONE);
-      State(int index)
+      private int index;
+      private boolean isPending;
+      private RunnerTaskState runnerTaskState;

Review comment: changed. For some reason I thought enum state was immutable even without explicitly making it so... duh.
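This is the point about immutability. Enum constants are singletons, but their instance fields are not implicitly `final`. A non-final field can therefore be reassigned by code inside the enum, and the change would be visible everywhere that constant is used.

The sketch below is the `State` enum from the hunk with every field declared `final`. `RunnerTaskStateSketch` is a hypothetical local stand-in for Druid's `RunnerTaskState`, and the accessor names are assumptions:

```java
// Hypothetical stand-in for Druid's RunnerTaskState, kept local so the sketch compiles alone.
enum RunnerTaskStateSketch
{
  PENDING, RUNNING, NONE
}

// Mirrors the State enum in the hunk, but with every field declared final.
enum WorkItemState
{
  // Handed to the runner; no worker identified yet.
  PENDING(0, true, RunnerTaskStateSketch.PENDING),
  // Worker identified, but the run request has not been sent or acknowledged yet.
  PENDING_WORKER_ASSIGN(1, true, RunnerTaskStateSketch.PENDING),
  RUNNING(2, false, RunnerTaskStateSketch.RUNNING),
  COMPLETE(3, false, RunnerTaskStateSketch.NONE);

  private final int index;
  private final boolean isPending;
  private final RunnerTaskStateSketch runnerTaskState;

  WorkItemState(int index, boolean isPending, RunnerTaskStateSketch runnerTaskState)
  {
    this.index = index;
    this.isPending = isPending;
    this.runnerTaskState = runnerTaskState;
  }

  int getIndex()
  {
    return index;
  }

  boolean isPending()
  {
    return isPending;
  }

  RunnerTaskStateSketch toRunnerTaskState()
  {
    return runnerTaskState;
  }
}
```

Both internal pending states map to the same external `PENDING` runner state, so callers outside the runner cannot tell `PENDING` and `PENDING_WORKER_ASSIGN` apart.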
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800868

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -1501,6 +1623,23 @@ public void setState(State state)
           state
       );
+      setStateUnconditionally(state);
+    }
+
+    public void revertStateFromPendingWorkerAssignToPending()
+    {
+      Preconditions.checkState(
+          this.state == State.PENDING_WORKER_ASSIGN,
+          "Can't move state from [%s] to [%s]",
+          this.state,
+          State.PENDING
+      );
+
+      setStateUnconditionally(State.PENDING);
+    }
+
+    private void setStateUnconditionally(State state)
+    {
       if (log.isDebugEnabled()) {
         log.debug(
             new RuntimeException("Stacktrace..."),

Review comment: yes, added a comment too.
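This hunk splits state changes into three parts:

- a validated `setState`,
- one explicit backward transition, `revertStateFromPendingWorkerAssignToPending`,
- a private `setStateUnconditionally` that both of them delegate to.

The sketch below shows that shape in plain Java, throwing `IllegalStateException` directly where the PR uses Guava's `Preconditions.checkState`. The class name is hypothetical. The forward-only rule in `setState` is also an assumption, since the hunk does not show that method's check:

```java
// Hypothetical work item showing a checked forward transition plus one explicit backward transition.
final class SketchWorkItem
{
  enum State
  {
    PENDING, PENDING_WORKER_ASSIGN, RUNNING, COMPLETE
  }

  private State state = State.PENDING;

  State getState()
  {
    return state;
  }

  // Assumption: states may only move forward in declaration order.
  void setState(State newState)
  {
    if (newState.ordinal() <= state.ordinal()) {
      throw new IllegalStateException(
          String.format("Can't move state from [%s] to [%s]", state, newState)
      );
    }
    setStateUnconditionally(newState);
  }

  // The only permitted backward move, e.g. after a failed assignment attempt (assumed use).
  void revertStateFromPendingWorkerAssignToPending()
  {
    if (state != State.PENDING_WORKER_ASSIGN) {
      throw new IllegalStateException(
          String.format("Can't move state from [%s] to [%s]", state, State.PENDING)
      );
    }
    setStateUnconditionally(State.PENDING);
  }

  // Shared by both paths; all validation happens in the callers.
  private void setStateUnconditionally(State newState)
  {
    this.state = newState;
  }
}
```

Keeping the unchecked setter private means every externally visible transition goes through exactly one of the two validated entry points.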
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356801301 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. 
Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); + continue; +} - try { -// this will send HTTP request to worker for assigning task and hence kept -// outside the synchronized block. -if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - return; -} - } - finally { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); -synchronized (statusLock) { - statusLock.notifyAll(); -} - } +if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + // picked up by another pending task executor thread. + continue;
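`startPendingTaskHandling()` replaces the old scheme, which submitted one `Runnable` per task. Instead it starts a fixed number of long-lived loops: it submits `config.getPendingTasksRunnerNumThreads()` loops, and each one waits for the lifecycle to start and then runs `pendingTasksExecutionLoop()`.

A sketch of that startup pattern follows. A `CountDownLatch` stands in for Druid's `LifecycleLock`, and the class and method names are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PendingLoopStarter
{
  // Submits numThreads long-lived loops; each waits for `started` before doing any work.
  static ExecutorService start(int numThreads, CountDownLatch started, AtomicInteger loopsEntered)
  {
    ExecutorService exec = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < numThreads; i++) {
      exec.submit(() -> {
        try {
          if (!started.await(10, TimeUnit.SECONDS)) {
            System.err.println("Lifecycle not started, loop will not run.");
            return;
          }
          loopsEntered.incrementAndGet();
          // Placeholder for pendingTasksExecutionLoop(): run until interrupted.
          while (!Thread.currentThread().isInterrupted()) {
            TimeUnit.MILLISECONDS.sleep(10);
          }
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        finally {
          System.out.println("Loop exited.");
        }
      });
    }
    return exec;
  }

  // Starts the loops, releases them, stops them, and returns how many entered the loop body.
  static int demo(int numThreads)
  {
    CountDownLatch started = new CountDownLatch(1);
    AtomicInteger entered = new AtomicInteger();
    ExecutorService exec = start(numThreads, started, entered);
    started.countDown();
    try {
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (entered.get() < numThreads && System.nanoTime() < deadline) {
        TimeUnit.MILLISECONDS.sleep(10);
      }
      exec.shutdownNow(); // interrupts every loop
      exec.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return entered.get();
  }
}
```

The PR version catches `Throwable` around the whole loop and emits an alert. The sketch handles only interruption, to stay short.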
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356801359 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. 
Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); + continue; +} - try { -// this will send HTTP request to worker for assigning task and hence kept -// outside the synchronized block. -if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - return; -} - } - finally { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); -synchronized (statusLock) { - statusLock.notifyAll(); -} - } +if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + // picked up by another pending task executor thread. + continue;
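The two sides of this hunk show a simple handshake:

- **Submit path:** adds the id to `pendingTaskIds` and calls `statusLock.notifyAll()`. It must do this while holding `statusLock`, because `notifyAll()` requires owning the monitor.
- **Executor side:** the removed code shows an executor thread waiting on the same lock, with a timeout from `config.getWaitForWorkerSlot()`, when no worker is free.

A minimal sketch of that handshake with `Object.wait`/`notifyAll` follows. The names are hypothetical, and worker selection is omitted, so the waiter only waits for the list to become non-empty:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class PendingHandshakeSketch
{
  private final Object statusLock = new Object();
  private final List<String> pendingTaskIds = new ArrayList<>();

  // Submitter side: append under the lock, then wake every waiting executor thread.
  void submit(String taskId)
  {
    synchronized (statusLock) {
      pendingTaskIds.add(taskId);
      statusLock.notifyAll();
    }
  }

  // Executor side: wait up to timeoutMillis for any pending id.
  // The while loop re-checks the condition to tolerate spurious wakeups.
  boolean awaitPending(long timeoutMillis) throws InterruptedException
  {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (statusLock) {
      while (pendingTaskIds.isEmpty()) {
        long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        if (remainingMillis <= 0) {
          return false; // avoids wait(0), which would block indefinitely
        }
        statusLock.wait(remainingMillis);
      }
      return true;
    }
  }

  // Demo: a second thread submits after a short delay and wakes the waiter.
  static boolean wakesOnSubmit()
  {
    PendingHandshakeSketch sketch = new PendingHandshakeSketch();
    Thread submitter = new Thread(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(50);
      }
      catch (InterruptedException e) {
        return;
      }
      sketch.submit("task-1");
    });
    submitter.start();
    try {
      boolean woke = sketch.awaitPending(5_000);
      submitter.join();
      return woke;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Demo: with nothing submitted, the wait times out and awaitPending returns false.
  static boolean timesOutWhenEmpty()
  {
    try {
      return !new PendingHandshakeSketch().awaitPending(20);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```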
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356803046 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. 
Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove();

Review comment: before leaving the lock, the `ti` state is updated to PENDING_WORKER_ASSIGN, so no other task executor thread can pick it up. The `pendingTaskIds` variable declaration has a comment saying that this variable is manipulated only by external task submitter threads or task executor threads, which I preferred.
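The reply describes how claiming works. Under `statusLock`, an executor thread walks `pendingTaskIds` and:

1. drops ids whose work item is gone or no longer pending,
2. skips items already in `PENDING_WORKER_ASSIGN`,
3. moves the item it picks to `PENDING_WORKER_ASSIGN` before releasing the lock, so other threads skip it.

A sketch of that scan follows. The names are hypothetical. It claims the first `PENDING` item unconditionally, whereas the PR also looks for a worker to run it on; that step is not shown in the hunk and is omitted here:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class ClaimScanSketch
{
  enum State
  {
    PENDING(true), PENDING_WORKER_ASSIGN(true), RUNNING(false), COMPLETE(false);

    final boolean isPending;

    State(boolean isPending)
    {
      this.isPending = isPending;
    }
  }

  final Object statusLock = new Object();
  final Map<String, State> tasks = new HashMap<>();
  final List<String> pendingTaskIds = new ArrayList<>();

  // Returns the id of the claimed task, or null if nothing is claimable right now.
  String claimNext()
  {
    synchronized (statusLock) {
      Iterator<String> iter = pendingTaskIds.iterator();
      while (iter.hasNext()) {
        String taskId = iter.next();
        State state = tasks.get(taskId);
        if (state == null || !state.isPending) {
          // Shut down or no longer pending: drop it from the list lazily.
          iter.remove();
          continue;
        }
        if (state == State.PENDING_WORKER_ASSIGN) {
          // Already claimed by another executor thread; leave it in place.
          continue;
        }
        // Claim before releasing the lock so no other thread picks it up.
        tasks.put(taskId, State.PENDING_WORKER_ASSIGN);
        return taskId;
      }
      return null;
    }
  }
}
```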
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357028172 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. 
Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove();

Review comment:

> Do you mean the other threads can pick up even after the ti state is set to PENDING_WORKER_ASSIGN but will ignore it in the below if clause? Or, do they really not pick up because somehow pendingTaskIds is updated before other threads start picking up?

yes, other executor threads would see it but ignore it because of the if clause you noted.

> But can pendingTaskIds be updated after this line? Like as the below code: ...

if we removed it here, then we might have to add it back in case the task couldn't run for whatever reason. Adding it back would be complex, because we would only find out that we need to add it back at some later time, and putting it back at its old position in the list would also be difficult.
as opposed to all that, we remove it
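The trade-off in the reply rests on one property. A claimed id stays in `pendingTaskIds` while its item is in `PENDING_WORKER_ASSIGN`. A failed assignment therefore only has to flip the state back to `PENDING`: the id never left the list, so it keeps its original position and nothing needs to be re-inserted.

The small sketch below illustrates that property. All names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RevertKeepsOrderSketch
{
  enum State
  {
    PENDING, PENDING_WORKER_ASSIGN, RUNNING
  }

  final Map<String, State> tasks = new HashMap<>();
  // List order is submission order; ids are never re-inserted.
  final List<String> pendingTaskIds = new ArrayList<>();

  void submit(String taskId)
  {
    tasks.put(taskId, State.PENDING);
    pendingTaskIds.add(taskId);
  }

  // Claims the earliest PENDING id by flipping its state; the id stays in the list.
  String claimFirstPending()
  {
    for (String taskId : pendingTaskIds) {
      if (tasks.get(taskId) == State.PENDING) {
        tasks.put(taskId, State.PENDING_WORKER_ASSIGN);
        return taskId;
      }
    }
    return null;
  }

  // Assignment failed: only the state changes, so the id keeps its original position.
  void revertToPending(String taskId)
  {
    if (tasks.get(taskId) != State.PENDING_WORKER_ASSIGN) {
      throw new IllegalStateException("Task[" + taskId + "] is not in PENDING_WORKER_ASSIGN");
    }
    tasks.put(taskId, State.PENDING);
  }
}
```

After a revert, the oldest task is again the first candidate on the next scan. Removing the id at claim time would instead require re-inserting it at its old position on failure, which is exactly the complexity the reply describes avoiding.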
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357032841 ## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); -addPendingTaskToExecutor(task.getId()); +pendingTaskIds.add(task.getId()); + +statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { -pendingTasksExec.execute( -() -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { -ImmutableWorkerInfo immutableWorker; -HttpRemoteTaskRunnerWorkItem taskItem = null; +for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { -taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { +log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); +return; + } -if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; -} + pendingTasksExecutionLoop(); +} +catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); +} +finally { + log.info("PendingTaskExecution loop exited."); +} + } + ); +} + } -if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. 
Not assigning.", - taskId, - taskItem.getState() - ); - return; -} + private void pendingTasksExecutionLoop() + { +while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { +// Find one pending task to run and a worker to run on +HttpRemoteTaskRunnerWorkItem taskItem = null; +ImmutableWorkerInfo immutableWorker = null; -if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); -} -immutableWorker = findWorkerToRunTask(taskItem.getTask()); - -if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; -} else if (workersWithUnacknowledgedTask.putIfAbsent( -immutableWorker.getWorker().getHost(), -taskId -) != null) { - // there was a race and someone else took this worker slot, try again - continue; -} - } +synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { +String taskId = iter.next(); +HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + +if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove();

Review comment: if you still have doubts, maybe describe a scenario where you think this doesn't work, and I will try to explain how I think that scenario would play out :)
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357322140

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
 }
 }
 }
- private void addPendingTaskToExecutor(final String taskId)
+ private void startPendingTaskHandling()
 {
-pendingTasksExec.execute(
-() -> {
- while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+ pendingTasksExec.submit(
+ () -> {
 try {
- synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+ if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit();
+return;
+ }
-if (taskItem == null) {
- log.info(
- "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.",
- taskId
- );
- return;
-}
+ pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+ log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+ log.info("PendingTaskExecution loop exited.");
+}
+ }
+ );
+}
+ }
-if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) {
- log.info(
- "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.",
- taskId,
- taskItem.getState()
- );
- return;
-}
+ private void pendingTasksExecutionLoop()
+ {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+ try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
-if (taskItem.getTask() == null) {
- throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
- // no free worker, wait for some worker to become free
- statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
- continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
- // there was a race and someone else took this worker slot, try again
- continue;
-}
- }
+synchronized (statusLock) {
+ Iterator iter = pendingTaskIds.iterator();
+ while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+ // happens if the task was shutdown or was picked up earlier and no more pending
+ iter.remove();

Review comment:
Got it. Sure, added some commentary to hopefully make it clearer.
[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357343567

## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ##
@@ -1138,6 +1139,12 @@ private void pendingTasksExecutionLoop()
 // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
 // up by another task execution thread.
+// note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
+// back if this thread couldn't run this task for any reason, which we will know at some later time
+// and also we will need to add it back to its old position in the list. that becomes complex quickly.
+// Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
+// And, it is automatically removed by any of the task exeuction threads when they notice that

Review comment:
Fixed, thanks.
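The commentary quoted here argues for marking a claimed task `PENDING_WORKER_ASSIGN` instead of deleting it from `pendingTaskIds`. A small Java sketch of why that keeps things simple, under stated assumptions: if the hand-off to a worker fails, flipping the state back to `PENDING` is enough, and the task is still at its original position in the list. `MarkerStateSketch`, `claim` and `finishAttempt` are hypothetical names, and the real runner's state transitions are more involved than this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: claim by marking state, never by removing from the list. */
final class MarkerStateSketch {
  enum State { PENDING, PENDING_WORKER_ASSIGN, RUNNING }

  private final Object lock = new Object();
  private final List<String> pendingIds = new ArrayList<>();
  private final Map<String, State> states = new HashMap<>();

  void add(String id) {
    synchronized (lock) {
      pendingIds.add(id);
      states.put(id, State.PENDING);
    }
  }

  /** Claims the first PENDING task by marking it; its id stays in pendingIds. */
  String claim() {
    synchronized (lock) {
      Iterator<String> it = pendingIds.iterator();
      while (it.hasNext()) {
        String id = it.next();
        State s = states.get(id);
        if (s == null || s == State.RUNNING) {
          it.remove(); // no longer pending: whichever thread sees it drops it
          continue;
        }
        if (s == State.PENDING) {
          states.put(id, State.PENDING_WORKER_ASSIGN);
          return id;
        }
        // PENDING_WORKER_ASSIGN: another thread owns this attempt, skip it
      }
      return null;
    }
  }

  /** Records the outcome of the (out-of-lock) attempt to hand the task to a worker. */
  void finishAttempt(String id, boolean accepted) {
    synchronized (lock) {
      // On failure the task simply becomes PENDING again, still in its old slot.
      states.put(id, accepted ? State.RUNNING : State.PENDING);
    }
  }

  List<String> pendingSnapshot() {
    synchronized (lock) {
      return new ArrayList<>(pendingIds);
    }
  }
}
```

With `a` and `b` added, two claims return `a` then `b`; if `a`'s attempt fails it is claimed again first, and both ids are only dropped once a later scan sees them as `RUNNING`.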