kfaraz commented on code in PR #18448:
URL: https://github.com/apache/druid/pull/18448#discussion_r2309489510


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -716,7 +711,7 @@ private void notifyStatus(final TaskEntry entry, final 
TaskStatus taskStatus, St
     // Mark this task as complete, so it isn't managed while being cleaned up.
     entry.isComplete = true;
     // Update the task status associated with this entry
-    entry.taskInfo = entry.taskInfo.withNewStatus(taskStatus);
+    entry.taskInfo = entry.taskInfo.withStatus(taskStatus);

Review Comment:
   Shouldn't this also go through `entry.updateStatus()`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1020,24 +1024,44 @@ public Optional<Task> getActiveTask(String id)
   }
 
   /**
-   * List of all active and completed tasks currently being managed by this
-   * TaskQueue.
+   * Gets the {@link TaskInfo} for the given {@code taskId} from {@link 
#activeTasks} if present,
+   * otherwise returns an empty optional.
+   */
+  public Optional<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(String taskId)
+  {
+    final TaskEntry entry = activeTasks.get(taskId);
+    if (entry == null) {
+      return Optional.absent();
+    }
+    return Optional.of(entry.taskInfo);
+  }
+
+  /**
+   * List of all active and completed task infos currently being managed by 
this TaskQueue.
+   */
+  public List<TaskInfo<Task, TaskStatus>> getTaskInfos()
+  {
+    return activeTasks.values().stream().map(entry -> 
entry.taskInfo).collect(Collectors.toList());
+  }
+
+  /**
+   * List of all active and completed tasks currently being managed by this 
TaskQueue.
    */
   public List<Task> getTasks()
   {
-    return activeTasks.values().stream().map(entry -> 
entry.task).collect(Collectors.toList());
+    return 
getTaskInfos().stream().map(TaskInfo::getTask).collect(Collectors.toList());
   }
 
   /**
-   * Returns the list of currently active tasks for the given datasource.
+   * Returns a map of currently active tasks for the given datasource.
    */
   public Map<String, Task> getActiveTasksForDatasource(String datasource)
   {
     return activeTasks.values().stream().filter(
         entry -> !entry.isComplete
-                 && entry.task.getDataSource().equals(datasource)
+                 && entry.taskInfo.getDataSource().equals(datasource)
     ).map(
-        entry -> entry.task
+        entry -> entry.getTask()

Review Comment:
   ```suggestion
           TaskEntry::getTask
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -921,7 +925,7 @@ private Map<String, RowKey> getCurrentTaskDatasources()
   {
     return activeTasks.values().stream()
                       .filter(entry -> !entry.isComplete)
-                      .map(entry -> entry.task)
+                      .map(entry -> entry.getTask())

Review Comment:
   ```suggestion
                         .map(TaskEntry::getTask)
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -548,27 +549,28 @@ public boolean add(final Task task)
   }
 
   @GuardedBy("startStopLock")
-  private void addTaskInternal(final Task task, final DateTime updateTime)
+  private void addTaskInternal(final TaskInfo<Task, TaskStatus> taskInfo, 
final DateTime updateTime)
   {
     final AtomicBoolean added = new AtomicBoolean(false);
     final TaskEntry entry = addOrUpdateTaskEntry(
-        task.getId(),
+        taskInfo.getId(),
         prevEntry -> {
           if (prevEntry == null) {
             added.set(true);
-            return new TaskEntry(task);
+            return new TaskEntry(taskInfo);
           } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
-            prevEntry.lastUpdatedTime = updateTime;
+            // Ensure we keep the current status up-to-date

Review Comment:
   Nit: this comment is not really needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -958,7 +962,7 @@ public Map<RowKey, Long> getWaitingTaskCount()
 
     return activeTasks.values().stream()
                       .filter(entry -> !entry.isComplete)
-                      .map(entry -> entry.task)
+                      .map(entry -> entry.getTask())

Review Comment:
   ```suggestion
                         .map(TaskEntry::getTask)
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -835,26 +839,26 @@ void syncFromStorage()
 
     try {
       if (active) {
-        final Map<String, Task> newTasks =
-            CollectionUtils.toMap(taskStorage.getActiveTasks(), Task::getId, 
Function.identity());
-        final Map<String, Task> oldTasks =
-            CollectionUtils.mapValues(activeTasks, entry -> entry.task);
+        final Map<String, TaskInfo<Task, TaskStatus>> newTasks =
+            CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), (taskInfo) 
-> taskInfo.getTask().getId(), Function.identity());

Review Comment:
   Shorter alternative, using the `TaskInfo::getId` method reference:
   ```suggestion
               CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), 
TaskInfo::getId, Function.identity());
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1140,17 +1164,27 @@ private void removeTaskLock(Task task)
    */
   static class TaskEntry
   {
-    private final Task task;
+    private TaskInfo<Task, TaskStatus> taskInfo;
 
     private DateTime lastUpdatedTime;
     private ListenableFuture<TaskStatus> future = null;
     private boolean isComplete = false;
 
-    TaskEntry(Task task)
+    TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
     {
-      this.task = task;
+      this.taskInfo = taskInfo;
       this.lastUpdatedTime = DateTimes.nowUtc();
     }
+
+    /**
+     * Updates the {@link TaskStatus} for the task associated with this {@link 
TaskEntry} and sets the corresponding
+     * update time.
+     */
+    public void updateStatus(TaskStatus status, DateTime updateTime)
+    {
+      this.taskInfo.withStatus(status);

Review Comment:
   `withStatus()` just creates a new instance of `TaskInfo`, but the result is never assigned back to the
`taskInfo` field, so this method is not actually updating the status.
   It should be `this.taskInfo = this.taskInfo.withStatus(status);`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1020,24 +1024,44 @@ public Optional<Task> getActiveTask(String id)
   }
 
   /**
-   * List of all active and completed tasks currently being managed by this
-   * TaskQueue.
+   * Gets the {@link TaskInfo} for the given {@code taskId} from {@link 
#activeTasks} if present,
+   * otherwise returns an empty optional.
+   */
+  public Optional<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(String taskId)
+  {
+    final TaskEntry entry = activeTasks.get(taskId);
+    if (entry == null) {
+      return Optional.absent();
+    }
+    return Optional.of(entry.taskInfo);

Review Comment:
   Nit: shorter alternative
   ```suggestion
       return entry == null ? Optional.absent() : Optional.of(entry.taskInfo);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1140,17 +1164,35 @@ private void removeTaskLock(Task task)
    */
   static class TaskEntry
   {
-    private final Task task;
+    private TaskInfo<Task, TaskStatus> taskInfo;
 
     private DateTime lastUpdatedTime;
     private ListenableFuture<TaskStatus> future = null;
     private boolean isComplete = false;
 
-    TaskEntry(Task task)
+    TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
     {
-      this.task = task;
+      this.taskInfo = taskInfo;
       this.lastUpdatedTime = DateTimes.nowUtc();
     }
+
+    /**
+     * Returns the task associated with this {@link TaskEntry}
+     */
+    public Task getTask()
+    {
+      return taskInfo.getTask();
+    }
+
+    /**
+     * Updates the {@link TaskStatus} for the task associated with this {@link 
TaskEntry} and sets the corresponding
+     * update time.
+     */
+    public void updateStatus(TaskStatus status, DateTime updateTime)

Review Comment:
   ```suggestion
       void updateStatus(TaskStatus status, DateTime updateTime)
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -708,6 +710,8 @@ private void notifyStatus(final TaskEntry entry, final 
TaskStatus taskStatus, St
 
     // Mark this task as complete, so it isn't managed while being cleaned up.
     entry.isComplete = true;
+    // Update the task status associated with this entry

Review Comment:
   Nit: this comment is not really needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1140,17 +1164,35 @@ private void removeTaskLock(Task task)
    */
   static class TaskEntry
   {
-    private final Task task;
+    private TaskInfo<Task, TaskStatus> taskInfo;
 
     private DateTime lastUpdatedTime;
     private ListenableFuture<TaskStatus> future = null;
     private boolean isComplete = false;
 
-    TaskEntry(Task task)
+    TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
     {
-      this.task = task;
+      this.taskInfo = taskInfo;
       this.lastUpdatedTime = DateTimes.nowUtc();
     }
+
+    /**
+     * Returns the task associated with this {@link TaskEntry}
+     */
+    public Task getTask()

Review Comment:
   ```suggestion
       Task getTask()
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java:
##########
@@ -182,13 +192,22 @@ public List<Task> getActiveTasks()
   {
     // filter out taskInfo with a null 'task' which should only happen in 
practice if we are missing a jackson module
     // and don't know what to do with the payload, so we won't be able to make 
use of it anyway
-    return 
handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, 
ActiveTaskLookup.getInstance()), null)
+    return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, 
ActiveTaskLookup.getInstance()), null)
                   .stream()
                   .filter(taskInfo -> taskInfo.getStatus().isRunnable() && 
taskInfo.getTask() != null)

Review Comment:
   Should we simplify this to the following?
   ```suggestion
       return handler.getActiveTaskInfos()
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -548,27 +549,28 @@ public boolean add(final Task task)
   }
 
   @GuardedBy("startStopLock")
-  private void addTaskInternal(final Task task, final DateTime updateTime)
+  private void addTaskInternal(final TaskInfo<Task, TaskStatus> taskInfo, 
final DateTime updateTime)
   {
     final AtomicBoolean added = new AtomicBoolean(false);
     final TaskEntry entry = addOrUpdateTaskEntry(
-        task.getId(),
+        taskInfo.getId(),
         prevEntry -> {
           if (prevEntry == null) {
             added.set(true);
-            return new TaskEntry(task);
+            return new TaskEntry(taskInfo);
           } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
-            prevEntry.lastUpdatedTime = updateTime;
+            // Ensure we keep the current status up-to-date
+            prevEntry.updateStatus(taskInfo.getStatus(), updateTime);
           }
 
           return prevEntry;
         }
     );
 
     if (added.get()) {
-      taskLockbox.add(task);
-    } else if (!entry.task.equals(task)) {
-      throw new ISE("Cannot add task[%s] as a different task for the same ID 
has already been added.", task.getId());
+      taskLockbox.add(taskInfo.getTask());
+    } else if (!entry.getTask().equals(taskInfo.getTask())) {
+      throw new ISE("Cannot add task[%s] as a different task for the same ID 
has already been added.", taskInfo.getTask().getId());

Review Comment:
   Can use `taskInfo.getId()` here instead of `taskInfo.getTask().getId()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to