kfaraz commented on code in PR #14581:
URL: https://github.com/apache/druid/pull/14581#discussion_r1264934508


##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -43,17 +52,129 @@
  */
 public interface OverlordClient
 {
+  /**
+   * Contact the Overlord that we believe to be the leader, and return the result of its
+   * {@code /druid/indexer/v1/leader} API. This may be a different Overlord server than the one we contacted, if
+   * a leader change happened since the last time we updated our sense of who the leader is.
+   */
+  ListenableFuture<URI> findCurrentLeader();
+
+  /**
+   * Run a task with the provided ID and payload. The payload must be convertible by an
+   * {@link com.fasterxml.jackson.databind.ObjectMapper} into a Task object. This method does not take Task objects
+   * directly, because Task is in the indexing-service package.
+   *
+   * @param taskId     task ID
+   * @param taskObject task payload
+   */
   ListenableFuture<Void> runTask(String taskId, Object taskObject);
 
+  /**
+   * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
+   *
+   * @param idPrefix   Descriptive prefix to include at the start of task IDs
+   * @param dataSource Datasource to kill
+   * @param interval   Interval to kill
+   */
+  default ListenableFuture<Void> runKillTask(String idPrefix, String dataSource, Interval interval)

Review Comment:
   Would it be better to retain the name `killUnusedSegments` from `IndexingServiceClient`? It aligns well with `killPendingSegments` and states exactly what we are deleting. The javadoc here should probably also mention that only unused segments are killed.
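
A minimal sketch of what the rename could look like. The types here are stand-ins so the snippet compiles on its own: `SketchOverlordClient` is hypothetical, `CompletableFuture` replaces `ListenableFuture`, a plain `String` replaces `Interval`, and the task ID format is illustrative rather than what `IdUtils.newTaskId` produces.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for OverlordClient, not the real Druid interface.
interface SketchOverlordClient
{
  CompletableFuture<Void> runTask(String taskId, Object taskObject);

  /**
   * Run a "kill" task that removes the unused segments of a datasource within an interval.
   * Shortcut to runTask.
   */
  default CompletableFuture<Void> killUnusedSegments(String idPrefix, String dataSource, String interval)
  {
    // Illustrative ID only; the PR builds the real ID with IdUtils.newTaskId.
    final String taskId = idPrefix + "_kill_" + dataSource + "_" + interval;
    // Illustrative payload only; the PR passes a ClientKillUnusedSegmentsTaskQuery.
    return runTask(taskId, "kill:" + dataSource + ":" + interval);
  }
}
```

With this shape, call sites read as `client.killUnusedSegments(...)` and the javadoc names the thing being deleted.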



##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -43,17 +52,129 @@
  */
 public interface OverlordClient
 {
+  /**
+   * Contact the Overlord that we believe to be the leader, and return the result of its
+   * {@code /druid/indexer/v1/leader} API. This may be a different Overlord server than the one we contacted, if
+   * a leader change happened since the last time we updated our sense of who the leader is.
+   */
+  ListenableFuture<URI> findCurrentLeader();
+
+  /**
+   * Run a task with the provided ID and payload. The payload must be convertible by an
+   * {@link com.fasterxml.jackson.databind.ObjectMapper} into a Task object. This method does not take Task objects
+   * directly, because Task is in the indexing-service package.
+   *
+   * @param taskId     task ID
+   * @param taskObject task payload
+   */
   ListenableFuture<Void> runTask(String taskId, Object taskObject);
 
+  /**
+   * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
+   *
+   * @param idPrefix   Descriptive prefix to include at the start of task IDs
+   * @param dataSource Datasource to kill
+   * @param interval   Interval to kill
+   */
+  default ListenableFuture<Void> runKillTask(String idPrefix, String dataSource, Interval interval)
+  {
+    final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
+    final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false);
+    return runTask(taskId, taskQuery);
+  }
+
+  /**
+   * Cancel a task.
+   *
+   * @param taskId task ID
+   */
   ListenableFuture<Void> cancelTask(String taskId);
 
+  /**
+   * Return {@link TaskStatusPlus} for all tasks matching a set of optional search parameters.
+   *
+   * @param state             task state: may be "pending", "waiting", "running", or "complete"
+   * @param dataSource        datasource
+   * @param maxCompletedTasks maximum number of completed tasks to return. Does not affect other kinds of tasks

Review Comment:
   Since this parameter applies only to complete tasks, maybe we should have a separate method such as `getCompleteTaskStatuses` (perhaps in a later PR).



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -502,51 +501,51 @@ private void scheduleCompletedTasksCleanup()
             }
 
             ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
-            Map<String, TaskStatus> taskStatusesFromOverlord = null;
+            Either<Throwable, Map<String, TaskStatus>> apiCallResult = null;
 
             try {
-              StringFullResponseHolder fullResponseHolder = overlordClient.go(
-                  overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
-                                .setContent(jsonMapper.writeValueAsBytes(taskIds))
-                                .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
-                                .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
-
-              );
-              if (fullResponseHolder.getStatus().getCode() == 200) {
-                String responseContent = fullResponseHolder.getContent();
-                taskStatusesFromOverlord = jsonMapper.readValue(
-                    responseContent,
-                    new TypeReference<Map<String, TaskStatus>>()
-                    {
-                    }
-                );
-                log.debug("Received completed task status response [%s].", responseContent);
-              } else if (fullResponseHolder.getStatus().getCode() == 404) {
-                // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
-                // this if clause should be removed in a future release.
-                log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
-                taskStatusesFromOverlord = ImmutableMap.of();
+              apiCallResult =
+                  Either.value(FutureUtils.get(overlordClient.taskStatuses(taskIds), true));
+              log.debug("Received completed task status response [%s].", apiCallResult);
+            }
+            catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              if (e.getCause() instanceof HttpResponseException) {
+                final HttpResponseStatus status = ((HttpResponseException) e.getCause()).getResponse().getStatus();
+                if (status.getCode() == 404) {
+                  // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
+                  // this if clause should be removed in a future release.
+                  log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
+                  apiCallResult = Either.value(ImmutableMap.of());
+                } else {
+                  log.info(
+                      "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",

Review Comment:
   ```suggestion
                         "Got non-success code[%d] from overlord while getting active tasks. Will retry on next scheduled run.",
   ```
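
For context on the `%s` to `%d` change, here is a small sketch of how the two specifiers behave under plain `java.util.Formatter` rules. It assumes the logger formats its message that way, which this diff does not show; `FormatSpecifierSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.IllegalFormatConversionException;
import java.util.Locale;

// Hypothetical helper; uses String.format as a stand-in for the logger's formatting.
class FormatSpecifierSketch
{
  // %s accepts any argument and renders it via String.valueOf semantics.
  static String withS(Object code)
  {
    return String.format(Locale.ROOT, "code[%s]", code);
  }

  // %d accepts only integral arguments.
  static String withD(Object code)
  {
    return String.format(Locale.ROOT, "code[%d]", code);
  }

  // Returns true if formatting the argument with %d throws.
  static boolean dRejects(Object arg)
  {
    try {
      withD(arg);
      return false;
    }
    catch (IllegalFormatConversionException e) {
      return true;
    }
  }
}
```

For an `int` such as the value of `status.getCode()`, both specifiers produce the same text; `%d` only adds a stricter check on the argument type.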



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -502,51 +501,51 @@ private void scheduleCompletedTasksCleanup()
             }
 
             ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
-            Map<String, TaskStatus> taskStatusesFromOverlord = null;
+            Either<Throwable, Map<String, TaskStatus>> apiCallResult = null;
 
             try {
-              StringFullResponseHolder fullResponseHolder = overlordClient.go(
-                  overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
-                                .setContent(jsonMapper.writeValueAsBytes(taskIds))
-                                .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
-                                .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
-
-              );
-              if (fullResponseHolder.getStatus().getCode() == 200) {
-                String responseContent = fullResponseHolder.getContent();
-                taskStatusesFromOverlord = jsonMapper.readValue(
-                    responseContent,
-                    new TypeReference<Map<String, TaskStatus>>()
-                    {
-                    }
-                );
-                log.debug("Received completed task status response [%s].", responseContent);
-              } else if (fullResponseHolder.getStatus().getCode() == 404) {
-                // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
-                // this if clause should be removed in a future release.
-                log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
-                taskStatusesFromOverlord = ImmutableMap.of();
+              apiCallResult =
+                  Either.value(FutureUtils.get(overlordClient.taskStatuses(taskIds), true));
+              log.debug("Received completed task status response [%s].", apiCallResult);
+            }
+            catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              if (e.getCause() instanceof HttpResponseException) {
+                final HttpResponseStatus status = ((HttpResponseException) e.getCause()).getResponse().getStatus();
+                if (status.getCode() == 404) {
+                  // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
+                  // this if clause should be removed in a future release.
+                  log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
+                  apiCallResult = Either.value(ImmutableMap.of());
+                } else {
+                  log.info(
+                      "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
+                      status.getCode()
+                  );
+                }
               } else {
-                log.info(
-                    "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
-                    fullResponseHolder.getStatus().getCode()
-                );
+                apiCallResult = Either.error(e.getCause());
               }
             }
-            catch (Exception ex) {
-              log.warn(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
-
-              if (ex instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-              }
+            catch (Exception e) {
+              apiCallResult = Either.error(e);
             }
 
-            if (taskStatusesFromOverlord == null) {
+            if (apiCallResult == null) {
+              return;
+            } else if (apiCallResult.isError()) {
+              log.warn(
+                  apiCallResult.error(),
+                  "Exception while getting active tasks from overlord. will retry on next scheduled run."

Review Comment:
   ```suggestion
                     "Exception while getting active tasks from Overlord. Will retry on next scheduled run."
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java:
##########
@@ -2465,4 +2232,18 @@ public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsSequentialMode()
       Assert.assertEquals(1, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
     }
   }
+
+  private static ArgumentCaptor<Object> setUpMockClient(final OverlordClient mockClient)

Review Comment:
   Do we need to use a mock `OverlordClient`? We could try to reuse the 
`TestOverlordClient` already written here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

