This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d738ce4  Enforce logging when killing a task (#6621)
d738ce4 is described below

commit d738ce4d2a4430cf95919e27b0eede171cbdf66c
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Thu Nov 15 18:01:56 2018 -0800

    Enforce logging when killing a task (#6621)
    
    * Enforce logging when killing a task
    
    * fix test
    
    * address comment
    
    * address comment
---
 .../MaterializedViewSupervisor.java                |  4 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 96 +++++++++++++---------
 .../kafka/supervisor/KafkaSupervisorTest.java      | 42 ++++++----
 .../src/test/resources/log4j2.xml                  | 35 ++++++++
 .../druid/indexing/overlord/ForkingTaskRunner.java |  3 +-
 .../druid/indexing/overlord/RemoteTaskRunner.java  |  3 +-
 .../overlord/SingleTaskBackgroundRunner.java       |  3 +-
 .../apache/druid/indexing/overlord/TaskQueue.java  | 18 ++--
 .../apache/druid/indexing/overlord/TaskRunner.java |  9 +-
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  3 +-
 .../indexing/overlord/http/OverlordResource.java   |  4 +-
 .../druid/indexing/worker/http/WorkerResource.java |  2 +-
 ...teTaskRunnerRunPendingTasksConcurrencyTest.java |  6 +-
 .../druid/indexing/overlord/TestTaskRunner.java    |  2 +-
 .../druid/indexing/overlord/http/OverlordTest.java |  2 +-
 15 files changed, 155 insertions(+), 77 deletions(-)

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 2aa05e3..260caf0 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -346,7 +346,7 @@ public class MaterializedViewSupervisor implements 
Supervisor
           && 
!toBuildInterval.get(interval).equals(runningVersion.get(interval))
       ) {
         if (taskMaster.getTaskQueue().isPresent()) {
-          
taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId());
+          
taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), 
"version mismatch");
           runningTasks.remove(interval);
         }
       }
@@ -451,7 +451,7 @@ public class MaterializedViewSupervisor implements 
Supervisor
   {
     for (HadoopIndexTask task : runningTasks.values()) {
       if (taskMaster.getTaskQueue().isPresent()) {
-        taskMaster.getTaskQueue().get().shutdown(task.getId());
+        taskMaster.getTaskQueue().get().shutdown(task.getId(), "killing all 
tasks");
       }
     }
     runningTasks.clear();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6609b24..eac54be 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -747,7 +747,7 @@ public class KafkaSupervisor implements Supervisor
       // Reset everything
       boolean result = 
indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
       log.info("Reset dataSource[%s] - dataSource metadata entry deleted? 
[%s]", dataSource, result);
-      taskGroups.values().forEach(this::killTasksInGroup);
+      taskGroups.values().forEach(group -> killTasksInGroup(group, 
"DataSourceMetadata is not found while reset"));
       taskGroups.clear();
       partitionGroups.clear();
     } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
@@ -811,7 +811,7 @@ public class KafkaSupervisor implements Supervisor
         if (metadataUpdateSuccess) {
           
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition
 -> {
             final int groupId = getTaskGroupIdForPartition(partition);
-            killTaskGroupForPartitions(ImmutableSet.of(partition));
+            killTaskGroupForPartitions(ImmutableSet.of(partition), 
"DataSourceMetadata is updated while reset");
             taskGroups.remove(groupId);
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> 
NOT_SET);
           });
@@ -828,19 +828,18 @@ public class KafkaSupervisor implements Supervisor
     }
   }
 
-  private void killTaskGroupForPartitions(Set<Integer> partitions)
+  private void killTaskGroupForPartitions(Set<Integer> partitions, String 
reasonFormat, Object... args)
   {
     for (Integer partition : partitions) {
-      killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)));
+      killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)), 
reasonFormat, args);
     }
   }
 
-  private void killTasksInGroup(TaskGroup taskGroup)
+  private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, 
Object... args)
   {
     if (taskGroup != null) {
       for (String taskId : taskGroup.tasks.keySet()) {
-        log.info("Killing task [%s] in the task group", taskId);
-        killTask(taskId);
+        killTask(taskId, reasonFormat, args);
       }
     }
   }
@@ -855,7 +854,7 @@ public class KafkaSupervisor implements Supervisor
     for (TaskGroup taskGroup : taskGroups.values()) {
       for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
         if 
(taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown()))
 {
-          killTask(entry.getKey());
+          killTask(entry.getKey(), "Killing task for graceful shutdown");
         } else {
           entry.getValue().startTime = DateTimes.EPOCH;
         }
@@ -1236,8 +1235,7 @@ public class KafkaSupervisor implements Supervisor
     for (int i = 0; i < results.size(); i++) {
       if (results.get(i) == null) {
         String taskId = futureTaskIds.get(i);
-        log.warn("Task [%s] failed to return status, killing task", taskId);
-        killTask(taskId);
+        killTask(taskId, "Task [%s] failed to return status, killing task", 
taskId);
       }
     }
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", 
taskCount, dataSource);
@@ -1297,7 +1295,7 @@ public class KafkaSupervisor implements Supervisor
           }
           catch (Exception e) {
             log.error(e, "Problem while getting checkpoints for task [%s], 
killing the task", taskId);
-            killTask(taskId);
+            killTask(taskId, "Exception[%s] while getting checkpoints", 
e.getClass());
             taskGroup.tasks.remove(taskId);
           }
         } else if (checkpoints.isEmpty()) {
@@ -1393,7 +1391,8 @@ public class KafkaSupervisor implements Supervisor
 
     taskSequences.stream().filter(taskIdSequences -> 
tasksToKill.contains(taskIdSequences.lhs)).forEach(
         sequenceCheckpoint -> {
-          log.warn(
+          killTask(
+              sequenceCheckpoint.lhs,
               "Killing task [%s], as its checkpoints [%s] are not consistent 
with group checkpoints[%s] or latest "
               + "persisted offsets in metadata store [%s]",
               sequenceCheckpoint.lhs,
@@ -1401,7 +1400,6 @@ public class KafkaSupervisor implements Supervisor
               taskGroup.sequenceOffsets,
               latestOffsetsFromDb
           );
-          killTask(sequenceCheckpoint.lhs);
           taskGroup.tasks.remove(sequenceCheckpoint.lhs);
         }
     );
@@ -1505,8 +1503,7 @@ public class KafkaSupervisor implements Supervisor
       // request threw an exception so kill the task
       if (results.get(i) == null) {
         String taskId = futureTaskIds.get(i);
-        log.warn("Task [%s] failed to return start time, killing task", 
taskId);
-        killTask(taskId);
+        killTask(taskId, "Task [%s] failed to return start time, killing 
task", taskId);
       }
     }
   }
@@ -1553,13 +1550,12 @@ public class KafkaSupervisor implements Supervisor
           partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
         }
       } else {
-        log.warn(
-            "All tasks in group [%s] failed to transition to publishing state, 
killing tasks [%s]",
-            groupId,
-            group.taskIds()
-        );
         for (String id : group.taskIds()) {
-          killTask(id);
+          killTask(
+              id,
+              "All tasks in group [%s] failed to transition to publishing 
state",
+              groupId
+          );
         }
         // clear partitionGroups, so that latest offsets from db is used as 
start offsets not the stale ones
         // if tasks did some successful incremental handoffs
@@ -1589,7 +1585,7 @@ public class KafkaSupervisor implements Supervisor
             // metadata store (which will have advanced if we succeeded in 
publishing and will remain the same if
             // publishing failed and we need to re-ingest)
             return Futures.transform(
-                stopTasksInGroup(taskGroup),
+                stopTasksInGroup(taskGroup, "task[%s] succeeded in the 
taskGroup", task.status.getId()),
                 new Function<Object, Map<Integer, Long>>()
                 {
                   @Nullable
@@ -1604,8 +1600,7 @@ public class KafkaSupervisor implements Supervisor
 
           if (task.status.isRunnable()) {
             if 
(taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-              log.info("Killing task [%s] which hasn't been assigned to a 
worker", taskId);
-              killTask(taskId);
+              killTask(taskId, "Killing task [%s] which hasn't been assigned 
to a worker", taskId);
               i.remove();
             }
           }
@@ -1634,8 +1629,7 @@ public class KafkaSupervisor implements Supervisor
 
               if (result == null || result.isEmpty()) { // kill tasks that 
didn't return a value
                 String taskId = pauseTaskIds.get(i);
-                log.warn("Task [%s] failed to respond to [pause] in a timely 
manner, killing task", taskId);
-                killTask(taskId);
+                killTask(taskId, "Task [%s] failed to respond to [pause] in a 
timely manner, killing task", taskId);
                 taskGroup.tasks.remove(taskId);
 
               } else { // otherwise build a map of the highest offsets seen
@@ -1683,8 +1677,11 @@ public class KafkaSupervisor implements Supervisor
               for (int i = 0; i < results.size(); i++) {
                 if (results.get(i) == null || !results.get(i)) {
                   String taskId = setEndOffsetTaskIds.get(i);
-                  log.warn("Task [%s] failed to respond to [set end offsets] 
in a timely manner, killing task", taskId);
-                  killTask(taskId);
+                  killTask(
+                      taskId,
+                      "Task [%s] failed to respond to [set end offsets] in a 
timely manner, killing task",
+                      taskId
+                  );
                   taskGroup.tasks.remove(taskId);
                 }
               }
@@ -1730,7 +1727,12 @@ public class KafkaSupervisor implements Supervisor
         if (stopTasksInTaskGroup) {
           // One of the earlier groups that was handling the same partition 
set timed out before the segments were
           // published so stop any additional groups handling the same 
partition set that are pending completion.
-          futures.add(stopTasksInGroup(group));
+          futures.add(
+              stopTasksInGroup(
+                  group,
+                  "one of earlier groups that was handling the same partition 
set timed out before publishing segments"
+              )
+          );
           toRemove.add(group);
           continue;
         }
@@ -1755,8 +1757,9 @@ public class KafkaSupervisor implements Supervisor
           if (taskData.status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the 
rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", 
taskId, group.taskIds());
-            futures.add(stopTasksInGroup(group));
+            futures.add(
+                stopTasksInGroup(group, "Task [%s] completed successfully, 
stopping tasks %s", taskId, group.taskIds())
+            );
             foundSuccess = true;
             toRemove.add(group); // remove the TaskGroup from the list of 
pending completion task groups
             break; // skip iterating the rest of the tasks in this group as 
they've all been stopped now
@@ -1778,12 +1781,20 @@ public class KafkaSupervisor implements Supervisor
           // reset partitions offsets for this task group so that they will be 
re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> 
NOT_SET);
           // kill all the tasks in this pending completion group
-          killTasksInGroup(group);
+          killTasksInGroup(
+              group,
+              "No task in pending completion taskGroup[%d] succeeded before 
completion timeout elapsed",
+              groupId
+          );
           // set a flag so the other pending completion groups for this set of 
partitions will also stop
           stopTasksInTaskGroup = true;
 
           // kill all the tasks in the currently reading task group and remove 
the bad task group
-          killTasksInGroup(taskGroups.remove(groupId));
+          killTasksInGroup(
+              taskGroups.remove(groupId),
+              "No task in the corresponding pending completion taskGroup[%d] 
succeeded before completion timeout elapsed",
+              groupId
+          );
           toRemove.add(group);
         }
       }
@@ -1837,7 +1848,7 @@ public class KafkaSupervisor implements Supervisor
         // check for successful tasks, and if we find one, stop all tasks in 
the group and remove the group so it can
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
-          futures.add(stopTasksInGroup(taskGroup));
+          futures.add(stopTasksInGroup(taskGroup, "task[%s] succeeded in the 
same taskGroup", taskData.status.getId()));
           iTaskGroups.remove();
           break;
         }
@@ -2099,18 +2110,24 @@ public class KafkaSupervisor implements Supervisor
     }
   }
 
-  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
+  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup, 
String stopReasonFormat, Object... args)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
     }
 
+    log.info(
+        "Stopping all tasks in taskGroup[%s] because: [%s]",
+        taskGroup.groupId,
+        StringUtils.format(stopReasonFormat, args)
+    );
+
     final List<ListenableFuture<Void>> futures = new ArrayList<>();
     for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
       final String taskId = entry.getKey();
       final TaskData taskData = entry.getValue();
       if (taskData.status == null) {
-        killTask(taskId);
+        killTask(taskId, "Killing task since task status is not known to 
supervisor");
       } else if (!taskData.status.isComplete()) {
         futures.add(stopTask(taskId, false));
       }
@@ -2129,8 +2146,7 @@ public class KafkaSupervisor implements Supervisor
           public Void apply(@Nullable Boolean result)
           {
             if (result == null || !result) {
-              log.info("Task [%s] failed to stop in a timely manner, killing 
task", id);
-              killTask(id);
+              killTask(id, "Task [%s] failed to stop in a timely manner, 
killing task", id);
             }
             return null;
           }
@@ -2138,11 +2154,11 @@ public class KafkaSupervisor implements Supervisor
     );
   }
 
-  private void killTask(final String id)
+  private void killTask(final String id, String reasonFormat, Object... args)
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     if (taskQueue.isPresent()) {
-      taskQueue.get().shutdown(id);
+      taskQueue.get().shutdown(id, reasonFormat, args);
     } else {
       log.error("Failed to get task queue because I'm not the leader!");
     }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2379123..9b4a5a6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -654,7 +654,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     expect(taskClient.stopAsync("id1", 
false)).andReturn(Futures.immediateFuture(true));
     expect(taskClient.stopAsync("id3", 
false)).andReturn(Futures.immediateFuture(false));
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, 
killing task", "id3");
 
     expect(taskQueue.add(anyObject(Task.class))).andReturn(true);
 
@@ -763,8 +763,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
-    taskQueue.shutdown("id4");
-    taskQueue.shutdown("id5");
+    taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, 
killing task", "id4");
+    taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, 
killing task", "id5");
     replayAll();
 
     supervisor.start();
@@ -1464,7 +1464,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
           
.andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED));
       expect(taskClient.getStartTimeAsync(task.getId()))
           .andReturn(Futures.immediateFailedFuture(new RuntimeException()));
-      taskQueue.shutdown(task.getId());
+      taskQueue.shutdown(task.getId(), "Task [%s] failed to return start time, 
killing task", task.getId());
     }
     replay(taskStorage, taskClient, taskQueue);
 
@@ -1535,7 +1535,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .times(2);
     expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
         .andReturn(Futures.immediateFailedFuture(new 
RuntimeException())).times(2);
-    taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
+    taskQueue.shutdown(
+        EasyMock.contains("sequenceName-0"),
+        EasyMock.eq("Task [%s] failed to respond to [pause] in a timely 
manner, killing task"),
+        EasyMock.contains("sequenceName-0")
+    );
     expectLastCall().times(2);
     expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
 
@@ -1622,7 +1626,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
             EasyMock.eq(true)
         )
     ).andReturn(Futures.immediateFailedFuture(new 
RuntimeException())).times(2);
-    taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
+    taskQueue.shutdown(
+        EasyMock.contains("sequenceName-0"),
+        EasyMock.eq("All tasks in group [%s] failed to transition to 
publishing state"),
+        EasyMock.eq(0)
+    );
     expectLastCall().times(2);
     expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
 
@@ -1749,8 +1757,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .andReturn(Futures.immediateFuture((Map<Integer, Long>) 
ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
     expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 
25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
-    taskQueue.shutdown("id3");
-    expectLastCall().times(2);
+    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
+    expectLastCall().times(1);
+    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to 
a worker", "id3");
+    expectLastCall().times(1);
 
     replay(taskRunner, taskClient, taskQueue);
 
@@ -1950,8 +1960,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     reset(taskQueue, indexerMetadataStorageCoordinator);
     
expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
-    taskQueue.shutdown("id2");
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset");
     replay(taskQueue, indexerMetadataStorageCoordinator);
 
     supervisor.resetInternal(null);
@@ -2036,9 +2046,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     reset(taskQueue, indexerMetadataStorageCoordinator);
     
expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
-    taskQueue.shutdown("id1");
-    taskQueue.shutdown("id2");
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id1", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset");
     replay(taskQueue, indexerMetadataStorageCoordinator);
 
     supervisor.resetInternal(null);
@@ -2424,8 +2434,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 
30L)));
     expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 
25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
-    taskQueue.shutdown("id3");
-    expectLastCall().times(2);
+    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
+    expectLastCall().times(1);
+    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to 
a worker", "id3");
+    expectLastCall().times(1);
 
     replayAll();
     supervisor.start();
diff --git a/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml b/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..bca6c69
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+    <Logger level="debug" name="org.apache.druid" additivity="false">
+      <AppenderRef ref="Console"/>
+    </Logger>
+  </Loggers>
+</Configuration>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 03af857..c3a0bc9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -583,8 +583,9 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
   }
 
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskid, reason);
     final ForkingTaskRunnerWorkItem taskInfo;
 
     synchronized (tasks) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index bde17ef..b83ea67 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -541,8 +541,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
    * @param taskId - task id to shutdown
    */
   @Override
-  public void shutdown(final String taskId)
+  public void shutdown(final String taskId, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskId, reason);
     if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
       log.info("This TaskRunner is stopped or not yet started. Ignoring 
shutdown command for task: %s", taskId);
     } else if (pendingTasks.remove(taskId) != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index a0fb43a..a9b6317 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -278,8 +278,9 @@ public class SingleTaskBackgroundRunner implements 
TaskRunner, QuerySegmentWalke
    * @param taskid task ID to clean up resources for
    */
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskid, reason);
     if (runningItem != null && runningItem.getTask().getId().equals(taskid)) {
       runningItem.getResult().cancel(true);
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 082e157..9a9c41d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -252,7 +252,7 @@ public class TaskQueue
               }
               catch (Exception e) {
                 log.warn(e, "Exception thrown during isReady for task: %s", 
task.getId());
-                notifyStatus(task, TaskStatus.failure(task.getId()));
+                notifyStatus(task, TaskStatus.failure(task.getId()), "failed 
because of exception[%s]", e.getClass());
                 continue;
               }
               if (taskIsReady) {
@@ -286,7 +286,11 @@ public class TaskQueue
           log.info("Asking taskRunner to clean up %,d tasks.", 
tasksToKill.size());
           for (final String taskId : tasksToKill) {
             try {
-              taskRunner.shutdown(taskId);
+              taskRunner.shutdown(
+                  taskId,
+                  "task is not in runnerTaskFutures[%s]",
+                  runnerTaskFutures.keySet()
+              );
             }
             catch (Exception e) {
               log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
@@ -356,7 +360,7 @@ public class TaskQueue
    *
    * @param taskId task to kill
    */
-  public void shutdown(final String taskId)
+  public void shutdown(final String taskId, String reasonFormat, Object... 
args)
   {
     giant.lock();
 
@@ -364,7 +368,7 @@ public class TaskQueue
       Preconditions.checkNotNull(taskId, "taskId");
       for (final Task task : tasks) {
         if (task.getId().equals(taskId)) {
-          notifyStatus(task, TaskStatus.failure(taskId));
+          notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
           break;
         }
       }
@@ -386,7 +390,7 @@ public class TaskQueue
    * @throws IllegalArgumentException if the task ID does not match the status 
ID
    * @throws IllegalStateException    if this queue is currently shut down
    */
-  private void notifyStatus(final Task task, final TaskStatus taskStatus)
+  private void notifyStatus(final Task task, final TaskStatus taskStatus, 
String reasonFormat, Object... args)
   {
     giant.lock();
 
@@ -402,7 +406,7 @@ public class TaskQueue
       );
       // Inform taskRunner that this task can be shut down
       try {
-        taskRunner.shutdown(task.getId());
+        taskRunner.shutdown(task.getId(), reasonFormat, args);
       }
       catch (Exception e) {
         log.warn(e, "TaskRunner failed to cleanup task after completion: %s", 
task.getId());
@@ -493,7 +497,7 @@ public class TaskQueue
                 return;
               }
 
-              notifyStatus(task, status);
+              notifyStatus(task, status, "notified status change from task");
 
               // Emit event and log, if the task is done
               if (status.isComplete()) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 6d4c867..8159659 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
@@ -81,8 +82,14 @@ public interface TaskRunner
    * currently-running tasks.
    *
    * @param taskid task ID to clean up resources for
+   * @param reason reason to kill this task
    */
-  void shutdown(String taskid);
+  void shutdown(String taskid, String reason);
+
+  default void shutdown(String taskid, String reasonFormat, Object... args)
+  {
+    shutdown(taskid, StringUtils.format(reasonFormat, args));
+  }
 
   /**
    * Stop this task runner. This may block until currently-running tasks can 
be gracefully stopped. After calling
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 3d36000..bf96b10 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1041,7 +1041,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
-  public void shutdown(String taskId)
+  public void shutdown(String taskId, String reason)
   {
     if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
       log.info("This TaskRunner is stopped or not yet started. Ignoring 
shutdown command for task: %s", taskId);
@@ -1050,6 +1050,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
 
     WorkerHolder workerHolderRunningTask = null;
     synchronized (statusLock) {
+      log.info("Shutdown [%s] because: [%s]", taskId, reason);
       HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.remove(taskId);
       if (taskRunnerWorkItem != null) {
         if (taskRunnerWorkItem.getState() == 
HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index a2be927..f0748f9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -330,7 +330,7 @@ public class OverlordResource
           @Override
           public Response apply(TaskQueue taskQueue)
           {
-            taskQueue.shutdown(taskid);
+            taskQueue.shutdown(taskid, "Shutdown request from user");
             return Response.ok(ImmutableMap.of("task", taskid)).build();
           }
         }
@@ -351,7 +351,7 @@ public class OverlordResource
           {
             final List<TaskInfo<Task, TaskStatus>> tasks = 
taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
             for (final TaskInfo<Task, TaskStatus> task : tasks) {
-              taskQueue.shutdown(task.getId());
+              taskQueue.shutdown(task.getId(), "Shutdown request from user");
             }
             return Response.ok(ImmutableMap.of("dataSource", 
dataSource)).build();
           }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
index d37b2bf..bef73ce 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
@@ -168,7 +168,7 @@ public class WorkerResource
   public Response doShutdown(@PathParam("taskid") String taskid)
   {
     try {
-      taskRunner.shutdown(taskid);
+      taskRunner.shutdown(taskid, "shut down request via HTTP endpoint");
     }
     catch (Exception e) {
       log.error(e, "Failed to issue shutdown for task: %s", taskid);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
index 3aed5a5..4cc352f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
@@ -101,19 +101,19 @@ public class 
RemoteTaskRunnerRunPendingTasksConcurrencyTest
 
     if 
(remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())
         && 
remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId()))
 {
-      remoteTaskRunner.shutdown("task4");
+      remoteTaskRunner.shutdown("task4", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]);
       Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode());
     } else if 
(remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())
                && 
remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId()))
 {
-      remoteTaskRunner.shutdown("task2");
+      remoteTaskRunner.shutdown("task2", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]);
       Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode());
     } else if 
(remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())
                && 
remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId()))
 {
-      remoteTaskRunner.shutdown("task3");
+      remoteTaskRunner.shutdown("task3", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]);
       Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
index fbc5b24..cb6cffe 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
@@ -236,7 +236,7 @@ public class TestTaskRunner implements TaskRunner, 
QuerySegmentWalker
   }
 
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
     for (final TaskRunnerWorkItem runningItem : runningItems) {
       if (runningItem.getTaskId().equals(taskid)) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 3a4c210..ecf2af4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -408,7 +408,7 @@ public class OverlordTest
     }
 
     @Override
-    public void shutdown(String taskid) {}
+    public void shutdown(String taskid, String reason) {}
 
     @Override
     public synchronized Collection<? extends TaskRunnerWorkItem> 
getRunningTasks()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to