[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-12 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357323580
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1138,6 +1139,12 @@ private void pendingTasksExecutionLoop()
 
 // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
 // up by another task execution thread.
+// note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
+// back if this thread couldn't run this task for any reason, which we will know at some later time
+// and also we will need to add it back to its old position in the list. that becomes complex quickly.
+// Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
+// And, it is automatically removed by any of the task exeuction threads when they notice that
 
 Review comment:
   typo: exeuction -> execution
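The code comment in the hunk above explains why a claimed task stays in `pendingTaskIds` in state `PENDING_WORKER_ASSIGN` instead of being removed and re-inserted. Below is a minimal, self-contained sketch of that "mark in place" pattern. It is not Druid code: `PendingQueueSketch`, `WorkItem`, `claimNext` and `release` are hypothetical names, and a single `synchronized` lock object stands in for `statusLock`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "mark in place" pending-queue pattern; not Druid's API.
class PendingQueueSketch
{
  enum State { PENDING, PENDING_WORKER_ASSIGN, RUNNING }

  static class WorkItem
  {
    final String id;
    State state = State.PENDING;

    WorkItem(String id)
    {
      this.id = id;
    }
  }

  private final Object lock = new Object();
  // Insertion-ordered, so a claimed item keeps its position in the list.
  private final Set<String> pendingTaskIds = new LinkedHashSet<>();
  private final Map<String, WorkItem> tasks = new HashMap<>();

  void submit(String id)
  {
    synchronized (lock) {
      tasks.put(id, new WorkItem(id));
      pendingTaskIds.add(id);
      lock.notifyAll();
    }
  }

  /** Marks the first claimable item PENDING_WORKER_ASSIGN and returns it, or returns null. */
  WorkItem claimNext()
  {
    synchronized (lock) {
      Iterator<String> iter = pendingTaskIds.iterator();
      while (iter.hasNext()) {
        WorkItem item = tasks.get(iter.next());
        if (item == null || item.state == State.RUNNING) {
          // Shut down or no longer pending: lazily drop it from the list.
          iter.remove();
          continue;
        }
        if (item.state == State.PENDING_WORKER_ASSIGN) {
          // Claimed by another thread: leave it in place and skip it.
          continue;
        }
        item.state = State.PENDING_WORKER_ASSIGN;
        return item;
      }
      return null;
    }
  }

  /** Called once the slow assignment attempt, made outside the lock, has finished. */
  void release(WorkItem item, boolean assigned)
  {
    synchronized (lock) {
      // On failure the item just reverts to PENDING at its original position.
      item.state = assigned ? State.RUNNING : State.PENDING;
      lock.notifyAll();
    }
  }
}
```

Because a failed attempt only flips the state back, nothing ever has to be re-inserted at its old position, which is the complexity the code comment wants to avoid.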


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-12 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357275275
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator<String> iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and no more pending
+  iter.remove();
 
 Review comment:
   Oh, I believe this would work. I was just curious whether it was intended or not. I'm OK with it if this was intended. Would you please add a comment to avoid confusion in the future?





[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356919309
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
 
 Review comment:
   Hmm, sorry, I don't think I fully understand this logic yet.
   
   > before leaving the lock, ti state is updated to PENDING_WORKER_ASSIGN so no other task executor thread can pick up.
   
   Do you mean the other threads can pick up even after the ti state is set to `PENDING_WORKER_ASSIGN` but will ignore it in the [below if clause](https://github.com/apache/incubator-druid/pull/8697/files#diff-b994d2f0b67b07608a060f52a17fbd01R1107)? Or, do they really not pick up because somehow `pendingTaskIds` is updated before other threads start picking up?
   
   > `pendingTaskIds` variable declaration has a comment that this variable is exclusively manipulated by only external task submitter threads or task executor threads which I preferred.
   
   Yes, I 

[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356919345
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
+  continue;
+}
 
-  try {
-// this will send HTTP request to worker for assigning task 
and hence kept
-// outside the synchronized block.
-if (runTaskOnWorker(taskItem, 
immutableWorker.getWorker().getHost())) {
-  return;
-}
-  }
-  finally {
-
workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-synchronized (statusLock) {
-  statusLock.notifyAll();
-}
-  }
+if (ti.getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+  // picked up by another pending task executor thread.
+  continue;

[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-11 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356819299
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
 );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
+  {
+return Maps.transformEntries(
+Maps.filterEntries(
+workers,
+input -> !lazyWorkers.containsKey(input.getKey()) &&
+ !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+ !blackListedWorkers.containsKey(input.getKey()) &&
 
 Review comment:
   Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356344644
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
+  continue;
+}
 
-  try {
-// this will send HTTP request to worker for assigning task 
and hence kept
-// outside the synchronized block.
-if (runTaskOnWorker(taskItem, 
immutableWorker.getWorker().getHost())) {
-  return;
-}
-  }
-  finally {
-
workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-synchronized (statusLock) {
-  statusLock.notifyAll();
-}
-  }
+if (ti.getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+  // picked up by another pending task executor thread.
+  continue;

[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356350247
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
 
 Review comment:
   Looks like `pendingTaskIds` is updated only here. This will lead to processing the same taskId multiple times. HRTR will assign the task to some worker in the first loop, ignore it until the worker starts processing that task in the below `if` clause, and then finally remove the taskId here. Can we update `pendingTaskIds` when the task is successfully assigned?
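The reviewer's alternative is to drop a task ID from the pending list as soon as its assignment succeeds, so later scans never revisit it. The following is a hedged sketch of that eager-removal variant, not Druid code: `EagerRemovalSketch`, `runOne` and the `tryAssign` callback (standing in for the HTTP request to a worker) are hypothetical, and an `inFlight` set plays the role of the `PENDING_WORKER_ASSIGN` state.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: remove a task ID from the pending list on successful assignment.
class EagerRemovalSketch
{
  private final Object lock = new Object();
  // Task IDs waiting for a worker, in submission order.
  private final Set<String> pendingTaskIds = new LinkedHashSet<>();
  // IDs some thread is currently trying to assign.
  private final Set<String> inFlight = new HashSet<>();

  void submit(String taskId)
  {
    synchronized (lock) {
      pendingTaskIds.add(taskId);
    }
  }

  /** Returns the first ID no other thread is assigning, or null. */
  private String claim()
  {
    synchronized (lock) {
      for (String taskId : pendingTaskIds) {
        if (inFlight.add(taskId)) {
          return taskId;
        }
      }
      return null;
    }
  }

  /** Tries to assign one pending task; returns true if the assignment succeeded. */
  boolean runOne(Predicate<String> tryAssign)
  {
    String taskId = claim();
    if (taskId == null) {
      return false;
    }
    // The slow assignment call runs outside the lock.
    boolean assigned = tryAssign.test(taskId);
    synchronized (lock) {
      inFlight.remove(taskId);
      if (assigned) {
        // Eager removal: later scans never see this ID again.
        pendingTaskIds.remove(taskId);
      }
      // On failure the ID was never removed, so it keeps its original position.
    }
    return assigned;
  }

  int pendingCount()
  {
    synchronized (lock) {
      return pendingTaskIds.size();
    }
  }
}
```

The design difference is where the cleanup happens: here the thread that made the assignment removes the ID, whereas the hunk leaves that to whichever executor thread next scans the list.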




[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356339972
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1501,6 +1623,23 @@ public void setState(State state)
   state
   );
 
+  setStateUnconditionally(state);
+}
+
+public void revertStateFromPendingWorkerAssignToPending()
+{
+  Preconditions.checkState(
+  this.state == State.PENDING_WORKER_ASSIGN,
+  "Can't move state from [%s] to [%s]",
+  this.state,
+  State.PENDING
+  );
+
+  setStateUnconditionally(State.PENDING);
+}
+
+private void setStateUnconditionally(State state)
+{
   if (log.isDebugEnabled()) {
 log.debug(
 new RuntimeException("Stacktrace..."),
 
 Review comment:
   Is this to print the stack trace so that you can see where this method was called from when the log was printed?
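Passing a freshly constructed, never-thrown exception to a debug log call is a common way to record the call site of a state change. A minimal sketch of the idea using only JDK classes follows; `StackTraceLogSketch`, `captureCallSite` and the `debugEnabled` flag are hypothetical, and the sketch returns the text instead of calling Druid's logger.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical sketch: capture the call site with an exception that is created but never thrown.
class StackTraceLogSketch
{
  static boolean debugEnabled = true;

  // Formats the stack trace of a new RuntimeException, which records where it was constructed.
  static String captureCallSite(String message)
  {
    StringWriter out = new StringWriter();
    new RuntimeException(message).printStackTrace(new PrintWriter(out, true));
    return out.toString();
  }

  // Returns the debug text for a state change, or null when debug logging is off.
  static String setState(String state)
  {
    if (debugEnabled) {
      // Only pay for building the stack trace when debug output is actually wanted.
      return "Setting state[" + state + "]\n" + captureCallSite("Stacktrace...");
    }
    return null;
  }
}
```

The `isDebugEnabled()` guard in the hunk serves the same purpose as `debugEnabled` here: filling in a stack trace is relatively expensive, so it is skipped unless the output will be used.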


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356339621
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1432,15 +1533,35 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
   {
 enum State
 {
-  PENDING(0),
-  RUNNING(1),
-  COMPLETE(2);
+  // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+  PENDING(0, true, RunnerTaskState.PENDING),
+
+  // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+  // or worker hasn't acknowledged the task yet.
+  PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
 
-  int index;
+  RUNNING(2, false, RunnerTaskState.RUNNING),
+  COMPLETE(3, false, RunnerTaskState.NONE);
 
-  State(int index)
+  private int index;
+  private boolean isPending;
+  private RunnerTaskState runnerTaskState;
 
 Review comment:
   These variables can be final.
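Enum constants are shared singletons, so their fields are normally `final` and assigned once in the constructor. Below is a sketch of the `State` enum from the hunk written that way. The accessor names are assumptions, and `RunnerTaskState` is a local stand-in declaring only the values this hunk uses, not Druid's actual enum.

```java
// Hypothetical sketch of the hunk's State enum with final fields; not the PR's exact code.
class StateEnumSketch
{
  // Local stand-in containing only the values referenced in the hunk.
  enum RunnerTaskState { PENDING, RUNNING, NONE }

  enum State
  {
    PENDING(0, true, RunnerTaskState.PENDING),
    PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
    RUNNING(2, false, RunnerTaskState.RUNNING),
    COMPLETE(3, false, RunnerTaskState.NONE);

    // Assigned exactly once per constant; final makes accidental mutation a compile error.
    private final int index;
    private final boolean isPending;
    private final RunnerTaskState runnerTaskState;

    State(int index, boolean isPending, RunnerTaskState runnerTaskState)
    {
      this.index = index;
      this.isPending = isPending;
      this.runnerTaskState = runnerTaskState;
    }

    int getIndex()
    {
      return index;
    }

    boolean isPending()
    {
      return isPending;
    }

    RunnerTaskState toRunnerTaskState()
    {
      return runnerTaskState;
    }
  }
}
```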


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356344661
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
 HttpRemoteTaskRunnerWorkItem.State.PENDING
 );
 tasks.put(task.getId(), taskRunnerWorkItem);
-addPendingTaskToExecutor(task.getId());
+pendingTaskIds.add(task.getId());
+
+statusLock.notifyAll();
+
 return taskRunnerWorkItem.getResult();
   }
 }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-pendingTasksExec.execute(
-() -> {
-  while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
-ImmutableWorkerInfo immutableWorker;
-HttpRemoteTaskRunnerWorkItem taskItem = null;
+for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+  pendingTasksExec.submit(
+  () -> {
 try {
-  synchronized (statusLock) {
-taskItem = tasks.get(taskId);
+  if (!lifecycleLock.awaitStarted()) {
+log.makeAlert("Lifecycle not started, PendingTaskExecution 
loop will not run.").emit();
+return;
+  }
 
-if (taskItem == null) {
-  log.info(
-  "Task[%s] work item not found. Probably user asked to 
shutdown before. Not assigning.",
-  taskId
-  );
-  return;
-}
+  pendingTasksExecutionLoop();
+}
+catch (Throwable t) {
+  log.makeAlert(t, "Error while waiting for lifecycle start. 
PendingTaskExecution loop will not run")
+ .emit();
+}
+finally {
+  log.info("PendingTaskExecution loop exited.");
+}
+  }
+  );
+}
+  }
 
-if (taskItem.getState() != 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-  log.info(
-  "Task[%s] is in state[%s]. Probably some worker already 
reported it. Not assigning.",
-  taskId,
-  taskItem.getState()
-  );
-  return;
-}
+  private void pendingTasksExecutionLoop()
+  {
+while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS)) {
+  try {
+// Find one pending task to run and a worker to run on
+HttpRemoteTaskRunnerWorkItem taskItem = null;
+ImmutableWorkerInfo immutableWorker = null;
 
-if (taskItem.getTask() == null) {
-  throw new ISE("WTF! couldn't find Task instance for 
taskId[%s].", taskId);
-}
-immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-if (immutableWorker == null) {
-  // no free worker, wait for some worker to become free
-  
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-  continue;
-} else if (workersWithUnacknowledgedTask.putIfAbsent(
-immutableWorker.getWorker().getHost(),
-taskId
-) != null) {
-  // there was a race and someone else took this worker slot, 
try again
-  continue;
-}
-  }
+synchronized (statusLock) {
+  Iterator iter = pendingTaskIds.iterator();
+  while (iter.hasNext()) {
+String taskId = iter.next();
+HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+if (ti == null || !ti.getState().isPending()) {
+  // happens if the task was shutdown or was picked up earlier and 
no more pending
+  iter.remove();
+  continue;
+}
 
-  try {
-// this will send HTTP request to worker for assigning task 
and hence kept
-// outside the synchronized block.
-if (runTaskOnWorker(taskItem, 
immutableWorker.getWorker().getHost())) {
-  return;
-}
-  }
-  finally {
-
workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-synchronized (statusLock) {
-  statusLock.notifyAll();
-}
-  }
+if (ti.getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+  // picked up by another pending task executor thread.
+  continue;

[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356331904
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
 );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
 
 Review comment:
   It would be better to make this package-private if it's not supposed to be used in general.
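Omitting the access modifier makes a method package-private, so the compiler itself enforces the "only this class and the resource" rule from the Javadoc, provided the resource class lives in the same package (an assumption this sketch makes). `RunnerSketch` and `RunnerResourceSketch` are hypothetical stand-ins for the runner and `HttpRemoteTaskRunnerResource`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical runner exposing a helper only to classes in its own package.
class RunnerSketch
{
  private final Map<String, Integer> workers = new HashMap<>();

  void addWorker(String host, int freeSlots)
  {
    workers.put(host, freeSlots);
  }

  // No access modifier: package-private, so code outside this package cannot call it.
  Map<String, Integer> getWorkersEligibleToRunTasks()
  {
    // Read-only view so callers cannot modify the runner's internal map.
    return Collections.unmodifiableMap(workers);
  }
}

// Hypothetical same-package caller standing in for the resource class.
class RunnerResourceSketch
{
  private final RunnerSketch runner;

  RunnerResourceSketch(RunnerSketch runner)
  {
    this.runner = runner;
  }

  int workerCount()
  {
    return runner.getWorkersEligibleToRunTasks().size();
  }
}
```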


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [incubator-druid] jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots

2019-12-10 Thread GitBox
jihoonson commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356338730
 
 

 ##
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 ##
 @@ -313,6 +325,25 @@ private void scheduleCompletedTaskStatusCleanupFromZk()
 );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  public Map getWorkersEligibleToRunTasks()
+  {
+return Maps.transformEntries(
+Maps.filterEntries(
+workers,
+input -> !lazyWorkers.containsKey(input.getKey()) &&
+ !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+ !blackListedWorkers.containsKey(input.getKey()) &&
 
 Review comment:
   Hmm, this looks racy because each concurrent map is updated under different locks. But is this OK because this method is called periodically?
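The concern is that the filter reads several maps that are updated independently, so one result can mix states from slightly different moments. The sketch below illustrates why that is usually tolerable for a periodically recomputed value: each call is a fresh, weakly consistent snapshot, and a stale answer is corrected on the next call. It is an assumption-laden illustration, not the PR's code: field names mirror the hunk, `Integer` stands in for the worker info type, and unlike Guava's `Maps.filterEntries`, which returns a live view, it copies the result into a new map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical sketch: filtering one concurrent map against others gives a weakly consistent snapshot.
class EligibleWorkersSketch
{
  final ConcurrentHashMap<String, Integer> workers = new ConcurrentHashMap<>();
  final ConcurrentHashMap<String, Boolean> lazyWorkers = new ConcurrentHashMap<>();
  final ConcurrentHashMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<>();
  final ConcurrentHashMap<String, Boolean> blackListedWorkers = new ConcurrentHashMap<>();

  // Each containsKey() is thread-safe on its own, but the four maps are not read atomically
  // together, so a concurrent update may or may not be reflected in a given snapshot.
  Map<String, Integer> eligibleSnapshot()
  {
    return workers.entrySet()
                  .stream()
                  .filter(e -> !lazyWorkers.containsKey(e.getKey())
                               && !workersWithUnacknowledgedTask.containsKey(e.getKey())
                               && !blackListedWorkers.containsKey(e.getKey()))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```

A caller that recomputes the snapshot on every scheduling pass only risks acting on one slightly stale view, which the next pass corrects.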


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org