This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d7c3782153 Poll from memory before fetching task information from DB 
(#18448)
6d7c3782153 is described below

commit 6d7c3782153e45888ccca80032453276e541e494
Author: jtuglu1 <[email protected]>
AuthorDate: Sat Aug 30 22:59:07 2025 -0700

    Poll from memory before fetching task information from DB (#18448)
    
    - When running a large number of tasks in a cluster, the Overlord can become 
bottlenecked by the endpoint `/task/{taskid}/status`, because it issues an 
expensive I/O call against the metadata store on every request.
    - This change lets those requests fetch the task status from the Overlord's 
in-memory task queue when the task entry exists there, falling back to the DB 
otherwise.
---
 .../indexing/overlord/HeapMemoryTaskStorage.java   |  15 ++-
 .../indexing/overlord/MetadataTaskStorage.java     |  25 ++++-
 .../druid/indexing/overlord/TaskQueryTool.java     |  21 ++--
 .../apache/druid/indexing/overlord/TaskQueue.java  | 124 +++++++++++++--------
 .../druid/indexing/overlord/TaskStorage.java       |  11 +-
 .../druid/indexing/overlord/TaskQueueTest.java     |  66 +++++++++++
 .../overlord/http/OverlordResourceTest.java        |  31 ++++--
 .../java/org/apache/druid/indexer/TaskInfo.java    |   8 ++
 8 files changed, 225 insertions(+), 76 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index 5109abe2377..5f01a5fcc56 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -76,7 +76,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }
 
   @Override
-  public void insert(Task task, TaskStatus status)
+  public TaskInfo<Task, TaskStatus> insert(Task task, TaskStatus status)
   {
     Preconditions.checkNotNull(task, "task");
     Preconditions.checkNotNull(status, "status");
@@ -94,6 +94,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
     }
 
     log.info("Inserted task[%s] with status[%s]", task.getId(), status);
+    return TaskStuff.toTaskInfo(newTaskStuff);
   }
 
   @Override
@@ -159,6 +160,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
     return listBuilder.build();
   }
 
+  @Override
+  public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos()
+  {
+    final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = 
ImmutableList.builder();
+    for (final TaskStuff taskStuff : tasks.values()) {
+      if (taskStuff.getStatus().isRunnable()) {
+        listBuilder.add(TaskStuff.toTaskInfo(taskStuff));
+      }
+    }
+    return listBuilder.build();
+  }
+
   @Override
   public List<Task> getActiveTasksByDatasource(String datasource)
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 683b9e80549..ae78bc2c3ff 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -46,6 +46,7 @@ import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
 import org.apache.druid.metadata.TaskLookup.TaskLookupType;
+import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -111,7 +112,7 @@ public class MetadataTaskStorage implements TaskStorage
   }
 
   @Override
-  public void insert(final Task task, final TaskStatus status)
+  public TaskInfo<Task, TaskStatus> insert(final Task task, final TaskStatus 
status)
   {
     Preconditions.checkNotNull(task, "task");
     Preconditions.checkNotNull(status, "status");
@@ -123,11 +124,12 @@ public class MetadataTaskStorage implements TaskStorage
     );
 
     log.info("Inserting task [%s] with status [%s].", task.getId(), status);
+    final DateTime insertionTime = DateTimes.nowUtc();
 
     try {
       handler.insert(
           task.getId(),
-          DateTimes.nowUtc(),
+          insertionTime,
           task.getDataSource(),
           task,
           status.isRunnable(),
@@ -142,6 +144,14 @@ public class MetadataTaskStorage implements TaskStorage
     catch (Exception e) {
       throw new RuntimeException(e);
     }
+
+    return new TaskInfo<>(
+        task.getId(),
+        insertionTime,
+        status,
+        task.getDataSource(),
+        task
+    );
   }
 
   @Override
@@ -182,10 +192,17 @@ public class MetadataTaskStorage implements TaskStorage
   {
     // filter out taskInfo with a null 'task' which should only happen in 
practice if we are missing a jackson module
     // and don't know what to do with the payload, so we won't be able to make 
use of it anyway
-    return 
handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, 
ActiveTaskLookup.getInstance()), null)
+    return getActiveTaskInfos().stream()
+                               .map(TaskInfo::getTask)
+                               .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos()
+  {
+    return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, 
ActiveTaskLookup.getInstance()), null)
                   .stream()
                   .filter(taskInfo -> taskInfo.getStatus().isRunnable() && 
taskInfo.getTask() != null)
-                  .map(TaskInfo::getTask)
                   .collect(Collectors.toList());
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index 8e4830611be..d33c0a2769b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -33,7 +33,6 @@ import 
org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.http.TaskStateLookup;
 import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -41,7 +40,6 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.TaskLookupType;
-import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
@@ -147,6 +145,13 @@ public class TaskQueryTool
   @Nullable
   public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
   {
+    final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
+    if (taskQueue.isPresent()) {
+      final Optional<TaskInfo<Task, TaskStatus>> taskStatus = 
taskQueue.get().getActiveTaskInfo(taskId);
+      if (taskStatus.isPresent()) {
+        return taskStatus.get();
+      }
+    }
     return storage.getTaskInfo(taskId);
   }
 
@@ -156,12 +161,10 @@ public class TaskQueryTool
     if (taskQueue.isPresent()) {
       // Serve active task statuses from memory
       final List<TaskStatusPlus> taskStatusPlusList = new ArrayList<>();
+      final List<TaskInfo<Task, TaskStatus>> activeTasks = 
taskQueue.get().getTaskInfos();
 
-      // Use a dummy created time as this is not used by the caller, just 
needs to be non-null
-      final DateTime createdTime = DateTimes.nowUtc();
-
-      final List<Task> activeTasks = taskQueue.get().getTasks();
-      for (Task task : activeTasks) {
+      for (TaskInfo<Task, TaskStatus> taskInfo : activeTasks) {
+        final Task task = taskInfo.getTask();
         final Optional<TaskStatus> statusOptional = 
taskQueue.get().getTaskStatus(task.getId());
         if (statusOptional.isPresent()) {
           final TaskStatus status = statusOptional.get();
@@ -170,8 +173,8 @@ public class TaskQueryTool
                   task.getId(),
                   task.getGroupId(),
                   task.getType(),
-                  createdTime,
-                  createdTime,
+                  taskInfo.getCreatedTime(),
+                  taskInfo.getCreatedTime(),
                   status.getStatusCode(),
                   null,
                   status.getDuration(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index f18c3b4465e..d7024eabfe5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -37,6 +37,7 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.error.EntryAlreadyExists;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
@@ -425,7 +426,7 @@ public class TaskQueue
   {
     // Don't do anything with tasks that have recently finished; notifyStatus 
will handle it.
     if (entry != null && !entry.isComplete) {
-      final Task task = entry.task;
+      final Task task = entry.getTask();
 
       if (entry.future == null) {
         if (runnerTaskFuture == null) {
@@ -535,10 +536,10 @@ public class TaskQueue
       // If this throws with any sort of exception, including 
TaskExistsException, we don't want to
       // insert the task into our queue. So don't catch it.
       final DateTime insertTime = DateTimes.nowUtc();
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
+      final TaskInfo<Task, TaskStatus> taskInfo = taskStorage.insert(task, 
TaskStatus.running(task.getId()));
       // Note: the TaskEntry created for this task doesn't actually use the 
`insertTime` timestamp, it uses a new
       // timestamp created in the ctor. This prevents races from occurring 
while syncFromStorage() is happening.
-      addTaskInternal(task, insertTime);
+      addTaskInternal(taskInfo, insertTime);
       requestManagement();
       return true;
     }
@@ -548,17 +549,17 @@ public class TaskQueue
   }
 
   @GuardedBy("startStopLock")
-  private void addTaskInternal(final Task task, final DateTime updateTime)
+  private void addTaskInternal(final TaskInfo<Task, TaskStatus> taskInfo, 
final DateTime updateTime)
   {
     final AtomicBoolean added = new AtomicBoolean(false);
     final TaskEntry entry = addOrUpdateTaskEntry(
-        task.getId(),
+        taskInfo.getId(),
         prevEntry -> {
           if (prevEntry == null) {
             added.set(true);
-            return new TaskEntry(task);
+            return new TaskEntry(taskInfo);
           } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
-            prevEntry.lastUpdatedTime = updateTime;
+            prevEntry.updateStatus(taskInfo.getStatus(), updateTime);
           }
 
           return prevEntry;
@@ -566,9 +567,9 @@ public class TaskQueue
     );
 
     if (added.get()) {
-      taskLockbox.add(task);
-    } else if (!entry.task.equals(task)) {
-      throw new ISE("Cannot add task[%s] as a different task for the same ID 
has already been added.", task.getId());
+      taskLockbox.add(taskInfo.getTask());
+    } else if (!entry.getTask().equals(taskInfo.getTask())) {
+      throw new ISE("Cannot add task[%s] as a different task for the same ID 
has already been added.", taskInfo.getId());
     }
   }
 
@@ -584,14 +585,14 @@ public class TaskQueue
   @GuardedBy("startStopLock")
   private boolean removeTaskInternal(final String taskId, final DateTime 
deleteTime)
   {
-    final AtomicReference<Task> removedTask = new AtomicReference<>();
+    final AtomicReference<TaskInfo<Task, TaskStatus>> removedTask = new 
AtomicReference<>();
 
     addOrUpdateTaskEntry(
         taskId,
         prevEntry -> {
           // Remove the task only if it is complete OR it doesn't have a more 
recent update
           if (prevEntry != null && (prevEntry.isComplete || 
prevEntry.lastUpdatedTime.isBefore(deleteTime))) {
-            removedTask.set(prevEntry.task);
+            removedTask.set(prevEntry.taskInfo);
             // Remove this taskId from activeTasks by mapping it to null
             return null;
           }
@@ -601,7 +602,7 @@ public class TaskQueue
     );
 
     if (removedTask.get() != null) {
-      removeTaskLock(removedTask.get());
+      removeTaskLock(removedTask.get().getTask());
       return true;
     }
     return false;
@@ -686,7 +687,7 @@ public class TaskQueue
       return;
     }
 
-    final Task task = entry.task;
+    final Task task = entry.getTask();
     Preconditions.checkNotNull(task, "task");
     Preconditions.checkNotNull(taskStatus, "status");
     Preconditions.checkState(active, "Queue is not active!");
@@ -708,6 +709,7 @@ public class TaskQueue
 
     // Mark this task as complete, so it isn't managed while being cleaned up.
     entry.isComplete = true;
+    entry.updateStatus(taskStatus, DateTimes.nowUtc());
 
     final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId());
 
@@ -835,26 +837,26 @@ public class TaskQueue
 
     try {
       if (active) {
-        final Map<String, Task> newTasks =
-            CollectionUtils.toMap(taskStorage.getActiveTasks(), Task::getId, 
Function.identity());
-        final Map<String, Task> oldTasks =
-            CollectionUtils.mapValues(activeTasks, entry -> entry.task);
+        final Map<String, TaskInfo<Task, TaskStatus>> newTasks =
+            CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), 
TaskInfo::getId, Function.identity());
+        final Map<String, TaskInfo<Task, TaskStatus>> oldTasks =
+            CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo);
 
         // Identify the tasks that have been added or removed from the storage
-        final MapDifference<String, Task> mapDifference = 
Maps.difference(oldTasks, newTasks);
-        final Collection<Task> addedTasks = 
mapDifference.entriesOnlyOnRight().values();
-        final Collection<Task> removedTasks = 
mapDifference.entriesOnlyOnLeft().values();
+        final MapDifference<String, TaskInfo<Task, TaskStatus>> mapDifference 
= Maps.difference(oldTasks, newTasks);
+        final Collection<TaskInfo<Task, TaskStatus>> addedTasks = 
mapDifference.entriesOnlyOnRight().values();
+        final Collection<TaskInfo<Task, TaskStatus>> removedTasks = 
mapDifference.entriesOnlyOnLeft().values();
 
         // Remove tasks not present in metadata store if their lastUpdatedTime 
is before syncStartTime
         int numTasksRemoved = 0;
-        for (Task task : removedTasks) {
+        for (TaskInfo<Task, TaskStatus> task : removedTasks) {
           if (removeTaskInternal(task.getId(), syncStartTime)) {
             ++numTasksRemoved;
           }
         }
 
         // Add new tasks present in metadata store if their lastUpdatedTime is 
before syncStartTime
-        for (Task task : addedTasks) {
+        for (TaskInfo<Task, TaskStatus> task : addedTasks) {
           addTaskInternal(task, syncStartTime);
         }
 
@@ -876,15 +878,6 @@ public class TaskQueue
     }
   }
 
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
   private Map<RowKey, Long> getDeltaValues(Map<RowKey, Long> total, 
Map<RowKey, Long> prev)
   {
     final Map<RowKey, Long> deltaValues = new HashMap<>();
@@ -921,7 +914,7 @@ public class TaskQueue
   {
     return activeTasks.values().stream()
                       .filter(entry -> !entry.isComplete)
-                      .map(entry -> entry.task)
+                      .map(TaskEntry::getTask)
                       .collect(Collectors.toMap(Task::getId, 
TaskQueue::getMetricKey));
   }
 
@@ -958,7 +951,7 @@ public class TaskQueue
 
     return activeTasks.values().stream()
                       .filter(entry -> !entry.isComplete)
-                      .map(entry -> entry.task)
+                      .map(TaskEntry::getTask)
                       .filter(task -> 
!runnerKnownTaskIds.contains(task.getId()))
                       .collect(Collectors.toMap(TaskQueue::getMetricKey, task 
-> 1L, Long::sum));
   }
@@ -998,16 +991,16 @@ public class TaskQueue
    */
   public Optional<Task> getActiveTask(String id)
   {
-    final TaskEntry entry = activeTasks.get(id);
-    if (entry == null) {
+    final Optional<TaskInfo<Task, TaskStatus>> taskInfo = 
getActiveTaskInfo(id);
+    if (!taskInfo.isPresent()) {
       return Optional.absent();
     }
 
-    Task task = entry.task;
+    Task task = taskInfo.get().getTask();
     if (task != null) {
       try {
         // Write and read the value using a mapper with password redaction 
mixin.
-        task = 
passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(entry.task),
 Task.class);
+        task = 
passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task),
 Task.class);
       }
       catch (JsonProcessingException e) {
         log.error(e, "Failed to serialize or deserialize task with id [%s].", 
task.getId());
@@ -1020,25 +1013,40 @@ public class TaskQueue
   }
 
   /**
-   * List of all active and completed tasks currently being managed by this
-   * TaskQueue.
+   * Gets the {@link TaskInfo} for the given {@code taskId} from {@link 
#activeTasks} if present,
+   * otherwise returns an empty optional.
+   */
+  public Optional<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(String taskId)
+  {
+    final TaskEntry entry = activeTasks.get(taskId);
+    return entry == null ? Optional.absent() : Optional.of(entry.taskInfo);
+  }
+
+  /**
+   * List of all active and completed task infos currently being managed by 
this TaskQueue.
+   */
+  public List<TaskInfo<Task, TaskStatus>> getTaskInfos()
+  {
+    return activeTasks.values().stream().map(entry -> 
entry.taskInfo).collect(Collectors.toList());
+  }
+
+  /**
+   * List of all active and completed tasks currently being managed by this 
TaskQueue.
    */
   public List<Task> getTasks()
   {
-    return activeTasks.values().stream().map(entry -> 
entry.task).collect(Collectors.toList());
+    return 
getTaskInfos().stream().map(TaskInfo::getTask).collect(Collectors.toList());
   }
 
   /**
-   * Returns the list of currently active tasks for the given datasource.
+   * Returns a map of currently active tasks for the given datasource.
    */
   public Map<String, Task> getActiveTasksForDatasource(String datasource)
   {
     return activeTasks.values().stream().filter(
         entry -> !entry.isComplete
-                 && entry.task.getDataSource().equals(datasource)
-    ).map(
-        entry -> entry.task
-    ).collect(
+                 && entry.taskInfo.getDataSource().equals(datasource)
+    ).map(TaskEntry::getTask).collect(
         Collectors.toMap(Task::getId, Function.identity())
     );
   }
@@ -1140,17 +1148,35 @@ public class TaskQueue
    */
   static class TaskEntry
   {
-    private final Task task;
+    private TaskInfo<Task, TaskStatus> taskInfo;
 
     private DateTime lastUpdatedTime;
     private ListenableFuture<TaskStatus> future = null;
     private boolean isComplete = false;
 
-    TaskEntry(Task task)
+    TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
     {
-      this.task = task;
+      this.taskInfo = taskInfo;
       this.lastUpdatedTime = DateTimes.nowUtc();
     }
+
+    /**
+     * Returns the task associated with this {@link TaskEntry}
+     */
+    Task getTask()
+    {
+      return taskInfo.getTask();
+    }
+
+    /**
+     * Updates the {@link TaskStatus} for the task associated with this {@link 
TaskEntry} and sets the corresponding
+     * update time.
+     */
+    void updateStatus(TaskStatus status, DateTime updateTime)
+    {
+      this.taskInfo = this.taskInfo.withStatus(status);
+      this.lastUpdatedTime = updateTime;
+    }
   }
 
   private static RowKey getMetricKey(final Task task)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
index b231b3f37c2..32f1a9372cb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
@@ -40,8 +40,9 @@ public interface TaskStorage
    *
    * @param task   task to add
    * @param status task status
+   * @return a TaskInfo object representing the task information that was 
committed to the storage facility
    */
-  void insert(Task task, TaskStatus status);
+  TaskInfo<Task, TaskStatus> insert(Task task, TaskStatus status);
 
   /**
    * Persists task status in the storage facility. This method should throw an 
exception if the task status lifecycle
@@ -117,6 +118,14 @@ public interface TaskStorage
    */
   List<Task> getActiveTasks();
 
+  /**
+   * Returns a list of currently running or pending task infos as stored in 
the storage facility. No particular order
+   * is guaranteed, but implementations are encouraged to return tasks in 
ascending order of creation.
+   *
+   * @return list of active task infos
+   */
+  List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos();
+
   /**
    * Returns a list of currently running or pending tasks as stored in the 
storage facility. No particular order
    * is guaranteed, but implementations are encouraged to return tasks in 
ascending order of creation.
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index d03a5c8c2cf..0cb35ba3917 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -624,6 +625,71 @@ public class TaskQueueTest extends IngestionTestBase
     Assert.assertEquals(taskInStorageAsString, taskInQueueAsString);
   }
 
+  @Test
+  public void testTaskShutdownUpdatesTaskStatusInTaskQueue()
+  {
+    final String shutdownReason = "Test shutdown reason";
+    final TaskStatus shutdownStatus = TaskStatus.failure("shutdown-test-task", 
shutdownReason);
+    final TestTask task = new TestTask("shutdown-test-task", 
Intervals.of("2021-01-01/P1D"));
+    taskQueue.add(task);
+
+    final Optional<TaskInfo<Task, TaskStatus>> activeInfoOpt = 
taskQueue.getActiveTaskInfo(task.getId());
+    Assert.assertTrue(activeInfoOpt.isPresent());
+    Assert.assertEquals(TaskState.RUNNING, 
activeInfoOpt.get().getStatus().getStatusCode());
+
+    taskQueue.shutdown(task.getId(), shutdownReason);
+
+    final Optional<TaskInfo<Task, TaskStatus>> afterShutdownInfoOpt = 
taskQueue.getActiveTaskInfo(task.getId());
+    Assert.assertTrue(afterShutdownInfoOpt.isPresent());
+    Assert.assertEquals(shutdownStatus, 
afterShutdownInfoOpt.get().getStatus());
+    Assert.assertEquals(shutdownStatus, 
getTaskStorage().getStatus(task.getId()).get());
+  }
+
+  @Test
+  public void testTaskSuccessUpdatesTaskStatusInTaskQueue() throws Exception
+  {
+    final TaskStatus successStatus = TaskStatus.success("success-test-task");
+    final TestTask task = new TestTask("success-test-task", 
Intervals.of("2021-01-01/P1D"));
+    taskQueue.add(task);
+    taskQueue.manageQueuedTasks();
+
+    // ensure success callback has fired
+    Thread.sleep(100);
+    Assert.assertTrue(task.isDone());
+
+    final Optional<TaskInfo<Task, TaskStatus>> activeInfoOpt = 
taskQueue.getActiveTaskInfo(task.getId());
+    Assert.assertTrue(activeInfoOpt.isPresent());
+    Assert.assertEquals(successStatus, activeInfoOpt.get().getStatus());
+    Assert.assertEquals(successStatus, 
getTaskStorage().getStatus(task.getId()).get());
+  }
+
+  @Test
+  public void testTaskFailureUpdatesTaskStatusInTaskQueue() throws Exception
+  {
+    final TaskStatus failedStatus = TaskStatus.failure("failure-test-task", 
"error");
+    final TestTask task = new TestTask("failure-test-task", 
Intervals.of("2021-01-01/P1D"))
+    {
+      @Override
+      public TaskStatus runTask(TaskToolbox toolbox)
+      {
+        super.done = true;
+        return failedStatus;
+      }
+    };
+
+    taskQueue.add(task);
+    taskQueue.manageQueuedTasks();
+
+    // ensure failed callback has fired
+    Thread.sleep(100);
+    Assert.assertTrue(task.isDone());
+
+    final Optional<TaskInfo<Task, TaskStatus>> activeInfoOpt = 
taskQueue.getActiveTaskInfo(task.getId());
+    Assert.assertTrue(activeInfoOpt.isPresent());
+    Assert.assertEquals(failedStatus, activeInfoOpt.get().getStatus());
+    Assert.assertEquals(failedStatus, 
getTaskStorage().getStatus(task.getId()).get());
+  }
+
   private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
   {
     final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index afa67d41bea..a6988feeeb1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -999,21 +999,28 @@ public class OverlordResourceTest
     final Task task = NoopTask.create();
     final String taskId = task.getId();
     final TaskStatus status = TaskStatus.running(taskId);
+    final TaskInfo<Task, TaskStatus> taskInfo = new TaskInfo<>(
+        task.getId(),
+        DateTimes.of("2018-01-01"),
+        status,
+        task.getDataSource(),
+        task
+    );
 
-    EasyMock.expect(taskQueryTool.getTaskInfo(taskId))
-            .andReturn(new TaskInfo(
-                task.getId(),
-                DateTimes.of("2018-01-01"),
-                status,
-                task.getDataSource(),
-                task
-            ));
-
-    EasyMock.expect(taskQueryTool.getTaskInfo("othertask"))
-            .andReturn(null);
+    // For noop, simulate in-memory hit
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once();
+    
EasyMock.expect(taskQueue.getActiveTaskInfo(taskId)).andReturn(Optional.of(taskInfo)).once();
 
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once();
     EasyMock.<Collection<? extends 
TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks())
-        .andReturn(ImmutableList.of());
+            .andReturn(ImmutableList.of(new 
MockTaskRunnerWorkItem(taskId))).anyTimes();
+    
EasyMock.expect(taskRunner.getRunnerTaskState(taskId)).andReturn(RunnerTaskState.RUNNING).once();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once();
+
+    // For "othertask", simulate in-memory miss, then task storage read
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once();
+    
EasyMock.expect(taskQueue.getActiveTaskInfo("othertask")).andReturn(Optional.absent()).once();
+    
EasyMock.expect(taskStorage.getTaskInfo("othertask")).andReturn(null).once();
 
     replayAll();
 
diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java 
b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
index aa9aa1097db..2616f9daf02 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
@@ -76,4 +76,12 @@ public class TaskInfo<EntryType, StatusType>
   {
     return task;
   }
+
+  /**
+   * Returns a copy of this TaskInfo object with the given status.
+   */
+  public TaskInfo<EntryType, StatusType> withStatus(StatusType status)
+  {
+    return new TaskInfo<>(id, createdTime, status, dataSource, task);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to