[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356799820
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
 );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
 
 Review comment:
   changed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800295
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1432,15 +1533,35 @@ void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final WorkerHolder
   {
 enum State
 {
-  PENDING(0),
-  RUNNING(1),
-  COMPLETE(2);
+  // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+  PENDING(0, true, RunnerTaskState.PENDING),
+
+  // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+  // or worker hasn't acknowledged the task yet.
+  PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
 
-  int index;
+  RUNNING(2, false, RunnerTaskState.RUNNING),
+  COMPLETE(3, false, RunnerTaskState.NONE);
 
-  State(int index)
+  private int index;
+  private boolean isPending;
+  private RunnerTaskState runnerTaskState;
 
 Review comment:
   changed





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800260
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
 );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
+  {
+return Maps.transformEntries(
+Maps.filterEntries(
+workers,
+input -> !lazyWorkers.containsKey(input.getKey()) &&
+ !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+ !blackListedWorkers.containsKey(input.getKey()) &&
 
 Review comment:
   Added a comment to explain it.
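
For readers following the hunk above, the visible part of the filter keeps a worker only when its key is absent from three exclusion maps (lazy, unacknowledged, blacklisted). The hunk is cut off after the third condition, so anything further is not shown here. A minimal sketch of that visible logic, assuming plain `java.util` collections instead of Guava and a snapshot copy instead of a live view; the class name `EligibleWorkersSketch` and the method `eligible` are hypothetical:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the HttpRemoteTaskRunner code: it only mirrors the
// three exclusion checks visible in the truncated hunk.
public class EligibleWorkersSketch
{
  // Returns a snapshot copy of the workers whose host is in none of the exclusion sets.
  static <V> Map<String, V> eligible(
      Map<String, V> workers,
      Set<String> lazy,
      Set<String> unacknowledged,
      Set<String> blackListed
  )
  {
    Map<String, V> result = new LinkedHashMap<>();
    for (Map.Entry<String, V> e : workers.entrySet()) {
      String host = e.getKey();
      if (!lazy.contains(host) && !unacknowledged.contains(host) && !blackListed.contains(host)) {
        result.put(host, e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    Map<String, Integer> workers = new LinkedHashMap<>();
    workers.put("w1", 1);
    workers.put("w2", 2);
    // w1 is excluded because it still has an unacknowledged task.
    System.out.println(
        eligible(workers, Collections.emptySet(), Collections.singleton("w1"), Collections.emptySet())
    );
  }
}
```

Unlike the filtered view in the diff, this copy does not reflect later changes to the underlying maps.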





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356799820
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
 );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
 
 Review comment:
   Changed, here and in a few other similar places.





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800295
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1432,15 +1533,35 @@ void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final WorkerHolder
   {
 enum State
 {
-  PENDING(0),
-  RUNNING(1),
-  COMPLETE(2);
+  // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+  PENDING(0, true, RunnerTaskState.PENDING),
+
+  // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+  // or worker hasn't acknowledged the task yet.
+  PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
 
-  int index;
+  RUNNING(2, false, RunnerTaskState.RUNNING),
+  COMPLETE(3, false, RunnerTaskState.NONE);
 
-  State(int index)
+  private int index;
+  private boolean isPending;
+  private RunnerTaskState runnerTaskState;
 
 Review comment:
   Changed. For some reason I thought enum state was immutable even without
explicitly making it so... duh.
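
The point about enum state can be illustrated with a small sketch. Enum constants are singletons, but their instance fields are ordinary fields: unless they are declared `final`, any code with access can reassign them on the shared constant. The sketch below is a hypothetical stand-in for the `State` enum in the hunk above; the `RunnerTaskState` argument is omitted to keep it self-contained, and the accessor names are assumptions.

```java
// Hypothetical stand-in for the State enum discussed above.
public class EnumStateSketch
{
  enum State
  {
    PENDING(0, true),
    PENDING_WORKER_ASSIGN(1, true),
    RUNNING(2, false),
    COMPLETE(3, false);

    // `final` is what actually makes the per-constant state immutable;
    // the compiler rejects any later assignment to these fields.
    private final int index;
    private final boolean isPending;

    State(int index, boolean isPending)
    {
      this.index = index;
      this.isPending = isPending;
    }

    int getIndex()
    {
      return index;
    }

    boolean isPending()
    {
      return isPending;
    }
  }

  public static void main(String[] args)
  {
    for (State s : State.values()) {
      System.out.println(s + " index=" + s.getIndex() + " pending=" + s.isPending());
    }
  }
}
```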





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356800868
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1501,6 +1623,23 @@ public void setState(State state)
   state
   );
 
+  setStateUnconditionally(state);
+}
+
+public void revertStateFromPendingWorkerAssignToPending()
+{
+  Preconditions.checkState(
+  this.state == State.PENDING_WORKER_ASSIGN,
+  "Can't move state from [%s] to [%s]",
+  this.state,
+  State.PENDING
+  );
+
+  setStateUnconditionally(State.PENDING);
+}
+
+private void setStateUnconditionally(State state)
+{
   if (log.isDebugEnabled()) {
 log.debug(
 new RuntimeException("Stacktrace..."),
 
 Review comment:
   yes, added a comment too.
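
The shape of the change in the hunk above (a guarded public setter, one explicitly named backward transition, and a private unconditional setter shared by both) can be sketched as follows. This is a simplified illustration, not the PR's code: the forward-only check in `setState` is an assumption, since the hunk does not show the original precondition, and plain `IllegalStateException` is used where the diff calls Guava's `Preconditions.checkState`.

```java
// Hypothetical sketch of the transition structure shown in the hunk.
public class WorkItemStateSketch
{
  enum State { PENDING, PENDING_WORKER_ASSIGN, RUNNING, COMPLETE }

  private State state = State.PENDING;

  State getState()
  {
    return state;
  }

  // Assumption: regular transitions may only move forward in declaration order.
  void setState(State next)
  {
    if (next.ordinal() <= state.ordinal()) {
      throw new IllegalStateException("Can't move state from [" + state + "] to [" + next + "]");
    }
    setStateUnconditionally(next);
  }

  // The single allowed backward transition, kept separate so it cannot be taken by accident.
  void revertStateFromPendingWorkerAssignToPending()
  {
    if (state != State.PENDING_WORKER_ASSIGN) {
      throw new IllegalStateException("Can't move state from [" + state + "] to [" + State.PENDING + "]");
    }
    setStateUnconditionally(State.PENDING);
  }

  private void setStateUnconditionally(State next)
  {
    this.state = next;
  }

  public static void main(String[] args)
  {
    WorkItemStateSketch item = new WorkItemStateSketch();
    item.setState(State.PENDING_WORKER_ASSIGN);
    item.revertStateFromPendingWorkerAssignToPending();
    System.out.println(item.getState());
  }
}
```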





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356801301
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
+  continue;
+}
 
-  try {
-// this will send HTTP request to worker for assigning task 
and hence kept
-// outside the synchronized block.
-if (runTaskOnWorker(taskItem, 
immutableWorker.getWorker().getHost())) {
-  return;
-}
-  }
-  finally {
-
workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-synchronized (statusLock) {
-  statusLock.notifyAll();
-}
-  }
+if (ti.getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+  // picked up by another pending task executor thread.
+  continue;

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356801359
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
+  continue;
+}
 
-  try {
-// this will send HTTP request to worker for assigning task 
and hence kept
-// outside the synchronized block.
-if (runTaskOnWorker(taskItem, 
immutableWorker.getWorker().getHost())) {
-  return;
-}
-  }
-  finally {
-
workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-synchronized (statusLock) {
-  statusLock.notifyAll();
-}
-  }
+if (ti.getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+  // picked up by another pending task executor thread.
+  continue;

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356803046
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
 
 Review comment:
   Before leaving the lock, the state of `ti` is updated to
PENDING_WORKER_ASSIGN, so no other task executor thread can pick it up.
   
   The `pendingTaskIds` variable declaration has a comment saying that this
variable is manipulated exclusively by external task submitter threads or task
executor threads, which is what I preferred.
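
A minimal sketch of the claiming scheme described in this comment, under simplifying assumptions: a task is reduced to an id with a state, worker selection is left out entirely, and the class and method names (`ClaimUnderLockSketch`, `submit`, `claimNext`) are hypothetical. The point it illustrates is that the state flips to PENDING_WORKER_ASSIGN while the lock is still held, so a second executor thread scanning the same list skips the task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of claiming a pending task under a shared lock.
public class ClaimUnderLockSketch
{
  enum State { PENDING, PENDING_WORKER_ASSIGN, RUNNING }

  private final Object statusLock = new Object();
  private final Map<String, State> tasks = new HashMap<>();
  private final List<String> pendingTaskIds = new ArrayList<>();

  // Called by task submitter threads.
  void submit(String taskId)
  {
    synchronized (statusLock) {
      tasks.put(taskId, State.PENDING);
      pendingTaskIds.add(taskId);
      statusLock.notifyAll();
    }
  }

  // Called by executor threads. Returns the claimed task id, or null if nothing is claimable.
  String claimNext()
  {
    synchronized (statusLock) {
      Iterator<String> iter = pendingTaskIds.iterator();
      while (iter.hasNext()) {
        String taskId = iter.next();
        State s = tasks.get(taskId);
        if (s == null || s == State.RUNNING) {
          // Shut down or no longer pending: drop it from the list.
          iter.remove();
          continue;
        }
        if (s == State.PENDING_WORKER_ASSIGN) {
          // Already claimed by another executor thread; leave it in place.
          continue;
        }
        // Claim before releasing the lock so no other thread can pick it up.
        tasks.put(taskId, State.PENDING_WORKER_ASSIGN);
        return taskId;
      }
      return null;
    }
  }

  public static void main(String[] args)
  {
    ClaimUnderLockSketch runner = new ClaimUnderLockSketch();
    runner.submit("a");
    runner.submit("b");
    System.out.println(runner.claimNext());
    System.out.println(runner.claimNext());
  }
}
```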



[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-12 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357028172
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
 
 Review comment:
   > Do you mean the other threads can pick up even after the ti state is set 
to PENDING_WORKER_ASSIGN but will ignore it in the below if clause? Or, do they 
really not pick up because somehow pendingTaskIds is updated before other 
threads start picking up?
   
   Yes, other executor threads would see it but ignore it because of the if
clause you noted.
   
   > But can pendingTaskIds be updated after this line? Like as the below code: 
...
   
   If we removed it here, we might have to add it back in case the task
couldn't run for whatever reason. Adding it back would be complex: we would
only find out at some later time that it needs to be added back, and restoring
it to its old position in the list would be difficult.
   as opposed to all that we remove it
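
The ordering concern in this reply can be shown with a toy comparison. It assumes the pending ids live in an ordered list and that a claimed task may later fail to be assigned; both method names are hypothetical and neither is taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical comparison of two ways to handle a claim that later fails.
public class KeepPositionSketch
{
  // Leave the id in the list and track the claim separately: a failed claim
  // only needs the marker cleared, and the task keeps its place.
  static List<String> markInPlace(List<String> pending, String claimed)
  {
    List<String> copy = new ArrayList<>(pending);
    Set<String> assigned = new HashSet<>();
    assigned.add(claimed);     // claim
    assigned.remove(claimed);  // assignment failed, revert the marker
    return copy;
  }

  // Remove on claim and re-add on failure: the task loses its place in line.
  static List<String> removeAndReAdd(List<String> pending, String claimed)
  {
    List<String> copy = new ArrayList<>(pending);
    copy.remove(claimed);
    copy.add(claimed);
    return copy;
  }

  public static void main(String[] args)
  {
    List<String> pending = Arrays.asList("t1", "t2", "t3");
    System.out.println(markInPlace(pending, "t1"));
    System.out.println(removeAndReAdd(pending, "t1"));
  }
}
```

With the first approach a reverted task is still first in line on the next scan; with the second it is only retried after every task submitted in the meantime.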

[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-12 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357032841
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator<String> iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and no more pending
+  iter.remove();
 
 Review comment:
   If you still have doubts, maybe describe a scenario where you think this doesn't work, and I will try to explain how I think that scenario would play out :)
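
For readers following the hunk above, here is a minimal standalone sketch of the scan it introduces: one pass over the pending ids under `statusLock`, dropping stale entries, skipping tasks that cannot get a slot right now, and waiting on the lock only after the whole list has been tried. Everything in it is made up for illustration (the class `PendingScanSketch`, the `Item` type, `pollRunnable`, and the single capacity counter that stands in for worker selection); it is not the HttpRemoteTaskRunner code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the runner; not the Druid classes.
class PendingScanSketch
{
  enum State { PENDING, RUNNING }

  static final class Item
  {
    final int requiredCapacity;
    State state = State.PENDING;

    Item(int requiredCapacity)
    {
      this.requiredCapacity = requiredCapacity;
    }
  }

  private final Object statusLock = new Object();
  private final List<String> pendingTaskIds = new ArrayList<>();
  private final Map<String, Item> tasks = new HashMap<>();
  private int freeCapacity; // assumption: one counter instead of real worker selection

  PendingScanSketch(int freeCapacity)
  {
    this.freeCapacity = freeCapacity;
  }

  void submit(String taskId, int requiredCapacity)
  {
    synchronized (statusLock) {
      tasks.put(taskId, new Item(requiredCapacity));
      pendingTaskIds.add(taskId);
      statusLock.notifyAll(); // wake any loop that is waiting for work
    }
  }

  void shutdown(String taskId)
  {
    synchronized (statusLock) {
      tasks.remove(taskId); // the id stays in pendingTaskIds until the next scan
    }
  }

  int pendingListSize()
  {
    synchronized (statusLock) {
      return pendingTaskIds.size();
    }
  }

  // Returns the first task that fits, or null after waiting up to waitMillis (must be > 0).
  String pollRunnable(long waitMillis)
  {
    synchronized (statusLock) {
      Iterator<String> iter = pendingTaskIds.iterator();
      while (iter.hasNext()) {
        String taskId = iter.next();
        Item item = tasks.get(taskId);
        if (item == null || item.state != State.PENDING) {
          iter.remove(); // task was shut down or already picked up
          continue;
        }
        if (item.requiredCapacity <= freeCapacity) {
          freeCapacity -= item.requiredCapacity;
          item.state = State.RUNNING;
          return taskId;
        }
        // No slot for this task right now: move on to the next one instead of blocking on it.
      }
      try {
        statusLock.wait(waitMillis); // nothing runnable; wait for submit() or the timeout
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return null;
    }
  }
}
```

With one unit of free capacity and tasks submitted in the order "big" (needs 2), "gone" (shut down before the scan), "small" (needs 1), a call to `pollRunnable` skips "big", drops "gone", and returns "small", so a task at the head of the list that cannot be placed does not hold up the ones behind it.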





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-12 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357322140
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator<String> iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and no more pending
+  iter.remove();
 
 Review comment:
   got it. sure, added some commentary to hopefully make it clearer.





[GitHub] [incubator-druid] himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-12 Thread GitBox
himanshug commented on a change in pull request #8697: HRTR: make pending task 
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357343567
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1138,6 +1139,12 @@ private void pendingTasksExecutionLoop()
 
 // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
 // up by another task execution thread.
+// note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
+// back if this thread couldn't run this task for any reason, which we will know at some later time
+// and also we will need to add it back to its old position in the list. that becomes complex quickly.
+// Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
+// And, it is automatically removed by any of the task execution threads when they notice that
 
 Review comment:
   fixed, thanks
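
The reasoning in the added code comment can be illustrated with a small standalone sketch. It assumes that a failed assignment simply resets the state to PENDING and that an entry is dropped from the list only once its task is no longer in a pending state; both are my reading of the comment, not something confirmed by the quoted hunk. The names `WorkerAssignSketch`, `claimNext` and `assignmentFinished` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "mark instead of remove"; not the Druid classes.
class WorkerAssignSketch
{
  enum State { PENDING, PENDING_WORKER_ASSIGN, RUNNING }

  private final Object statusLock = new Object();
  private final List<String> pendingTaskIds = new ArrayList<>();
  private final Map<String, State> states = new HashMap<>();

  void submit(String taskId)
  {
    synchronized (statusLock) {
      states.put(taskId, State.PENDING);
      pendingTaskIds.add(taskId);
    }
  }

  int pendingListSize()
  {
    synchronized (statusLock) {
      return pendingTaskIds.size();
    }
  }

  // Claims the first PENDING task by marking it; the id keeps its position in the list.
  String claimNext()
  {
    synchronized (statusLock) {
      Iterator<String> iter = pendingTaskIds.iterator();
      while (iter.hasNext()) {
        String taskId = iter.next();
        State state = states.get(taskId);
        if (state == null || state == State.RUNNING) {
          iter.remove(); // no longer pending: cleaned up lazily by whichever thread sees it
          continue;
        }
        if (state == State.PENDING) {
          states.put(taskId, State.PENDING_WORKER_ASSIGN);
          return taskId;
        }
        // PENDING_WORKER_ASSIGN: another thread is working on it, so skip it but leave it in place.
      }
      return null;
    }
  }

  // Called once the claiming thread knows whether the worker accepted the task.
  void assignmentFinished(String taskId, boolean succeeded)
  {
    synchronized (statusLock) {
      // Assumption: on failure the task just becomes PENDING again, still at its old position.
      states.put(taskId, succeeded ? State.RUNNING : State.PENDING);
    }
  }
}
```

If "a" and "b" are submitted in that order, both are claimed, and the assignment of "a" then fails, the next `claimNext()` returns "a" again ahead of anything submitted later, without any re-insertion logic.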

