This is an automated email from the ASF dual-hosted git repository. asdf2014 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new d738ce4 Enforce logging when killing a task (#6621) d738ce4 is described below commit d738ce4d2a4430cf95919e27b0eede171cbdf66c Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Thu Nov 15 18:01:56 2018 -0800 Enforce logging when killing a task (#6621) * Enforce logging when killing a task * fix test * address comment * address comment --- .../MaterializedViewSupervisor.java | 4 +- .../indexing/kafka/supervisor/KafkaSupervisor.java | 96 +++++++++++++--------- .../kafka/supervisor/KafkaSupervisorTest.java | 42 ++++++---- .../src/test/resources/log4j2.xml | 35 ++++++++ .../druid/indexing/overlord/ForkingTaskRunner.java | 3 +- .../druid/indexing/overlord/RemoteTaskRunner.java | 3 +- .../overlord/SingleTaskBackgroundRunner.java | 3 +- .../apache/druid/indexing/overlord/TaskQueue.java | 18 ++-- .../apache/druid/indexing/overlord/TaskRunner.java | 9 +- .../overlord/hrtr/HttpRemoteTaskRunner.java | 3 +- .../indexing/overlord/http/OverlordResource.java | 4 +- .../druid/indexing/worker/http/WorkerResource.java | 2 +- ...teTaskRunnerRunPendingTasksConcurrencyTest.java | 6 +- .../druid/indexing/overlord/TestTaskRunner.java | 2 +- .../druid/indexing/overlord/http/OverlordTest.java | 2 +- 15 files changed, 155 insertions(+), 77 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 2aa05e3..260caf0 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -346,7 +346,7 @@ public class MaterializedViewSupervisor implements Supervisor && !toBuildInterval.get(interval).equals(runningVersion.get(interval)) ) { if (taskMaster.getTaskQueue().isPresent()) { - taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId()); + taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), "version mismatch"); runningTasks.remove(interval); } } @@ -451,7 +451,7 @@ public class MaterializedViewSupervisor implements Supervisor { for (HadoopIndexTask task : runningTasks.values()) { if (taskMaster.getTaskQueue().isPresent()) { - taskMaster.getTaskQueue().get().shutdown(task.getId()); + taskMaster.getTaskQueue().get().shutdown(task.getId(), "killing all tasks"); } } runningTasks.clear(); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 6609b24..eac54be 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -747,7 +747,7 @@ public class KafkaSupervisor implements Supervisor // Reset everything boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource); log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result); - taskGroups.values().forEach(this::killTasksInGroup); + taskGroups.values().forEach(group -> killTasksInGroup(group, "DataSourceMetadata is not found while reset")); taskGroups.clear(); partitionGroups.clear(); } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) { @@ -811,7 +811,7 @@ public class KafkaSupervisor implements Supervisor if (metadataUpdateSuccess) { resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> { final int groupId = getTaskGroupIdForPartition(partition); - killTaskGroupForPartitions(ImmutableSet.of(partition)); + killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset"); taskGroups.remove(groupId); partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET); }); @@ -828,19 +828,18 @@ public class KafkaSupervisor implements Supervisor } } - private void killTaskGroupForPartitions(Set<Integer> partitions) + private void killTaskGroupForPartitions(Set<Integer> partitions, String reasonFormat, Object... args) { for (Integer partition : partitions) { - killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition))); + killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)), reasonFormat, args); } } - private void killTasksInGroup(TaskGroup taskGroup) + private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, Object... args) { if (taskGroup != null) { for (String taskId : taskGroup.tasks.keySet()) { - log.info("Killing task [%s] in the task group", taskId); - killTask(taskId); + killTask(taskId, reasonFormat, args); } } } @@ -855,7 +854,7 @@ public class KafkaSupervisor implements Supervisor for (TaskGroup taskGroup : taskGroups.values()) { for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) { if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) { - killTask(entry.getKey()); + killTask(entry.getKey(), "Killing task for graceful shutdown"); } else { entry.getValue().startTime = DateTimes.EPOCH; } @@ -1236,8 +1235,7 @@ public class KafkaSupervisor implements Supervisor for (int i = 0; i < results.size(); i++) { if (results.get(i) == null) { String taskId = futureTaskIds.get(i); - log.warn("Task [%s] failed to return status, killing task", taskId); - killTask(taskId); + killTask(taskId, "Task [%s] failed to return status, killing task", taskId); } } log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource); @@ -1297,7 +1295,7 @@ public class KafkaSupervisor implements Supervisor } catch (Exception e) { log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId); - killTask(taskId); + killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass()); taskGroup.tasks.remove(taskId); } } else if (checkpoints.isEmpty()) { @@ -1393,7 +1391,8 @@ public class KafkaSupervisor implements Supervisor taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach( sequenceCheckpoint -> { - log.warn( + killTask( + sequenceCheckpoint.lhs, "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest " + "persisted offsets in metadata store [%s]", sequenceCheckpoint.lhs, @@ -1401,7 +1400,6 @@ public class KafkaSupervisor implements Supervisor taskGroup.sequenceOffsets, latestOffsetsFromDb ); - killTask(sequenceCheckpoint.lhs); taskGroup.tasks.remove(sequenceCheckpoint.lhs); } ); @@ -1505,8 +1503,7 @@ public class KafkaSupervisor implements Supervisor // request threw an exception so kill the task if (results.get(i) == null) { String taskId = futureTaskIds.get(i); - log.warn("Task [%s] failed to return start time, killing task", taskId); - killTask(taskId); + killTask(taskId, "Task [%s] failed to return start time, killing task", taskId); } } } @@ -1553,13 +1550,12 @@ public class KafkaSupervisor implements Supervisor partitionGroups.get(groupId).put(entry.getKey(), entry.getValue()); } } else { - log.warn( - "All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]", - groupId, - group.taskIds() - ); for (String id : group.taskIds()) { - killTask(id); + killTask( + id, + "All tasks in group [%s] failed to transition to publishing state", + groupId + ); } // clear partitionGroups, so that latest offsets from db is used as start offsets not the stale ones // if tasks did some successful incremental handoffs @@ -1589,7 +1585,7 @@ public class KafkaSupervisor implements Supervisor // metadata store (which will have advanced if we succeeded in publishing and will remain the same if // publishing failed and we need to re-ingest) return Futures.transform( - stopTasksInGroup(taskGroup), + stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", task.status.getId()), new Function<Object, Map<Integer, Long>>() { @Nullable @@ -1604,8 +1600,7 @@ public class KafkaSupervisor implements Supervisor if (task.status.isRunnable()) { if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { - log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); - killTask(taskId); + killTask(taskId, "Killing task [%s] which hasn't been assigned to a worker", taskId); i.remove(); } } @@ -1634,8 +1629,7 @@ public class KafkaSupervisor implements Supervisor if (result == null || result.isEmpty()) { // kill tasks that didn't return a value String taskId = pauseTaskIds.get(i); - log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId); - killTask(taskId); + killTask(taskId, "Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId); taskGroup.tasks.remove(taskId); } else { // otherwise build a map of the highest offsets seen @@ -1683,8 +1677,11 @@ public class KafkaSupervisor implements Supervisor for (int i = 0; i < results.size(); i++) { if (results.get(i) == null || !results.get(i)) { String taskId = setEndOffsetTaskIds.get(i); - log.warn("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", taskId); - killTask(taskId); + killTask( + taskId, + "Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", + taskId + ); taskGroup.tasks.remove(taskId); } } @@ -1730,7 +1727,12 @@ public class KafkaSupervisor implements Supervisor if (stopTasksInTaskGroup) { // One of the earlier groups that was handling the same partition set timed out before the segments were // published so stop any additional groups handling the same partition set that are pending completion. - futures.add(stopTasksInGroup(group)); + futures.add( + stopTasksInGroup( + group, + "one of earlier groups that was handling the same partition set timed out before publishing segments" + ) + ); toRemove.add(group); continue; } @@ -1755,8 +1757,9 @@ public class KafkaSupervisor implements Supervisor if (taskData.status.isSuccess()) { // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as // we no longer need them to publish their segment. - log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()); - futures.add(stopTasksInGroup(group)); + futures.add( + stopTasksInGroup(group, "Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()) + ); foundSuccess = true; toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups break; // skip iterating the rest of the tasks in this group as they've all been stopped now @@ -1778,12 +1781,20 @@ public class KafkaSupervisor implements Supervisor // reset partitions offsets for this task group so that they will be re-read from metadata storage partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); // kill all the tasks in this pending completion group - killTasksInGroup(group); + killTasksInGroup( + group, + "No task in pending completion taskGroup[%d] succeeded before completion timeout elapsed", + groupId + ); // set a flag so the other pending completion groups for this set of partitions will also stop stopTasksInTaskGroup = true; // kill all the tasks in the currently reading task group and remove the bad task group - killTasksInGroup(taskGroups.remove(groupId)); + killTasksInGroup( + taskGroups.remove(groupId), + "No task in the corresponding pending completion taskGroup[%d] succeeded before completion timeout elapsed", + groupId + ); toRemove.add(group); } } @@ -1837,7 +1848,7 @@ public class KafkaSupervisor implements Supervisor // check for successful tasks, and if we find one, stop all tasks in the group and remove the group so it can // be recreated with the next set of offsets if (taskData.status.isSuccess()) { - futures.add(stopTasksInGroup(taskGroup)); + futures.add(stopTasksInGroup(taskGroup, "task[%s] succeeded in the same taskGroup", taskData.status.getId())); iTaskGroups.remove(); break; } @@ -2099,18 +2110,24 @@ public class KafkaSupervisor implements Supervisor } } - private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup) + private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup, String stopReasonFormat, Object... args) { if (taskGroup == null) { return Futures.immediateFuture(null); } + log.info( + "Stopping all tasks in taskGroup[%s] because: [%s]", + taskGroup.groupId, + StringUtils.format(stopReasonFormat, args) + ); + final List<ListenableFuture<Void>> futures = new ArrayList<>(); for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) { final String taskId = entry.getKey(); final TaskData taskData = entry.getValue(); if (taskData.status == null) { - killTask(taskId); + killTask(taskId, "Killing task since task status is not known to supervisor"); } else if (!taskData.status.isComplete()) { futures.add(stopTask(taskId, false)); } @@ -2129,8 +2146,7 @@ public class KafkaSupervisor implements Supervisor public Void apply(@Nullable Boolean result) { if (result == null || !result) { - log.info("Task [%s] failed to stop in a timely manner, killing task", id); - killTask(id); + killTask(id, "Task [%s] failed to stop in a timely manner, killing task", id); } return null; } @@ -2138,11 +2154,11 @@ public class KafkaSupervisor implements Supervisor ); } - private void killTask(final String id) + private void killTask(final String id, String reasonFormat, Object... args) { Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue(); if (taskQueue.isPresent()) { - taskQueue.get().shutdown(id); + taskQueue.get().shutdown(id, reasonFormat, args); } else { log.error("Failed to get task queue because I'm not the leader!"); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2379123..9b4a5a6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -654,7 +654,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false)); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); - taskQueue.shutdown("id3"); + taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3"); expect(taskQueue.add(anyObject(Task.class))).andReturn(true); @@ -763,8 +763,8 @@ public class KafkaSupervisorTest extends EasyMockSupport .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); - taskQueue.shutdown("id4"); - taskQueue.shutdown("id5"); + taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, killing task", "id4"); + taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, killing task", "id5"); replayAll(); supervisor.start(); @@ -1464,7 +1464,7 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED)); expect(taskClient.getStartTimeAsync(task.getId())) .andReturn(Futures.immediateFailedFuture(new RuntimeException())); - taskQueue.shutdown(task.getId()); + taskQueue.shutdown(task.getId(), "Task [%s] failed to return start time, killing task", task.getId()); } replay(taskStorage, taskClient, taskQueue); @@ -1535,7 +1535,11 @@ public class KafkaSupervisorTest extends EasyMockSupport .times(2); expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0"))) .andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); - taskQueue.shutdown(EasyMock.contains("sequenceName-0")); + taskQueue.shutdown( + EasyMock.contains("sequenceName-0"), + EasyMock.eq("Task [%s] failed to respond to [pause] in a timely manner, killing task"), + EasyMock.contains("sequenceName-0") + ); expectLastCall().times(2); expect(taskQueue.add(capture(captured))).andReturn(true).times(2); @@ -1622,7 +1626,11 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.eq(true) ) ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); - taskQueue.shutdown(EasyMock.contains("sequenceName-0")); + taskQueue.shutdown( + EasyMock.contains("sequenceName-0"), + EasyMock.eq("All tasks in group [%s] failed to transition to publishing state"), + EasyMock.eq(0) + ); expectLastCall().times(2); expect(taskQueue.add(capture(captured))).andReturn(true).times(2); @@ -1749,8 +1757,10 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) .andReturn(Futures.immediateFuture(true)); - taskQueue.shutdown("id3"); - expectLastCall().times(2); + taskQueue.shutdown("id3", "Killing task for graceful shutdown"); + expectLastCall().times(1); + taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3"); + expectLastCall().times(1); replay(taskRunner, taskClient, taskQueue); @@ -1950,8 +1960,8 @@ public class KafkaSupervisorTest extends EasyMockSupport reset(taskQueue, indexerMetadataStorageCoordinator); expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); - taskQueue.shutdown("id2"); - taskQueue.shutdown("id3"); + taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset"); + taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset"); replay(taskQueue, indexerMetadataStorageCoordinator); supervisor.resetInternal(null); @@ -2036,9 +2046,9 @@ public class KafkaSupervisorTest extends EasyMockSupport reset(taskQueue, indexerMetadataStorageCoordinator); expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); - taskQueue.shutdown("id1"); - taskQueue.shutdown("id2"); - taskQueue.shutdown("id3"); + taskQueue.shutdown("id1", "DataSourceMetadata is not found while reset"); + taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset"); + taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset"); replay(taskQueue, indexerMetadataStorageCoordinator); supervisor.resetInternal(null); @@ -2424,8 +2434,10 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) .andReturn(Futures.immediateFuture(true)); - taskQueue.shutdown("id3"); - expectLastCall().times(2); + taskQueue.shutdown("id3", "Killing task for graceful shutdown"); + expectLastCall().times(1); + taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3"); + expectLastCall().times(1); replayAll(); supervisor.start(); diff --git a/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml b/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml new file mode 100644 index 0000000..bca6c69 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + <Logger level="debug" name="org.apache.druid" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + </Loggers> +</Configuration> diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 03af857..c3a0bc9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -583,8 +583,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer } @Override - public void shutdown(final String taskid) + public void shutdown(final String taskid, String reason) { + log.info("Shutdown [%s] because: [%s]", taskid, reason); final ForkingTaskRunnerWorkItem taskInfo; synchronized (tasks) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index bde17ef..b83ea67 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -541,8 +541,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer * @param taskId - task id to shutdown */ @Override - public void shutdown(final String taskId) + public void shutdown(final String taskId, String reason) { + log.info("Shutdown [%s] because: [%s]", taskId, reason); if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) { log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId); } else if (pendingTasks.remove(taskId) != null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index a0fb43a..a9b6317 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -278,8 +278,9 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke * @param taskid task ID to clean up resources for */ @Override - public void shutdown(final String taskid) + public void shutdown(final String taskid, String reason) { + log.info("Shutdown [%s] because: [%s]", taskid, reason); if (runningItem != null && runningItem.getTask().getId().equals(taskid)) { runningItem.getResult().cancel(true); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 082e157..9a9c41d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -252,7 +252,7 @@ public class TaskQueue } catch (Exception e) { log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); - notifyStatus(task, TaskStatus.failure(task.getId())); + notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass()); continue; } if (taskIsReady) { @@ -286,7 +286,11 @@ public class TaskQueue log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); for (final String taskId : tasksToKill) { try { - taskRunner.shutdown(taskId); + taskRunner.shutdown( + taskId, + "task is not in runnerTaskFutures[%s]", + runnerTaskFutures.keySet() + ); } catch (Exception e) { log.warn(e, "TaskRunner failed to clean up task: %s", taskId); @@ -356,7 +360,7 @@ public class TaskQueue * * @param taskId task to kill */ - public void shutdown(final String taskId) + public void shutdown(final String taskId, String reasonFormat, Object... args) { giant.lock(); @@ -364,7 +368,7 @@ public class TaskQueue Preconditions.checkNotNull(taskId, "taskId"); for (final Task task : tasks) { if (task.getId().equals(taskId)) { - notifyStatus(task, TaskStatus.failure(taskId)); + notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args); break; } } @@ -386,7 +390,7 @@ public class TaskQueue * @throws IllegalArgumentException if the task ID does not match the status ID * @throws IllegalStateException if this queue is currently shut down */ - private void notifyStatus(final Task task, final TaskStatus taskStatus) + private void notifyStatus(final Task task, final TaskStatus taskStatus, String reasonFormat, Object... args) { giant.lock(); @@ -402,7 +406,7 @@ public class TaskQueue ); // Inform taskRunner that this task can be shut down try { - taskRunner.shutdown(task.getId()); + taskRunner.shutdown(task.getId(), reasonFormat, args); } catch (Exception e) { log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); @@ -493,7 +497,7 @@ public class TaskQueue return; } - notifyStatus(task, status); + notifyStatus(task, status, "notified status change from task"); // Emit event and log, if the task is done if (status.isComplete()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 6d4c867..8159659 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -27,6 +27,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; import java.util.Collection; @@ -81,8 +82,14 @@ public interface TaskRunner * currently-running tasks. * * @param taskid task ID to clean up resources for + * @param reason reason to kill this task */ - void shutdown(String taskid); + void shutdown(String taskid, String reason); + + default void shutdown(String taskid, String reasonFormat, Object... args) + { + shutdown(taskid, StringUtils.format(reasonFormat, args)); + } /** * Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 3d36000..bf96b10 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1041,7 +1041,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public void shutdown(String taskId) + public void shutdown(String taskId, String reason) { if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) { log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId); @@ -1050,6 +1050,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer WorkerHolder workerHolderRunningTask = null; synchronized (statusLock) { + log.info("Shutdown [%s] because: [%s]", taskId, reason); HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.remove(taskId); if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index a2be927..f0748f9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -330,7 +330,7 @@ public class OverlordResource @Override public Response apply(TaskQueue taskQueue) { - taskQueue.shutdown(taskid); + taskQueue.shutdown(taskid, "Shutdown request from user"); return Response.ok(ImmutableMap.of("task", taskid)).build(); } } @@ -351,7 +351,7 @@ public class OverlordResource { final List<TaskInfo<Task, TaskStatus>> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); for (final TaskInfo<Task, TaskStatus> task : tasks) { - taskQueue.shutdown(task.getId()); + taskQueue.shutdown(task.getId(), "Shutdown request from user"); } return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java index d37b2bf..bef73ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java @@ -168,7 +168,7 @@ public class WorkerResource public Response doShutdown(@PathParam("taskid") String taskid) { try { - taskRunner.shutdown(taskid); + taskRunner.shutdown(taskid, "shut down request via HTTP endpoint"); } catch (Exception e) { log.error(e, "Failed to issue shutdown for task: %s", taskid); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java index 3aed5a5..4cc352f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java @@ -101,19 +101,19 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId()) && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())) { - remoteTaskRunner.shutdown("task4"); + remoteTaskRunner.shutdown("task4", "test"); mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]); Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode()); } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId()) && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())) { - remoteTaskRunner.shutdown("task2"); + remoteTaskRunner.shutdown("task2", "test"); mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]); Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode()); } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId()) && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())) { - remoteTaskRunner.shutdown("task3"); + remoteTaskRunner.shutdown("task3", "test"); mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]); Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index fbc5b24..cb6cffe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -236,7 +236,7 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker } @Override - public void shutdown(final String taskid) + public void shutdown(final String taskid, String reason) { for (final TaskRunnerWorkItem runningItem : runningItems) { if (runningItem.getTaskId().equals(taskid)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 3a4c210..ecf2af4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -408,7 +408,7 @@ public class OverlordTest } @Override - public void shutdown(String taskid) {} + public void shutdown(String taskid, String reason) {} @Override public synchronized Collection<? extends TaskRunnerWorkItem> getRunningTasks() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org