kfaraz commented on code in PR #14581:
URL: https://github.com/apache/druid/pull/14581#discussion_r1264934508
##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -43,17 +52,129 @@
*/
public interface OverlordClient
{
+ /**
+ * Contact the Overlord that we believe to be the leader, and return the result of its
+ * {@code /druid/indexer/v1/leader} API. This may be a different Overlord server than the one we contacted, if
+ * a leader change happened since the last time we updated our sense of who the leader is.
+ */
+ ListenableFuture<URI> findCurrentLeader();
+
+ /**
+ * Run a task with the provided ID and payload. The payload must be convertible by an
+ * {@link com.fasterxml.jackson.databind.ObjectMapper} into a Task object. This method does not take Task objects
+ * directly, because Task is in the indexing-service package.
+ *
+ * @param taskId task ID
+ * @param taskObject task payload
+ */
ListenableFuture<Void> runTask(String taskId, Object taskObject);
+ /**
+ * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
+ *
+ * @param idPrefix Descriptive prefix to include at the start of task IDs
+ * @param dataSource Datasource to kill
+ * @param interval Interval to kill
+ */
+ default ListenableFuture<Void> runKillTask(String idPrefix, String dataSource, Interval interval)
Review Comment:
Would it be better to retain the name `killUnusedSegments` from
`IndexingServiceClient`? It aligns well with `killPendingSegments` and says
exactly what we are nuking. We should probably also mention in the javadoc here
that only unused segments are killed.
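A minimal sketch of how the renamed shortcut and its javadoc could read. To compile with the JDK alone, it assumes a hypothetical, trimmed-down `KillCapableClient` interface, swaps Guava's `ListenableFuture` for `CompletableFuture` and Joda's `Interval` for a `String`, and replaces `IdUtils.newTaskId` and `ClientKillUnusedSegmentsTaskQuery` with a hypothetical ID scheme and payload record.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, trimmed-down stand-in for OverlordClient. */
interface KillCapableClient
{
  CompletableFuture<Void> runTask(String taskId, Object taskObject);

  /**
   * Run a "kill" task that permanently deletes the unused segments of a datasource
   * within an interval. Segments that are still used are not touched.
   * Shortcut to {@link #runTask(String, Object)}.
   */
  default CompletableFuture<Void> killUnusedSegments(String idPrefix, String dataSource, String interval)
  {
    // Hypothetical ID scheme; the diff builds IDs with IdUtils.newTaskId instead.
    final String taskId = String.join("_", idPrefix, "kill", dataSource, interval);
    return runTask(taskId, new KillUnusedSegmentsPayload(taskId, dataSource, interval));
  }
}

/** Hypothetical payload standing in for ClientKillUnusedSegmentsTaskQuery. */
record KillUnusedSegmentsPayload(String id, String dataSource, String interval)
{
}
```

Keeping the method a `default` that delegates to `runTask` means implementations (including test fakes) only need to implement the general submission path.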
##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -43,17 +52,129 @@
*/
public interface OverlordClient
{
+ /**
+ * Contact the Overlord that we believe to be the leader, and return the result of its
+ * {@code /druid/indexer/v1/leader} API. This may be a different Overlord server than the one we contacted, if
+ * a leader change happened since the last time we updated our sense of who the leader is.
+ */
+ ListenableFuture<URI> findCurrentLeader();
+
+ /**
+ * Run a task with the provided ID and payload. The payload must be convertible by an
+ * {@link com.fasterxml.jackson.databind.ObjectMapper} into a Task object. This method does not take Task objects
+ * directly, because Task is in the indexing-service package.
+ *
+ * @param taskId task ID
+ * @param taskObject task payload
+ */
ListenableFuture<Void> runTask(String taskId, Object taskObject);
+ /**
+ * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
+ *
+ * @param idPrefix Descriptive prefix to include at the start of task IDs
+ * @param dataSource Datasource to kill
+ * @param interval Interval to kill
+ */
+ default ListenableFuture<Void> runKillTask(String idPrefix, String dataSource, Interval interval)
+ {
+ final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
+ final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false);
+ return runTask(taskId, taskQuery);
+ }
+
+ /**
+ * Cancel a task.
+ *
+ * @param taskId task ID
+ */
ListenableFuture<Void> cancelTask(String taskId);
+ /**
+ * Return {@link TaskStatusPlus} for all tasks matching a set of optional search parameters.
+ *
+ * @param state task state: may be "pending", "waiting", "running", or "complete"
+ * @param dataSource datasource
+ * @param maxCompletedTasks maximum number of completed tasks to return. Does not affect other kinds of tasks
Review Comment:
Since this field applies only to completed tasks, maybe we should have a
separate method for them, `getCompleteTaskStatuses` (perhaps in a later PR).
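One way to picture the suggestion: a dedicated method where the limit appears only where it has an effect, delegating to the general search. This is a sketch under assumptions: `TaskStatusClient` and `getCompleteTaskStatuses` are hypothetical, `String` stands in for `TaskStatusPlus`, and `CompletableFuture` for `ListenableFuture`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, trimmed-down client; String stands in for TaskStatusPlus. */
interface TaskStatusClient
{
  /** General search. maxCompletedTasks has no effect on pending, waiting, or running tasks. */
  CompletableFuture<List<String>> taskStatuses(String state, String dataSource, Integer maxCompletedTasks);

  /** Hypothetical dedicated method: the limit is only accepted where it applies. */
  default CompletableFuture<List<String>> getCompleteTaskStatuses(String dataSource, int maxCompletedTasks)
  {
    if (maxCompletedTasks < 0) {
      throw new IllegalArgumentException("maxCompletedTasks must be non-negative, got " + maxCompletedTasks);
    }
    // "complete" is one of the state values listed in the javadoc above.
    return taskStatuses("complete", dataSource, maxCompletedTasks);
  }
}
```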
##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -502,51 +501,51 @@ private void scheduleCompletedTasksCleanup()
}
ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
- Map<String, TaskStatus> taskStatusesFromOverlord = null;
+ Either<Throwable, Map<String, TaskStatus>> apiCallResult = null;
try {
- StringFullResponseHolder fullResponseHolder = overlordClient.go(
- overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
- .setContent(jsonMapper.writeValueAsBytes(taskIds))
- .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
- .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
-
- );
- if (fullResponseHolder.getStatus().getCode() == 200) {
- String responseContent = fullResponseHolder.getContent();
- taskStatusesFromOverlord = jsonMapper.readValue(
- responseContent,
- new TypeReference<Map<String, TaskStatus>>()
- {
- }
- );
- log.debug("Received completed task status response [%s].", responseContent);
- } else if (fullResponseHolder.getStatus().getCode() == 404) {
- // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
- // this if clause should be removed in a future release.
- log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
- taskStatusesFromOverlord = ImmutableMap.of();
+ apiCallResult =
+ Either.value(FutureUtils.get(overlordClient.taskStatuses(taskIds), true));
+ log.debug("Received completed task status response [%s].", apiCallResult);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ if (e.getCause() instanceof HttpResponseException) {
+ final HttpResponseStatus status = ((HttpResponseException) e.getCause()).getResponse().getStatus();
+ if (status.getCode() == 404) {
+ // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
+ // this if clause should be removed in a future release.
+ log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
+ apiCallResult = Either.value(ImmutableMap.of());
+ } else {
+ log.info(
+ "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
Review Comment:
```suggestion
"Got non-success code[%d] from overlord while getting active tasks. Will retry on next scheduled run.",
```
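For an `int` argument, `%s` and `%d` print the same text, so the suggestion is mostly about self-documentation; under plain `java.util.Formatter` semantics, `%d` also rejects a non-integral argument instead of silently printing it. The sketch below uses `String.format` directly and the class name `StatusCodeFormat` is hypothetical; Druid's logger may apply its own formatting and fallback rules, so treat this as an illustration of the format specifier, not of the logger.

```java
import java.util.IllegalFormatConversionException;
import java.util.Locale;

final class StatusCodeFormat
{
  /** Formats the suggested message; %d accepts only integral arguments. */
  static String nonSuccessMessage(int statusCode)
  {
    return String.format(
        Locale.ROOT,
        "Got non-success code[%d] from overlord while getting active tasks. Will retry on next scheduled run.",
        statusCode
    );
  }

  /** Returns true if String.format rejects a String argument for %d. */
  static boolean percentDRejectsStrings()
  {
    try {
      String.format(Locale.ROOT, "code[%d]", "404");
      return false;
    }
    catch (IllegalFormatConversionException e) {
      return true;
    }
  }
}
```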
##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -502,51 +501,51 @@ private void scheduleCompletedTasksCleanup()
}
ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
- Map<String, TaskStatus> taskStatusesFromOverlord = null;
+ Either<Throwable, Map<String, TaskStatus>> apiCallResult = null;
try {
- StringFullResponseHolder fullResponseHolder = overlordClient.go(
- overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
- .setContent(jsonMapper.writeValueAsBytes(taskIds))
- .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
- .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
-
- );
- if (fullResponseHolder.getStatus().getCode() == 200) {
- String responseContent = fullResponseHolder.getContent();
- taskStatusesFromOverlord = jsonMapper.readValue(
- responseContent,
- new TypeReference<Map<String, TaskStatus>>()
- {
- }
- );
- log.debug("Received completed task status response [%s].", responseContent);
- } else if (fullResponseHolder.getStatus().getCode() == 404) {
- // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
- // this if clause should be removed in a future release.
- log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
- taskStatusesFromOverlord = ImmutableMap.of();
+ apiCallResult =
+ Either.value(FutureUtils.get(overlordClient.taskStatuses(taskIds), true));
+ log.debug("Received completed task status response [%s].", apiCallResult);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ if (e.getCause() instanceof HttpResponseException) {
+ final HttpResponseStatus status = ((HttpResponseException) e.getCause()).getResponse().getStatus();
+ if (status.getCode() == 404) {
+ // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
+ // this if clause should be removed in a future release.
+ log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
+ apiCallResult = Either.value(ImmutableMap.of());
+ } else {
+ log.info(
+ "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
+ status.getCode()
+ );
+ }
} else {
- log.info(
- "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
- fullResponseHolder.getStatus().getCode()
- );
+ apiCallResult = Either.error(e.getCause());
}
}
- catch (Exception ex) {
- log.warn(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
-
- if (ex instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
+ catch (Exception e) {
+ apiCallResult = Either.error(e);
}
- if (taskStatusesFromOverlord == null) {
+ if (apiCallResult == null) {
+ return;
+ } else if (apiCallResult.isError()) {
+ log.warn(
+ apiCallResult.error(),
+ "Exception while getting active tasks from overlord. will retry on next scheduled run."
Review Comment:
```suggestion
"Exception while getting active tasks from Overlord. Will retry on next scheduled run."
```
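The new control flow in the diff distinguishes four outcomes: success, a 404 from an older Overlord (treated as "no statuses known"), any other HTTP status or an interrupt (skip and retry on the next scheduled run), and any other failure (reported as an error). The sketch below reproduces that branching with JDK types only. `TaskStatusFetch`, `HttpStatusException`, and the `Outcome` record are hypothetical stand-ins for `HttpResponseException` and Druid's `Either`, and `Map<String, String>` stands in for `Map<String, TaskStatus>`.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class TaskStatusFetch
{
  /** Hypothetical stand-in for an HTTP error that carries a status code. */
  static final class HttpStatusException extends RuntimeException
  {
    final int code;

    HttpStatusException(int code)
    {
      super("HTTP " + code);
      this.code = code;
    }
  }

  /** Holds either a value or an error, playing the role Either plays in the diff. */
  record Outcome(Map<String, String> value, Throwable error)
  {
  }

  /** Returns null when the caller should just retry on the next scheduled run. */
  static Outcome fetch(Future<Map<String, String>> future)
  {
    try {
      return new Outcome(future.get(), null);
    }
    catch (InterruptedException e) {
      // Restore the interrupt flag so callers can observe it.
      Thread.currentThread().interrupt();
      return null;
    }
    catch (ExecutionException e) {
      if (e.getCause() instanceof HttpStatusException http) {
        // Older Overlords lack the endpoint: treat 404 as "no statuses known".
        return http.code == 404 ? new Outcome(Map.of(), null) : null;
      }
      return new Outcome(null, e.getCause());
    }
  }
}
```

Returning `null` for "retry later" mirrors the `apiCallResult == null` early return in the diff; a richer result type could make that third state explicit.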
##########
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java:
##########
@@ -2465,4 +2232,18 @@ public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsSequentialMode()
Assert.assertEquals(1, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
}
}
+
+ private static ArgumentCaptor<Object> setUpMockClient(final OverlordClient mockClient)
Review Comment:
Do we need to use a mock `OverlordClient`? We could try to reuse the
`TestOverlordClient` already written here.
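The alternative can be pictured as a small hand-written fake that records what the code under test submits, so assertions inspect plain fields instead of a Mockito `ArgumentCaptor`. The existing `TestOverlordClient` is not shown here, so `MiniOverlordClient` and `RecordingOverlordClient` below are hypothetical, and `CompletableFuture` stands in for `ListenableFuture`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, trimmed-down stand-in for OverlordClient. */
interface MiniOverlordClient
{
  CompletableFuture<Void> runTask(String taskId, Object taskObject);

  CompletableFuture<Void> cancelTask(String taskId);
}

/** Fake that records calls instead of contacting an Overlord. */
final class RecordingOverlordClient implements MiniOverlordClient
{
  // Insertion order is kept so tests can assert on submission order.
  private final Map<String, Object> submitted = new LinkedHashMap<>();
  private final List<String> cancelled = new ArrayList<>();

  @Override
  public CompletableFuture<Void> runTask(String taskId, Object taskObject)
  {
    submitted.put(taskId, taskObject);
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public CompletableFuture<Void> cancelTask(String taskId)
  {
    cancelled.add(taskId);
    return CompletableFuture.completedFuture(null);
  }

  Map<String, Object> submittedTasks()
  {
    return Collections.unmodifiableMap(submitted);
  }

  List<String> cancelledTasks()
  {
    return Collections.unmodifiableList(cancelled);
  }
}
```

A fake like this is shared across tests and fails at compile time when the interface changes, whereas per-test mock setup has to be kept in sync by hand.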
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]