AmatyaAvadhanula commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r897973590
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -745,87 +745,57 @@ private List<TaskStatusPlus> getTasks(
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
// We are interested in only those tasks which are in taskRunner.
- taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage
- .filter(info -> runnerWorkItems.containsKey(info.getId()));
+ taskStatusPlusStream = taskStatusPlusStream
+ .filter(statusPlus ->
runnerWorkItems.containsKey(statusPlus.getId()));
}
- final List<TaskInfo<Task, TaskStatus>> taskInfoFromTaskStorage =
taskInfoStreamFromTaskStorage
- .collect(Collectors.toList());
+ final List<TaskStatusPlus> taskStatusPlusList =
taskStatusPlusStream.collect(Collectors.toList());
// Separate complete and active tasks from taskStorage.
// Note that taskStorage can return only either complete tasks or active
tasks per TaskLookupType.
- final List<TaskInfo<Task, TaskStatus>> completeTaskInfoFromTaskStorage =
new ArrayList<>();
- final List<TaskInfo<Task, TaskStatus>> activeTaskInfoFromTaskStorage = new
ArrayList<>();
- for (TaskInfo<Task, TaskStatus> info : taskInfoFromTaskStorage) {
- if (info.getStatus().isComplete()) {
- completeTaskInfoFromTaskStorage.add(info);
+ final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
+ final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
+ for (TaskStatusPlus statusPlus : taskStatusPlusList) {
+ if (statusPlus.getStatusCode().isComplete()) {
+ completeTaskStatusPlusList.add(statusPlus);
} else {
- activeTaskInfoFromTaskStorage.add(info);
+ activeTaskStatusPlusList.add(statusPlus);
}
}
- final List<TaskStatusPlus> statuses = new ArrayList<>();
- completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add(
- new TaskStatusPlus(
- taskInfo.getId(),
- taskInfo.getTask() == null ? null :
taskInfo.getTask().getGroupId(),
- taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
- taskInfo.getCreatedTime(),
- DateTimes.EPOCH,
- taskInfo.getStatus().getStatusCode(),
- RunnerTaskState.NONE,
- taskInfo.getStatus().getDuration(),
- taskInfo.getStatus().getLocation(),
- taskInfo.getDataSource(),
- taskInfo.getStatus().getErrorMsg()
- )
- ));
+ final List<TaskStatusPlus> taskStatuses = new
ArrayList<>(completeTaskStatusPlusList);
- activeTaskInfoFromTaskStorage.forEach(taskInfo -> {
- final TaskRunnerWorkItem runnerWorkItem =
runnerWorkItems.get(taskInfo.getId());
+ activeTaskStatusPlusList.forEach(statusPlus -> {
+ final TaskRunnerWorkItem runnerWorkItem =
runnerWorkItems.get(statusPlus.getId());
if (runnerWorkItem == null) {
// a task is assumed to be a waiting task if it exists in taskStorage
but not in taskRunner.
if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
- statuses.add(
- new TaskStatusPlus(
- taskInfo.getId(),
- taskInfo.getTask() == null ? null :
taskInfo.getTask().getGroupId(),
- taskInfo.getTask() == null ? null :
taskInfo.getTask().getType(),
- taskInfo.getCreatedTime(),
- DateTimes.EPOCH,
- taskInfo.getStatus().getStatusCode(),
- RunnerTaskState.WAITING,
- taskInfo.getStatus().getDuration(),
- taskInfo.getStatus().getLocation(),
- taskInfo.getDataSource(),
- taskInfo.getStatus().getErrorMsg()
- )
- );
+ taskStatuses.add(statusPlus);
}
} else {
if (state == TaskStateLookup.PENDING || state ==
TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
- statuses.add(
+ taskStatuses.add(
new TaskStatusPlus(
- taskInfo.getId(),
- taskInfo.getTask() == null ? null :
taskInfo.getTask().getGroupId(),
- taskInfo.getTask() == null ? null :
taskInfo.getTask().getType(),
+ statusPlus.getId(),
+ statusPlus.getGroupId(),
+ statusPlus.getType(),
runnerWorkItem.getCreatedTime(),
runnerWorkItem.getQueueInsertionTime(),
- taskInfo.getStatus().getStatusCode(),
- taskRunner.getRunnerTaskState(taskInfo.getId()), // this is
racy for remoteTaskRunner
- taskInfo.getStatus().getDuration(),
+ statusPlus.getStatusCode(),
+ taskRunner.getRunnerTaskState(statusPlus.getId()), // this
is racy for remoteTaskRunner
+ statusPlus.getDuration(),
runnerWorkItem.getLocation(), // location in taskInfo is
only updated after the task is done.
- taskInfo.getDataSource(),
- taskInfo.getStatus().getErrorMsg()
+ statusPlus.getDataSource(),
+ statusPlus.getErrorMsg()
)
);
}
}
});
- return statuses;
+ return taskStatuses;
}
- private Stream<TaskInfo<Task, TaskStatus>> getTaskInfoStreamFromTaskStorage(
+ private Stream<TaskStatusPlus> getTaskStatusPlusList(
Review Comment:
Filtering has been happening before my changes, I haven't made semantic
changes to this method
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]