This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6d7c3782153 Poll from memory before fetching task information from DB
(#18448)
6d7c3782153 is described below
commit 6d7c3782153e45888ccca80032453276e541e494
Author: jtuglu1 <[email protected]>
AuthorDate: Sat Aug 30 22:59:07 2025 -0700
Poll from memory before fetching task information from DB (#18448)
- When running a large number of tasks in a cluster, the Overlord can become
bottlenecked by the endpoint `/task/{taskid}/status`, as it issues an expensive
I/O call against the metadata store on every request.
- This change allows such requests to fetch the task status from memory if the
task entry exists, falling back to the DB otherwise.
---
.../indexing/overlord/HeapMemoryTaskStorage.java | 15 ++-
.../indexing/overlord/MetadataTaskStorage.java | 25 ++++-
.../druid/indexing/overlord/TaskQueryTool.java | 21 ++--
.../apache/druid/indexing/overlord/TaskQueue.java | 124 +++++++++++++--------
.../druid/indexing/overlord/TaskStorage.java | 11 +-
.../druid/indexing/overlord/TaskQueueTest.java | 66 +++++++++++
.../overlord/http/OverlordResourceTest.java | 31 ++++--
.../java/org/apache/druid/indexer/TaskInfo.java | 8 ++
8 files changed, 225 insertions(+), 76 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index 5109abe2377..5f01a5fcc56 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -76,7 +76,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
- public void insert(Task task, TaskStatus status)
+ public TaskInfo<Task, TaskStatus> insert(Task task, TaskStatus status)
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
@@ -94,6 +94,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
log.info("Inserted task[%s] with status[%s]", task.getId(), status);
+ return TaskStuff.toTaskInfo(newTaskStuff);
}
@Override
@@ -159,6 +160,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
return listBuilder.build();
}
+ @Override
+ public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos()
+ {
+ final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder =
ImmutableList.builder();
+ for (final TaskStuff taskStuff : tasks.values()) {
+ if (taskStuff.getStatus().isRunnable()) {
+ listBuilder.add(TaskStuff.toTaskInfo(taskStuff));
+ }
+ }
+ return listBuilder.build();
+ }
+
@Override
public List<Task> getActiveTasksByDatasource(String datasource)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 683b9e80549..ae78bc2c3ff 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -46,6 +46,7 @@ import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
+import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -111,7 +112,7 @@ public class MetadataTaskStorage implements TaskStorage
}
@Override
- public void insert(final Task task, final TaskStatus status)
+ public TaskInfo<Task, TaskStatus> insert(final Task task, final TaskStatus
status)
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
@@ -123,11 +124,12 @@ public class MetadataTaskStorage implements TaskStorage
);
log.info("Inserting task [%s] with status [%s].", task.getId(), status);
+ final DateTime insertionTime = DateTimes.nowUtc();
try {
handler.insert(
task.getId(),
- DateTimes.nowUtc(),
+ insertionTime,
task.getDataSource(),
task,
status.isRunnable(),
@@ -142,6 +144,14 @@ public class MetadataTaskStorage implements TaskStorage
catch (Exception e) {
throw new RuntimeException(e);
}
+
+ return new TaskInfo<>(
+ task.getId(),
+ insertionTime,
+ status,
+ task.getDataSource(),
+ task
+ );
}
@Override
@@ -182,10 +192,17 @@ public class MetadataTaskStorage implements TaskStorage
{
// filter out taskInfo with a null 'task' which should only happen in
practice if we are missing a jackson module
// and don't know what to do with the payload, so we won't be able to make
use of it anyway
- return
handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()), null)
+ return getActiveTaskInfos().stream()
+ .map(TaskInfo::getTask)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos()
+ {
+ return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()), null)
.stream()
.filter(taskInfo -> taskInfo.getStatus().isRunnable() &&
taskInfo.getTask() != null)
- .map(TaskInfo::getTask)
.collect(Collectors.toList());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index 8e4830611be..d33c0a2769b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -33,7 +33,6 @@ import
org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@@ -41,7 +40,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
-import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -147,6 +145,13 @@ public class TaskQueryTool
@Nullable
public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
{
+ final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
+ if (taskQueue.isPresent()) {
+ final Optional<TaskInfo<Task, TaskStatus>> taskStatus =
taskQueue.get().getActiveTaskInfo(taskId);
+ if (taskStatus.isPresent()) {
+ return taskStatus.get();
+ }
+ }
return storage.getTaskInfo(taskId);
}
@@ -156,12 +161,10 @@ public class TaskQueryTool
if (taskQueue.isPresent()) {
// Serve active task statuses from memory
final List<TaskStatusPlus> taskStatusPlusList = new ArrayList<>();
+ final List<TaskInfo<Task, TaskStatus>> activeTasks =
taskQueue.get().getTaskInfos();
- // Use a dummy created time as this is not used by the caller, just
needs to be non-null
- final DateTime createdTime = DateTimes.nowUtc();
-
- final List<Task> activeTasks = taskQueue.get().getTasks();
- for (Task task : activeTasks) {
+ for (TaskInfo<Task, TaskStatus> taskInfo : activeTasks) {
+ final Task task = taskInfo.getTask();
final Optional<TaskStatus> statusOptional =
taskQueue.get().getTaskStatus(task.getId());
if (statusOptional.isPresent()) {
final TaskStatus status = statusOptional.get();
@@ -170,8 +173,8 @@ public class TaskQueryTool
task.getId(),
task.getGroupId(),
task.getType(),
- createdTime,
- createdTime,
+ taskInfo.getCreatedTime(),
+ taskInfo.getCreatedTime(),
status.getStatusCode(),
null,
status.getDuration(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index f18c3b4465e..d7024eabfe5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -37,6 +37,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.error.EntryAlreadyExists;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
@@ -425,7 +426,7 @@ public class TaskQueue
{
// Don't do anything with tasks that have recently finished; notifyStatus
will handle it.
if (entry != null && !entry.isComplete) {
- final Task task = entry.task;
+ final Task task = entry.getTask();
if (entry.future == null) {
if (runnerTaskFuture == null) {
@@ -535,10 +536,10 @@ public class TaskQueue
// If this throws with any sort of exception, including
TaskExistsException, we don't want to
// insert the task into our queue. So don't catch it.
final DateTime insertTime = DateTimes.nowUtc();
- taskStorage.insert(task, TaskStatus.running(task.getId()));
+ final TaskInfo<Task, TaskStatus> taskInfo = taskStorage.insert(task,
TaskStatus.running(task.getId()));
// Note: the TaskEntry created for this task doesn't actually use the
`insertTime` timestamp, it uses a new
// timestamp created in the ctor. This prevents races from occurring
while syncFromStorage() is happening.
- addTaskInternal(task, insertTime);
+ addTaskInternal(taskInfo, insertTime);
requestManagement();
return true;
}
@@ -548,17 +549,17 @@ public class TaskQueue
}
@GuardedBy("startStopLock")
- private void addTaskInternal(final Task task, final DateTime updateTime)
+ private void addTaskInternal(final TaskInfo<Task, TaskStatus> taskInfo,
final DateTime updateTime)
{
final AtomicBoolean added = new AtomicBoolean(false);
final TaskEntry entry = addOrUpdateTaskEntry(
- task.getId(),
+ taskInfo.getId(),
prevEntry -> {
if (prevEntry == null) {
added.set(true);
- return new TaskEntry(task);
+ return new TaskEntry(taskInfo);
} else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) {
- prevEntry.lastUpdatedTime = updateTime;
+ prevEntry.updateStatus(taskInfo.getStatus(), updateTime);
}
return prevEntry;
@@ -566,9 +567,9 @@ public class TaskQueue
);
if (added.get()) {
- taskLockbox.add(task);
- } else if (!entry.task.equals(task)) {
- throw new ISE("Cannot add task[%s] as a different task for the same ID
has already been added.", task.getId());
+ taskLockbox.add(taskInfo.getTask());
+ } else if (!entry.getTask().equals(taskInfo.getTask())) {
+ throw new ISE("Cannot add task[%s] as a different task for the same ID
has already been added.", taskInfo.getId());
}
}
@@ -584,14 +585,14 @@ public class TaskQueue
@GuardedBy("startStopLock")
private boolean removeTaskInternal(final String taskId, final DateTime
deleteTime)
{
- final AtomicReference<Task> removedTask = new AtomicReference<>();
+ final AtomicReference<TaskInfo<Task, TaskStatus>> removedTask = new
AtomicReference<>();
addOrUpdateTaskEntry(
taskId,
prevEntry -> {
// Remove the task only if it is complete OR it doesn't have a more
recent update
if (prevEntry != null && (prevEntry.isComplete ||
prevEntry.lastUpdatedTime.isBefore(deleteTime))) {
- removedTask.set(prevEntry.task);
+ removedTask.set(prevEntry.taskInfo);
// Remove this taskId from activeTasks by mapping it to null
return null;
}
@@ -601,7 +602,7 @@ public class TaskQueue
);
if (removedTask.get() != null) {
- removeTaskLock(removedTask.get());
+ removeTaskLock(removedTask.get().getTask());
return true;
}
return false;
@@ -686,7 +687,7 @@ public class TaskQueue
return;
}
- final Task task = entry.task;
+ final Task task = entry.getTask();
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(taskStatus, "status");
Preconditions.checkState(active, "Queue is not active!");
@@ -708,6 +709,7 @@ public class TaskQueue
// Mark this task as complete, so it isn't managed while being cleaned up.
entry.isComplete = true;
+ entry.updateStatus(taskStatus, DateTimes.nowUtc());
final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId());
@@ -835,26 +837,26 @@ public class TaskQueue
try {
if (active) {
- final Map<String, Task> newTasks =
- CollectionUtils.toMap(taskStorage.getActiveTasks(), Task::getId,
Function.identity());
- final Map<String, Task> oldTasks =
- CollectionUtils.mapValues(activeTasks, entry -> entry.task);
+ final Map<String, TaskInfo<Task, TaskStatus>> newTasks =
+ CollectionUtils.toMap(taskStorage.getActiveTaskInfos(),
TaskInfo::getId, Function.identity());
+ final Map<String, TaskInfo<Task, TaskStatus>> oldTasks =
+ CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo);
// Identify the tasks that have been added or removed from the storage
- final MapDifference<String, Task> mapDifference =
Maps.difference(oldTasks, newTasks);
- final Collection<Task> addedTasks =
mapDifference.entriesOnlyOnRight().values();
- final Collection<Task> removedTasks =
mapDifference.entriesOnlyOnLeft().values();
+ final MapDifference<String, TaskInfo<Task, TaskStatus>> mapDifference
= Maps.difference(oldTasks, newTasks);
+ final Collection<TaskInfo<Task, TaskStatus>> addedTasks =
mapDifference.entriesOnlyOnRight().values();
+ final Collection<TaskInfo<Task, TaskStatus>> removedTasks =
mapDifference.entriesOnlyOnLeft().values();
// Remove tasks not present in metadata store if their lastUpdatedTime
is before syncStartTime
int numTasksRemoved = 0;
- for (Task task : removedTasks) {
+ for (TaskInfo<Task, TaskStatus> task : removedTasks) {
if (removeTaskInternal(task.getId(), syncStartTime)) {
++numTasksRemoved;
}
}
// Add new tasks present in metadata store if their lastUpdatedTime is
before syncStartTime
- for (Task task : addedTasks) {
+ for (TaskInfo<Task, TaskStatus> task : addedTasks) {
addTaskInternal(task, syncStartTime);
}
@@ -876,15 +878,6 @@ public class TaskQueue
}
}
- private static Map<String, Task> toTaskIDMap(List<Task> taskList)
- {
- Map<String, Task> rv = new HashMap<>();
- for (Task task : taskList) {
- rv.put(task.getId(), task);
- }
- return rv;
- }
-
private Map<RowKey, Long> getDeltaValues(Map<RowKey, Long> total,
Map<RowKey, Long> prev)
{
final Map<RowKey, Long> deltaValues = new HashMap<>();
@@ -921,7 +914,7 @@ public class TaskQueue
{
return activeTasks.values().stream()
.filter(entry -> !entry.isComplete)
- .map(entry -> entry.task)
+ .map(TaskEntry::getTask)
.collect(Collectors.toMap(Task::getId,
TaskQueue::getMetricKey));
}
@@ -958,7 +951,7 @@ public class TaskQueue
return activeTasks.values().stream()
.filter(entry -> !entry.isComplete)
- .map(entry -> entry.task)
+ .map(TaskEntry::getTask)
.filter(task ->
!runnerKnownTaskIds.contains(task.getId()))
.collect(Collectors.toMap(TaskQueue::getMetricKey, task
-> 1L, Long::sum));
}
@@ -998,16 +991,16 @@ public class TaskQueue
*/
public Optional<Task> getActiveTask(String id)
{
- final TaskEntry entry = activeTasks.get(id);
- if (entry == null) {
+ final Optional<TaskInfo<Task, TaskStatus>> taskInfo =
getActiveTaskInfo(id);
+ if (!taskInfo.isPresent()) {
return Optional.absent();
}
- Task task = entry.task;
+ Task task = taskInfo.get().getTask();
if (task != null) {
try {
// Write and read the value using a mapper with password redaction
mixin.
- task =
passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(entry.task),
Task.class);
+ task =
passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task),
Task.class);
}
catch (JsonProcessingException e) {
log.error(e, "Failed to serialize or deserialize task with id [%s].",
task.getId());
@@ -1020,25 +1013,40 @@ public class TaskQueue
}
/**
- * List of all active and completed tasks currently being managed by this
- * TaskQueue.
+ * Gets the {@link TaskInfo} for the given {@code taskId} from {@link
#activeTasks} if present,
+ * otherwise returns an empty optional.
+ */
+ public Optional<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(String taskId)
+ {
+ final TaskEntry entry = activeTasks.get(taskId);
+ return entry == null ? Optional.absent() : Optional.of(entry.taskInfo);
+ }
+
+ /**
+ * List of all active and completed task infos currently being managed by
this TaskQueue.
+ */
+ public List<TaskInfo<Task, TaskStatus>> getTaskInfos()
+ {
+ return activeTasks.values().stream().map(entry ->
entry.taskInfo).collect(Collectors.toList());
+ }
+
+ /**
+ * List of all active and completed tasks currently being managed by this
TaskQueue.
*/
public List<Task> getTasks()
{
- return activeTasks.values().stream().map(entry ->
entry.task).collect(Collectors.toList());
+ return
getTaskInfos().stream().map(TaskInfo::getTask).collect(Collectors.toList());
}
/**
- * Returns the list of currently active tasks for the given datasource.
+ * Returns a map of currently active tasks for the given datasource.
*/
public Map<String, Task> getActiveTasksForDatasource(String datasource)
{
return activeTasks.values().stream().filter(
entry -> !entry.isComplete
- && entry.task.getDataSource().equals(datasource)
- ).map(
- entry -> entry.task
- ).collect(
+ && entry.taskInfo.getDataSource().equals(datasource)
+ ).map(TaskEntry::getTask).collect(
Collectors.toMap(Task::getId, Function.identity())
);
}
@@ -1140,17 +1148,35 @@ public class TaskQueue
*/
static class TaskEntry
{
- private final Task task;
+ private TaskInfo<Task, TaskStatus> taskInfo;
private DateTime lastUpdatedTime;
private ListenableFuture<TaskStatus> future = null;
private boolean isComplete = false;
- TaskEntry(Task task)
+ TaskEntry(TaskInfo<Task, TaskStatus> taskInfo)
{
- this.task = task;
+ this.taskInfo = taskInfo;
this.lastUpdatedTime = DateTimes.nowUtc();
}
+
+ /**
+ * Returns the task associated with this {@link TaskEntry}
+ */
+ Task getTask()
+ {
+ return taskInfo.getTask();
+ }
+
+ /**
+ * Updates the {@link TaskStatus} for the task associated with this {@link
TaskEntry} and sets the corresponding
+ * update time.
+ */
+ void updateStatus(TaskStatus status, DateTime updateTime)
+ {
+ this.taskInfo = this.taskInfo.withStatus(status);
+ this.lastUpdatedTime = updateTime;
+ }
}
private static RowKey getMetricKey(final Task task)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
index b231b3f37c2..32f1a9372cb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
@@ -40,8 +40,9 @@ public interface TaskStorage
*
* @param task task to add
* @param status task status
+ * @return a TaskInfo object representing the task information that was
committed to the storage facility
*/
- void insert(Task task, TaskStatus status);
+ TaskInfo<Task, TaskStatus> insert(Task task, TaskStatus status);
/**
* Persists task status in the storage facility. This method should throw an
exception if the task status lifecycle
@@ -117,6 +118,14 @@ public interface TaskStorage
*/
List<Task> getActiveTasks();
+ /**
+ * Returns a list of currently running or pending task infos as stored in
the storage facility. No particular order
+ * is guaranteed, but implementations are encouraged to return tasks in
ascending order of creation.
+ *
+ * @return list of active task infos
+ */
+ List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos();
+
/**
* Returns a list of currently running or pending tasks as stored in the
storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in
ascending order of creation.
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index d03a5c8c2cf..0cb35ba3917 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -624,6 +625,71 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(taskInStorageAsString, taskInQueueAsString);
}
+ @Test
+ public void testTaskShutdownUpdatesTaskStatusInTaskQueue()
+ {
+ final String shutdownReason = "Test shutdown reason";
+ final TaskStatus shutdownStatus = TaskStatus.failure("shutdown-test-task",
shutdownReason);
+ final TestTask task = new TestTask("shutdown-test-task",
Intervals.of("2021-01-01/P1D"));
+ taskQueue.add(task);
+
+ final Optional<TaskInfo<Task, TaskStatus>> activeInfoOpt =
taskQueue.getActiveTaskInfo(task.getId());
+ Assert.assertTrue(activeInfoOpt.isPresent());
+ Assert.assertEquals(TaskState.RUNNING,
activeInfoOpt.get().getStatus().getStatusCode());
+
+ taskQueue.shutdown(task.getId(), shutdownReason);
+
+ final Optional<TaskInfo<Task, TaskStatus>> afterShutdownInfoOpt =
taskQueue.getActiveTaskInfo(task.getId());
+ Assert.assertTrue(afterShutdownInfoOpt.isPresent());
+ Assert.assertEquals(shutdownStatus,
afterShutdownInfoOpt.get().getStatus());
+ Assert.assertEquals(shutdownStatus,
getTaskStorage().getStatus(task.getId()).get());
+ }
+
+ @Test
+ public void testTaskSuccessUpdatesTaskStatusInTaskQueue() throws Exception
+ {
+ final TaskStatus successStatus = TaskStatus.success("success-test-task");
+ final TestTask task = new TestTask("success-test-task",
Intervals.of("2021-01-01/P1D"));
+ taskQueue.add(task);
+ taskQueue.manageQueuedTasks();
+
+ // ensure success callback has fired
+ Thread.sleep(100);
+ Assert.assertTrue(task.isDone());
+
+ final Optional<TaskInfo<Task, TaskStatus>> activeInfoOpt =
taskQueue.getActiveTaskInfo(task.getId());
+ Assert.assertTrue(activeInfoOpt.isPresent());
+ Assert.assertEquals(successStatus, activeInfoOpt.get().getStatus());
+ Assert.assertEquals(successStatus,
getTaskStorage().getStatus(task.getId()).get());
+ }
+
+ @Test
+ public void testTaskFailureUpdatesTaskStatusInTaskQueue() throws Exception
+ {
+ final TaskStatus failedStatus = TaskStatus.failure("failure-test-task",
"error");
+ final TestTask task = new TestTask("failure-test-task",
Intervals.of("2021-01-01/P1D"))
+ {
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox)
+ {
+ super.done = true;
+ return failedStatus;
+ }
+ };
+
+ taskQueue.add(task);
+ taskQueue.manageQueuedTasks();
+
+ // ensure failed callback has fired
+ Thread.sleep(100);
+ Assert.assertTrue(task.isDone());
+
+ final Optional<TaskInfo<Task, TaskStatus>> activeInfoOpt =
taskQueue.getActiveTaskInfo(task.getId());
+ Assert.assertTrue(activeInfoOpt.isPresent());
+ Assert.assertEquals(failedStatus, activeInfoOpt.get().getStatus());
+ Assert.assertEquals(failedStatus,
getTaskStorage().getStatus(task.getId()).get());
+ }
+
private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
{
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index afa67d41bea..a6988feeeb1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -999,21 +999,28 @@ public class OverlordResourceTest
final Task task = NoopTask.create();
final String taskId = task.getId();
final TaskStatus status = TaskStatus.running(taskId);
+ final TaskInfo<Task, TaskStatus> taskInfo = new TaskInfo<>(
+ task.getId(),
+ DateTimes.of("2018-01-01"),
+ status,
+ task.getDataSource(),
+ task
+ );
- EasyMock.expect(taskQueryTool.getTaskInfo(taskId))
- .andReturn(new TaskInfo(
- task.getId(),
- DateTimes.of("2018-01-01"),
- status,
- task.getDataSource(),
- task
- ));
-
- EasyMock.expect(taskQueryTool.getTaskInfo("othertask"))
- .andReturn(null);
+ // For noop, simulate in-memory hit
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once();
+
EasyMock.expect(taskQueue.getActiveTaskInfo(taskId)).andReturn(Optional.of(taskInfo)).once();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once();
EasyMock.<Collection<? extends
TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks())
- .andReturn(ImmutableList.of());
+ .andReturn(ImmutableList.of(new
MockTaskRunnerWorkItem(taskId))).anyTimes();
+
EasyMock.expect(taskRunner.getRunnerTaskState(taskId)).andReturn(RunnerTaskState.RUNNING).once();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once();
+
+ // For "othertask", simulate in-memory miss, then task storage read
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once();
+
EasyMock.expect(taskQueue.getActiveTaskInfo("othertask")).andReturn(Optional.absent()).once();
+
EasyMock.expect(taskStorage.getTaskInfo("othertask")).andReturn(null).once();
replayAll();
diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
index aa9aa1097db..2616f9daf02 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java
@@ -76,4 +76,12 @@ public class TaskInfo<EntryType, StatusType>
{
return task;
}
+
+ /**
+ * Returns a copy of this TaskInfo object with the given status.
+ */
+ public TaskInfo<EntryType, StatusType> withStatus(StatusType status)
+ {
+ return new TaskInfo<>(id, createdTime, status, dataSource, task);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]