kfaraz commented on code in PR #18448:
URL: https://github.com/apache/druid/pull/18448#discussion_r2309489510
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -716,7 +711,7 @@ private void notifyStatus(final TaskEntry entry, final
TaskStatus taskStatus, St
// Mark this task as complete, so it isn't managed while being cleaned up.
entry.isComplete = true;
// Update the task status associated with this entry
- entry.taskInfo = entry.taskInfo.withNewStatus(taskStatus);
+ entry.taskInfo = entry.taskInfo.withStatus(taskStatus);
Review Comment:
Shouldn't this also go through `entry.updateStatus()`?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1020,24 +1024,44 @@ public Optional<Task> getActiveTask(String id)
}
/**
- * List of all active and completed tasks currently being managed by this
- * TaskQueue.
+ * Gets the {@link TaskInfo} for the given {@code taskId} from {@link
#activeTasks} if present,
+ * otherwise returns an empty optional.
+ */
+ public Optional<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(String taskId)
+ {
+ final TaskEntry entry = activeTasks.get(taskId);
+ if (entry == null) {
+ return Optional.absent();
+ }
+ return Optional.of(entry.taskInfo);
+ }
+
+ /**
+ * List of all active and completed task infos currently being managed by
this TaskQueue.
+ */
+ public List<TaskInfo<Task, TaskStatus>> getTaskInfos()
+ {
+ return activeTasks.values().stream().map(entry ->
entry.taskInfo).collect(Collectors.toList());
+ }
+
+ /**
+ * List of all active and completed tasks currently being managed by this
TaskQueue.
*/
public List<Task> getTasks()
{
- return activeTasks.values().stream().map(entry ->
entry.task).collect(Collectors.toList());
+ return
getTaskInfos().stream().map(TaskInfo::getTask).collect(Collectors.toList());
}
/**
- * Returns the list of currently active tasks for the given datasource.
+ * Returns a map of currently active tasks for the given datasource.
*/
public Map<String, Task> getActiveTasksForDatasource(String datasource)
{
return activeTasks.values().stream().filter(
entry -> !entry.isComplete
- && entry.task.getDataSource().equals(datasource)
+ && entry.taskInfo.getDataSource().equals(datasource)
).map(
- entry -> entry.task
+ entry -> entry.getTask()
Review Comment:
```suggestion
TaskEntry::getTask
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -921,7 +925,7 @@ private Map<String, RowKey> getCurrentTaskDatasources()
{
return activeTasks.values().stream()
.filter(entry -> !entry.isComplete)
- .map(entry -> entry.task)
+ .map(entry -> entry.getTask())
Review Comment:
```suggestion
.map(TaskEntry::getTask)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -548,27 +549,28 @@ public boolean add(final Task task)
}
@GuardedBy("startStopLock")
- private void addTaskInternal(final Task task, final DateTime updateTime)
+ private void addTaskInternal(final TaskInfo<Task, TaskStatus> taskInfo,
final DateTime updateTime)
{
final AtomicBoolean added = new AtomicBoolean(false);
final TaskEntry entry = addOrUpdateTaskEntry(
- task.getId(),
+ taskInfo.getId(),
prevEntry -> {
if (prevEntry == null) {
added.set(true);
- return new TaskEntry(task);
+ return new TaskEntry(taskInfo);
} else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
- prevEntry.lastUpdatedTime = updateTime;
+ // Ensure we keep the current status up-to-date
Review Comment:
Nit: not really needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -958,7 +962,7 @@ public Map<RowKey, Long> getWaitingTaskCount()
return activeTasks.values().stream()
.filter(entry -> !entry.isComplete)
- .map(entry -> entry.task)
+ .map(entry -> entry.getTask())
Review Comment:
```suggestion
.map(TaskEntry::getTask)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -835,26 +839,26 @@ void syncFromStorage()
try {
if (active) {
- final Map<String, Task> newTasks =
- CollectionUtils.toMap(taskStorage.getActiveTasks(), Task::getId,
Function.identity());
- final Map<String, Task> oldTasks =
- CollectionUtils.mapValues(activeTasks, entry -> entry.task);
+ final Map<String, TaskInfo<Task, TaskStatus>> newTasks =
+ CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), (taskInfo)
-> taskInfo.getTask().getId(), Function.identity());
Review Comment:
shorter alternative:
```suggestion
CollectionUtils.toMap(taskStorage.getActiveTaskInfos(),
TaskInfo::getId, Function.identity());
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1140,17 +1164,27 @@ private void removeTaskLock(Task task)
*/
static class TaskEntry
{
- private final Task task;
+ private TaskInfo<Task, TaskStatus> taskInfo;
private DateTime lastUpdatedTime;
private ListenableFuture<TaskStatus> future = null;
private boolean isComplete = false;
- TaskEntry(Task task)
+ TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
{
- this.task = task;
+ this.taskInfo = taskInfo;
this.lastUpdatedTime = DateTimes.nowUtc();
}
+
+ /**
+ * Updates the {@link TaskStatus} for the task associated with this {@link
TaskEntry} and sets the corresponding
+ * update time.
+ */
+ public void updateStatus(TaskStatus status, DateTime updateTime)
+ {
+ this.taskInfo.withStatus(status);
Review Comment:
This just creates a new instance of `TaskInfo` but doesn't assign it to the
`taskInfo` field.
So this method is not actually updating the status here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1020,24 +1024,44 @@ public Optional<Task> getActiveTask(String id)
}
/**
- * List of all active and completed tasks currently being managed by this
- * TaskQueue.
+ * Gets the {@link TaskInfo} for the given {@code taskId} from {@link
#activeTasks} if present,
+ * otherwise returns an empty optional.
+ */
+ public Optional<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(String taskId)
+ {
+ final TaskEntry entry = activeTasks.get(taskId);
+ if (entry == null) {
+ return Optional.absent();
+ }
+ return Optional.of(entry.taskInfo);
Review Comment:
Nit: shorter alternative
```suggestion
return entry == null ? Optional.absent() : Optional.of(entry.taskInfo);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1140,17 +1164,35 @@ private void removeTaskLock(Task task)
*/
static class TaskEntry
{
- private final Task task;
+ private TaskInfo<Task, TaskStatus> taskInfo;
private DateTime lastUpdatedTime;
private ListenableFuture<TaskStatus> future = null;
private boolean isComplete = false;
- TaskEntry(Task task)
+ TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
{
- this.task = task;
+ this.taskInfo = taskInfo;
this.lastUpdatedTime = DateTimes.nowUtc();
}
+
+ /**
+ * Returns the task associated with this {@link TaskEntry}
+ */
+ public Task getTask()
+ {
+ return taskInfo.getTask();
+ }
+
+ /**
+ * Updates the {@link TaskStatus} for the task associated with this {@link
TaskEntry} and sets the corresponding
+ * update time.
+ */
+ public void updateStatus(TaskStatus status, DateTime updateTime)
Review Comment:
```suggestion
void updateStatus(TaskStatus status, DateTime updateTime)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -708,6 +710,8 @@ private void notifyStatus(final TaskEntry entry, final
TaskStatus taskStatus, St
// Mark this task as complete, so it isn't managed while being cleaned up.
entry.isComplete = true;
+ // Update the task status associated with this entry
Review Comment:
Not really needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1140,17 +1164,35 @@ private void removeTaskLock(Task task)
*/
static class TaskEntry
{
- private final Task task;
+ private TaskInfo<Task, TaskStatus> taskInfo;
private DateTime lastUpdatedTime;
private ListenableFuture<TaskStatus> future = null;
private boolean isComplete = false;
- TaskEntry(Task task)
+ TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
{
- this.task = task;
+ this.taskInfo = taskInfo;
this.lastUpdatedTime = DateTimes.nowUtc();
}
+
+ /**
+ * Returns the task associated with this {@link TaskEntry}
+ */
+ public Task getTask()
Review Comment:
```suggestion
Task getTask()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java:
##########
@@ -182,13 +192,22 @@ public List<Task> getActiveTasks()
{
// filter out taskInfo with a null 'task' which should only happen in
practice if we are missing a jackson module
// and don't know what to do with the payload, so we won't be able to make
use of it anyway
- return
handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()), null)
+ return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()), null)
.stream()
.filter(taskInfo -> taskInfo.getStatus().isRunnable() &&
taskInfo.getTask() != null)
Review Comment:
Should we simplify this to the following?
```suggestion
return handler.getActiveTaskInfos()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -548,27 +549,28 @@ public boolean add(final Task task)
}
@GuardedBy("startStopLock")
- private void addTaskInternal(final Task task, final DateTime updateTime)
+ private void addTaskInternal(final TaskInfo<Task, TaskStatus> taskInfo,
final DateTime updateTime)
{
final AtomicBoolean added = new AtomicBoolean(false);
final TaskEntry entry = addOrUpdateTaskEntry(
- task.getId(),
+ taskInfo.getId(),
prevEntry -> {
if (prevEntry == null) {
added.set(true);
- return new TaskEntry(task);
+ return new TaskEntry(taskInfo);
} else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
- prevEntry.lastUpdatedTime = updateTime;
+ // Ensure we keep the current status up-to-date
+ prevEntry.updateStatus(taskInfo.getStatus(), updateTime);
}
return prevEntry;
}
);
if (added.get()) {
- taskLockbox.add(task);
- } else if (!entry.task.equals(task)) {
- throw new ISE("Cannot add task[%s] as a different task for the same ID
has already been added.", task.getId());
+ taskLockbox.add(taskInfo.getTask());
+ } else if (!entry.getTask().equals(taskInfo.getTask())) {
+ throw new ISE("Cannot add task[%s] as a different task for the same ID
has already been added.", taskInfo.getTask().getId());
Review Comment:
Can use `taskInfo.getId()` instead of `taskInfo.getTask().getId()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]