jon-wei closed pull request #6234: 'suspend' and 'resume' support for supervisors (kafka indexing service, materialized views)
URL: https://github.com/apache/incubator-druid/pull/6234
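
For orientation before the diff: the change centers on a `suspended` flag carried by the supervisor spec. A missing flag defaults to "running", suspend and resume each produce a modified copy of the spec, and the supervisor's run loop does no work while the flag is set. The following is a minimal Java sketch of that pattern only. `SketchSpec` and `SketchSupervisor` are hypothetical names used for illustration and are not Druid classes; the real specs in the diff take many more constructor arguments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a supervisor spec; not a Druid class.
final class SketchSpec
{
  private final String id;
  private final boolean suspended;

  // A null flag (for example, a stored spec that predates the field) is treated
  // as "not suspended", mirroring `suspended != null ? suspended : false` in the diff.
  SketchSpec(String id, Boolean suspended)
  {
    this.id = id;
    this.suspended = suspended != null ? suspended : false;
  }

  String getId()
  {
    return id;
  }

  boolean isSuspended()
  {
    return suspended;
  }

  // The spec is immutable, so suspend and resume each return a copy.
  SketchSpec createSuspendedSpec()
  {
    return new SketchSpec(id, true);
  }

  SketchSpec createRunningSpec()
  {
    return new SketchSpec(id, false);
  }
}

// Hypothetical supervisor: stays alive while suspended but submits no tasks.
final class SketchSupervisor
{
  private final SketchSpec spec;
  private final List<String> submittedTasks = new ArrayList<>();

  SketchSupervisor(SketchSpec spec)
  {
    this.spec = spec;
  }

  void run()
  {
    if (spec.isSuspended()) {
      return; // suspended: skip all task submission
    }
    submittedTasks.add("task-for-" + spec.getId());
  }

  List<String> getSubmittedTasks()
  {
    return submittedTasks;
  }
}
```

In this sketch, calling `run()` on a supervisor built from `createSuspendedSpec()` leaves the task list empty, while one built from `createRunningSpec()` submits a task. The original spec object is never mutated, which is the property the serialization round-trip test in the diff relies on.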

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index 7fbacf4c587..7b17c46a270 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -194,17 +194,73 @@ existing publishing tasks and will create new tasks starting at the offsets the
 
 Seamless schema migrations can thus be achieved by simply submitting the new schema using this endpoint.
 
-#### Shutdown Supervisor
+#### Suspend Supervisor 
+
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/suspend
+```
+Suspend indexing tasks associated with a supervisor. Note that the supervisor itself will still be
+operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor
+is resumed. Responds with updated SupervisorSpec.
+
+#### Resume Supervisor 
+
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/resume
+```
+Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
+
+#### Reset Supervisor
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/reset
+```
+The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion
+guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the
+generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka
+(typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will
+refuse to start and in-flight tasks will fail.
+
+This endpoint can be used to clear the stored offsets which will cause the supervisor to start reading from
+either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be
+running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill
+and re-create any active tasks so that tasks begin reading from valid offsets.
+
+Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint
+may cause some Kafka messages to be skipped or to be read twice.
+
+#### Terminate Supervisor 
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/terminate
+```
+Terminate a supervisor and cause all associated indexing tasks managed by this supervisor to immediately stop and begin 
+publishing their segments. This supervisor will still exist in the metadata store and it's history may be retrieved 
+with the supervisor history api, but will not be listed in the 'get supervisors' api response nor can it's configuration
+or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor
+spec to the create api.
+
+#### Shutdown Supervisor 
+_Deprecated: use the equivalent 'terminate' instead_
 ```
 POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
 ```
-Note that this will cause all indexing tasks managed by this supervisor to immediately stop and begin publishing their segments.
 
 #### Get Supervisor IDs
 ```
 GET /druid/indexer/v1/supervisor
 ```
-Returns a list of the currently active supervisors.
+Returns a list of strings of the currently active supervisor ids.
+
+#### Get Supervisors
+```
+GET /druid/indexer/v1/supervisor?full
+```
+Returns a list of objects of the currently active supervisors.
+
+|Field|Type|Description|
+|---|---|---|
+|`id`|String|supervisor unique identifier|
+|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|
+
 
 #### Get Supervisor Spec
 ```
@@ -233,24 +289,6 @@ GET /druid/indexer/v1/supervisor/<supervisorId>/history
 ```
 Returns an audit history of specs for the supervisor with the provided ID.
 
-#### Reset Supervisor
-```
-POST /druid/indexer/v1/supervisor/<supervisorId>/reset
-```
-The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion
-guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the
-generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka
-(typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will
-refuse to start and in-flight tasks will fail.
-
-This endpoint can be used to clear the stored offsets which will cause the supervisor to start reading from
-either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be
-running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill
-and re-create any active tasks so that tasks begin reading from valid offsets.
-
-Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint
-may cause some Kafka messages to be skipped or to be read twice.
-
 ## Capacity Planning
 
 Kafka indexing tasks run on middle managers and are thus limited by the resources available in the middle manager
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 56afe3efd5e..d4e7f08d657 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -135,31 +135,7 @@ public void start()
       exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId));
       final Duration delay = config.getTaskCheckDuration().toStandardDuration();
       future = exec.scheduleWithFixedDelay(
-          new Runnable() {
-            @Override
-            public void run()
-            {
-              try {
-                DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
-                if (metadata instanceof DerivativeDataSourceMetadata 
-                    && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource())
-                    && spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions())
-                    && spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) {
-                  checkSegmentsAndSubmitTasks();
-                } else {
-                  log.error(
-                      "Failed to start %s. Metadata in database(%s) is different from new dataSource metadata(%s)",
-                      supervisorId,
-                      metadata,
-                      spec
-                  );
-                }
-              }
-              catch (Exception e) {
-                log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit();
-              }
-            }
-          },
+          MaterializedViewSupervisor.this::run,
           0,
           delay.getMillis(),
           TimeUnit.MILLISECONDS
@@ -167,7 +143,40 @@ public void run()
       started = true;
     }
   }
-  
+
+  @VisibleForTesting
+  public void run()
+  {
+    try {
+      if (spec.isSuspended()) {
+        log.info(
+            "Materialized view supervisor[%s:%s] is suspended",
+            spec.getId(),
+            spec.getDataSourceName()
+        );
+        return;
+      }
+
+      DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
+      if (metadata instanceof DerivativeDataSourceMetadata
+          && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource())
+          && spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions())
+          && spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) {
+        checkSegmentsAndSubmitTasks();
+      } else {
+        log.error(
+            "Failed to start %s. Metadata in database(%s) is different from new dataSource metadata(%s)",
+            supervisorId,
+            metadata,
+            spec
+        );
+      }
+    }
+    catch (Exception e) {
+      log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit();
+    }
+  }
+
   @Override
   public void stop(boolean stopGracefully) 
   {
@@ -207,7 +216,8 @@ public SupervisorReport getStatus()
   {
     return new MaterializedViewSupervisorReport(
         dataSource,
-        DateTimes.nowUtc(), 
+        DateTimes.nowUtc(),
+        spec.isSuspended(),
         spec.getBaseDataSource(),
         spec.getDimensions(),
         spec.getMetrics(),
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
index 107354fe08f..f05f8e0f6d1 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
@@ -29,10 +29,11 @@
 
 public class MaterializedViewSupervisorReport extends SupervisorReport 
 {
-  
+
   public MaterializedViewSupervisorReport(
       String dataSource,
       DateTime generationTime,
+      boolean suspended,
       String baseDataSource,
       Set<String> dimensions,
       Set<String> metrics,
@@ -42,6 +43,7 @@ public MaterializedViewSupervisorReport(
     super(dataSource, generationTime, "{" +
         "dataSource='" + dataSource + '\'' +
         ", baseDataSource='" + baseDataSource + '\'' +
+        ", suspended='" + suspended + "\'" +
         ", dimensions=" + dimensions +
         ", metrics=" + metrics +
         ", missTimeline" + Sets.newHashSet(missTimeline) +
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index b81d85e297a..29904a47635 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -81,6 +81,7 @@
   private final MaterializedViewTaskConfig config;
   private final AuthorizerMapper authorizerMapper;
   private final ChatHandlerProvider chatHandlerProvider;
+  private final boolean suspended;
   
   public MaterializedViewSupervisorSpec(
       @JsonProperty("baseDataSource") String baseDataSource,
@@ -92,6 +93,7 @@ public MaterializedViewSupervisorSpec(
       @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
       @JsonProperty("classpathPrefix") String classpathPrefix,
       @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("suspended") Boolean suspended,
       @JacksonInject ObjectMapper objectMapper,
       @JacksonInject TaskMaster taskMaster,
       @JacksonInject TaskStorage taskStorage,
@@ -139,7 +141,8 @@ public MaterializedViewSupervisorSpec(
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.config = config;
-    
+    this.suspended = suspended != null ? suspended : false;
+
     this.metrics = Sets.newHashSet();
     for (AggregatorFactory aggregatorFactory : aggregators) {
       metrics.add(aggregatorFactory.getName());
@@ -305,7 +308,14 @@ public String getClasspathPrefix()
   {
     return context;
   }
-  
+
+  @Override
+  @JsonProperty("suspended")
+  public boolean isSuspended()
+  {
+    return suspended;
+  }
+
   @Override
   public String getId() 
   {
@@ -331,7 +341,59 @@ public Supervisor createSupervisor()
   {
     return ImmutableList.of(dataSourceName);
   }
-  
+
+  @Override
+  public SupervisorSpec createSuspendedSpec()
+  {
+    return new MaterializedViewSupervisorSpec(
+        baseDataSource,
+        dimensionsSpec,
+        aggregators,
+        tuningConfig,
+        dataSourceName,
+        hadoopCoordinates,
+        hadoopDependencyCoordinates,
+        classpathPrefix,
+        context,
+        true,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        segmentManager,
+        metadataStorageCoordinator,
+        config,
+        authorizerMapper,
+        chatHandlerProvider
+    );
+  }
+
+  @Override
+  public SupervisorSpec createRunningSpec()
+  {
+    return new MaterializedViewSupervisorSpec(
+        baseDataSource,
+        dimensionsSpec,
+        aggregators,
+        tuningConfig,
+        dataSourceName,
+        hadoopCoordinates,
+        hadoopDependencyCoordinates,
+        classpathPrefix,
+        context,
+        false,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        segmentManager,
+        metadataStorageCoordinator,
+        config,
+        authorizerMapper,
+        chatHandlerProvider
+    );
+  }
+
   @Override
   public String toString()
   {
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index b9c816083f8..40da151f1d4 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -75,6 +75,7 @@ public void setup()
             .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
     );
   }
+
   @Test
   public void testSupervisorSerialization() throws IOException 
   {
@@ -132,6 +133,7 @@ public void testSupervisorSerialization() throws IOException
         null,
         null,
         null,
+        false,
         objectMapper,
         null,
         null,
@@ -150,6 +152,51 @@ public void testSupervisorSerialization() throws IOException
     Assert.assertEquals(expected.getMetrics(), spec.getMetrics());
   }
 
+  @Test
+  public void testSuspendResuume() throws IOException
+  {
+    String supervisorStr = "{\n" +
+                           "  \"type\" : \"derivativeDataSource\",\n" +
+                           "  \"baseDataSource\": \"wikiticker\",\n" +
+                           "  \"dimensionsSpec\":{\n" +
+                           "            \"dimensions\" : [\n" +
+                           "              \"isUnpatrolled\",\n" +
+                           "              \"metroCode\",\n" +
+                           "              \"namespace\",\n" +
+                           "              \"page\",\n" +
+                           "              \"regionIsoCode\",\n" +
+                           "              \"regionName\",\n" +
+                           "              \"user\"\n" +
+                           "            ]\n" +
+                           "          },\n" +
+                           "    \"metricsSpec\" : [\n" +
+                           "        {\n" +
+                           "          \"name\" : \"count\",\n" +
+                           "          \"type\" : \"count\"\n" +
+                           "        },\n" +
+                           "        {\n" +
+                           "          \"name\" : \"added\",\n" +
+                           "          \"type\" : \"longSum\",\n" +
+                           "          \"fieldName\" : \"added\"\n" +
+                           "        }\n" +
+                           "      ],\n" +
+                           "  \"tuningConfig\": {\n" +
+                           "      \"type\" : \"hadoop\"\n" +
+                           "  }\n" +
+                           "}";
+
+    MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class);
+    Assert.assertFalse(spec.isSuspended());
+
+    String suspendedSerialized = objectMapper.writeValueAsString(spec.createSuspendedSpec());
+    MaterializedViewSupervisorSpec suspendedSpec = objectMapper.readValue(suspendedSerialized, MaterializedViewSupervisorSpec.class);
+    Assert.assertTrue(suspendedSpec.isSuspended());
+
+    String runningSerialized = objectMapper.writeValueAsString(spec.createRunningSpec());
+    MaterializedViewSupervisorSpec runningSpec = objectMapper.readValue(runningSerialized, MaterializedViewSupervisorSpec.class);
+    Assert.assertFalse(runningSpec.isSuspended());
+  }
+
   @Test
   public void testEmptyBaseDataSource() throws Exception
   {
@@ -182,6 +229,7 @@ public void testEmptyBaseDataSource() throws Exception
         null,
         null,
         null,
+        false,
         objectMapper,
         null,
         null,
@@ -226,6 +274,7 @@ public void testNullBaseDataSource() throws Exception
         null,
         null,
         null,
+        false,
         objectMapper,
         null,
         null,
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index a48f76fa9a4..4ece1f0c926 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -47,6 +47,9 @@
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import static org.easymock.EasyMock.expect;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -109,6 +112,7 @@ public void setUp() throws IOException
         null,
         null,
         null,
+        false,
         objectMapper,
         taskMaster,
         taskStorage,
@@ -121,9 +125,9 @@ public void setUp() throws IOException
     );
     supervisor = (MaterializedViewSupervisor) spec.createSupervisor();
   }
-  
+
   @Test
-  public void testCheckSegments() throws IOException 
+  public void testCheckSegments() throws IOException
   {
     Set<DataSegment> baseSegments = Sets.newHashSet(
         new DataSegment(
@@ -156,7 +160,7 @@ public void testCheckSegments() throws IOException
     Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> toBuildInterval = supervisor.checkSegments();
     Map<Interval, List<DataSegment>> expectedSegments = Maps.newHashMap();
     expectedSegments.put(
-        Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), 
+        Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
         Collections.singletonList(
             new DataSegment(
                 "base",
@@ -173,4 +177,43 @@ public void testCheckSegments() throws IOException
     );
     Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
   }
+
+
+  @Test
+  public void testSuspendedDoesntRun() throws IOException
+  {
+    MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec(
+        "base",
+        new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
+        new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
+        HadoopTuningConfig.makeDefaultTuningConfig(),
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        sqlMetadataSegmentManager,
+        indexerMetadataStorageCoordinator,
+        new MaterializedViewTaskConfig(),
+        createMock(AuthorizerMapper.class),
+        createMock(ChatHandlerProvider.class)
+    );
+    MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor();
+
+    // mock IndexerSQLMetadataStorageCoordinator to ensure that getDataSourceMetadata is not called
+    // which will be true if truly suspended, since this is the first operation of the 'run' method otherwise
+    IndexerSQLMetadataStorageCoordinator mock = createMock(IndexerSQLMetadataStorageCoordinator.class);
+    expect(mock.getDataSourceMetadata(suspended.getDataSourceName())).andAnswer((IAnswer) () -> {
+      Assert.fail();
+      return null;
+    }).anyTimes();
+
+    EasyMock.replay(mock);
+    supervisor.run();
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 78652a70b91..5087eef33c2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -309,14 +309,8 @@ public TaskLocation getTaskLocation(final String id)
         Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
         if (taskRunner.isPresent()) {
           Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind(
-              taskRunner.get().getRunningTasks(), new Predicate<TaskRunnerWorkItem>()
-              {
-                @Override
-                public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem)
-                {
-                  return id.equals(taskRunnerWorkItem.getTaskId());
-                }
-              }
+              taskRunner.get().getRunningTasks(),
+              (Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem -> id.equals(taskRunnerWorkItem.getTaskId())
           );
 
           if (item.isPresent()) {
@@ -372,29 +366,24 @@ public void start()
         consumer = getKafkaConsumer();
 
         exec.submit(
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
-                try {
-                  while (!Thread.currentThread().isInterrupted()) {
-                    final Notice notice = notices.take();
-
-                    try {
-                      notice.handle();
-                    }
-                    catch (Throwable e) {
-                      log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource)
-                         .addData("noticeClass", notice.getClass().getSimpleName())
-                         .emit();
-                    }
+            () -> {
+              try {
+                while (!Thread.currentThread().isInterrupted()) {
+                  final Notice notice = notices.take();
+
+                  try {
+                    notice.handle();
+                  }
+                  catch (Throwable e) {
+                    log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource)
+                       .addData("noticeClass", notice.getClass().getSimpleName())
+                       .emit();
                   }
-                }
-                catch (InterruptedException e) {
-                  log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
                 }
               }
+              catch (InterruptedException e) {
+                log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
+              }
             }
         );
         firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
@@ -898,7 +887,16 @@ void runInternal() throws ExecutionException, InterruptedException, TimeoutExcep
     checkTaskDuration();
     checkPendingCompletionTasks();
     checkCurrentTaskState();
-    createNewTasks();
+
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    if (!spec.isSuspended()) {
+      log.info("[%s] supervisor is running.", dataSource);
+      createNewTasks();
+    } else {
+      log.info("[%s] supervisor is suspended.", dataSource);
+      gracefulShutdownInternal();
+    }
 
     if (log.isDebugEnabled()) {
       log.debug(generateReport(true).toString());
@@ -2096,7 +2094,8 @@ private boolean isTaskInPendingCompletionGroups(String taskId)
         includeOffsets ? latestOffsetsFromKafka : null,
         includeOffsets ? partitionLag : null,
         includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null,
-        includeOffsets ? offsetsLastUpdated : null
+        includeOffsets ? offsetsLastUpdated : null,
+        spec.isSuspended()
     );
     SupervisorReport<KafkaSupervisorReportPayload> report = new SupervisorReport<>(
         dataSource,
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index 2a5a4829b5a..d9533a37fb2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -42,6 +42,7 @@
   private final Map<Integer, Long> minimumLag;
   private final Long aggregateLag;
   private final DateTime offsetsLastUpdated;
+  private final boolean suspended;
 
   public KafkaSupervisorReportPayload(
       String dataSource,
@@ -52,7 +53,8 @@ public KafkaSupervisorReportPayload(
       @Nullable Map<Integer, Long> latestOffsets,
       @Nullable Map<Integer, Long> minimumLag,
       @Nullable Long aggregateLag,
-      @Nullable DateTime offsetsLastUpdated
+      @Nullable DateTime offsetsLastUpdated,
+      boolean suspended
   )
   {
     this.dataSource = dataSource;
@@ -66,6 +68,7 @@ public KafkaSupervisorReportPayload(
     this.minimumLag = minimumLag;
     this.aggregateLag = aggregateLag;
     this.offsetsLastUpdated = offsetsLastUpdated;
+    this.suspended = suspended;
   }
 
   public void addTask(TaskReportData data)
@@ -148,6 +151,12 @@ public DateTime getOffsetsLastUpdated()
     return offsetsLastUpdated;
   }
 
+  @JsonProperty
+  public boolean getSuspended()
+  {
+    return suspended;
+  }
+
   @Override
   public String toString()
   {
@@ -163,6 +172,7 @@ public String toString()
            (minimumLag != null ? ", minimumLag=" + minimumLag : "") +
            (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") +
            (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") +
+           ", suspended=" + suspended +
            '}';
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index fc620ea61ac..28dab02c56e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -55,6 +55,7 @@
   private final ServiceEmitter emitter;
   private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
+  private final boolean suspended;
 
   @JsonCreator
   public KafkaSupervisorSpec(
@@ -62,6 +63,7 @@ public KafkaSupervisorSpec(
       @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig,
       @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
       @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("suspended") Boolean suspended,
       @JacksonInject TaskStorage taskStorage,
       @JacksonInject TaskMaster taskMaster,
       @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
@@ -111,6 +113,7 @@ public KafkaSupervisorSpec(
     this.emitter = emitter;
     this.monitorSchedulerConfig = monitorSchedulerConfig;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+    this.suspended = suspended != null ? suspended : false;
   }
 
   @JsonProperty
@@ -137,6 +140,13 @@ public KafkaSupervisorIOConfig getIoConfig()
     return context;
   }
 
+  @Override
+  @JsonProperty("suspended")
+  public boolean isSuspended()
+  {
+    return suspended;
+  }
+
   public ServiceEmitter getEmitter()
   {
     return emitter;
@@ -182,4 +192,35 @@ public String toString()
            ", ioConfig=" + ioConfig +
            '}';
   }
+
+  @Override
+  public KafkaSupervisorSpec createSuspendedSpec()
+  {
+    return toggleSuspend(true);
+  }
+
+  @Override
+  public KafkaSupervisorSpec createRunningSpec()
+  {
+    return toggleSuspend(false);
+  }
+
+  private KafkaSupervisorSpec toggleSuspend(boolean suspend)
+  {
+    return new KafkaSupervisorSpec(
+        dataSchema,
+        tuningConfig,
+        ioConfig,
+        context,
+        suspend,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        kafkaIndexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory
+    );
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
new file mode 100644
index 00000000000..b4c6dfd2997
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class KafkaSupervisorSpecTest
+{
+  private final ObjectMapper mapper;
+
+  public KafkaSupervisorSpecTest()
+  {
+    mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(TaskStorage.class, null)
+            .addValue(TaskMaster.class, null)
+            .addValue(IndexerMetadataStorageCoordinator.class, null)
+            .addValue(KafkaIndexTaskClientFactory.class, null)
+            .addValue(ObjectMapper.class, mapper)
+            .addValue(ServiceEmitter.class, new NoopServiceEmitter())
+            .addValue(DruidMonitorSchedulerConfig.class, null)
+            .addValue(RowIngestionMetersFactory.class, null)
+            .addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE)
+    );
+    mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    String json = "{\n"
+                  + "  \"type\": \"kafka\",\n"
+                  + "  \"dataSchema\": {\n"
+                  + "    \"dataSource\": \"metrics-kafka\",\n"
+                  + "    \"parser\": {\n"
+                  + "      \"type\": \"string\",\n"
+                  + "      \"parseSpec\": {\n"
+                  + "        \"format\": \"json\",\n"
+                  + "        \"timestampSpec\": {\n"
+                  + "          \"column\": \"timestamp\",\n"
+                  + "          \"format\": \"auto\"\n"
+                  + "        },\n"
+                  + "        \"dimensionsSpec\": {\n"
+                  + "          \"dimensions\": [],\n"
+                  + "          \"dimensionExclusions\": [\n"
+                  + "            \"timestamp\",\n"
+                  + "            \"value\"\n"
+                  + "          ]\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"metricsSpec\": [\n"
+                  + "      {\n"
+                  + "        \"name\": \"count\",\n"
+                  + "        \"type\": \"count\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_sum\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleSum\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_min\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMin\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_max\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMax\"\n"
+                  + "      }\n"
+                  + "    ],\n"
+                  + "    \"granularitySpec\": {\n"
+                  + "      \"type\": \"uniform\",\n"
+                  + "      \"segmentGranularity\": \"HOUR\",\n"
+                  + "      \"queryGranularity\": \"NONE\"\n"
+                  + "    }\n"
+                  + "  },\n"
+                  + "  \"ioConfig\": {\n"
+                  + "    \"topic\": \"metrics\",\n"
+                  + "    \"consumerProperties\": {\n"
+                  + "      \"bootstrap.servers\": \"localhost:9092\"\n"
+                  + "    },\n"
+                  + "    \"taskCount\": 1\n"
+                  + "  }\n"
+                  + "}";
+    KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class);
+
+    Assert.assertNotNull(spec);
+    Assert.assertNotNull(spec.getDataSchema());
+    Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
+    Assert.assertNotNull(spec.getIoConfig());
+    Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
+    Assert.assertNotNull(spec.getTuningConfig());
+    Assert.assertNull(spec.getContext());
+    Assert.assertFalse(spec.isSuspended());
+    String serialized = mapper.writeValueAsString(spec);
+
+    // expect default values populated in reserialized string
+    Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
+    Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
+    Assert.assertTrue(serialized.contains("\"suspended\":false"));
+
+    KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);
+
+    String stable = mapper.writeValueAsString(spec2);
+
+    Assert.assertEquals(serialized, stable);
+  }
+
+  @Test
+  public void testSuspendResume() throws IOException
+  {
+    String json = "{\n"
+                  + "  \"type\": \"kafka\",\n"
+                  + "  \"dataSchema\": {\n"
+                  + "    \"dataSource\": \"metrics-kafka\",\n"
+                  + "    \"parser\": {\n"
+                  + "      \"type\": \"string\",\n"
+                  + "      \"parseSpec\": {\n"
+                  + "        \"format\": \"json\",\n"
+                  + "        \"timestampSpec\": {\n"
+                  + "          \"column\": \"timestamp\",\n"
+                  + "          \"format\": \"auto\"\n"
+                  + "        },\n"
+                  + "        \"dimensionsSpec\": {\n"
+                  + "          \"dimensions\": [],\n"
+                  + "          \"dimensionExclusions\": [\n"
+                  + "            \"timestamp\",\n"
+                  + "            \"value\"\n"
+                  + "          ]\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"metricsSpec\": [\n"
+                  + "      {\n"
+                  + "        \"name\": \"count\",\n"
+                  + "        \"type\": \"count\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_sum\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleSum\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_min\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMin\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_max\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMax\"\n"
+                  + "      }\n"
+                  + "    ],\n"
+                  + "    \"granularitySpec\": {\n"
+                  + "      \"type\": \"uniform\",\n"
+                  + "      \"segmentGranularity\": \"HOUR\",\n"
+                  + "      \"queryGranularity\": \"NONE\"\n"
+                  + "    }\n"
+                  + "  },\n"
+                  + "  \"ioConfig\": {\n"
+                  + "    \"topic\": \"metrics\",\n"
+                  + "    \"consumerProperties\": {\n"
+                  + "      \"bootstrap.servers\": \"localhost:9092\"\n"
+                  + "    },\n"
+                  + "    \"taskCount\": 1\n"
+                  + "  }\n"
+                  + "}";
+    KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class);
+
+    Assert.assertNotNull(spec);
+    Assert.assertNotNull(spec.getDataSchema());
+    Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
+    Assert.assertNotNull(spec.getIoConfig());
+    Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
+    Assert.assertNotNull(spec.getTuningConfig());
+    Assert.assertNull(spec.getContext());
+    Assert.assertFalse(spec.isSuspended());
+
+    String suspendedSerialized = mapper.writeValueAsString(spec.createSuspendedSpec());
+
+    // expect default values populated in reserialized string
+    Assert.assertTrue(suspendedSerialized.contains("\"tuningConfig\":{"));
+    Assert.assertTrue(suspendedSerialized.contains("\"indexSpec\":{"));
+    Assert.assertTrue(suspendedSerialized.contains("\"suspended\":true"));
+
+    KafkaSupervisorSpec suspendedSpec = mapper.readValue(suspendedSerialized, KafkaSupervisorSpec.class);
+
+    Assert.assertTrue(suspendedSpec.isSuspended());
+
+    String runningSerialized = mapper.writeValueAsString(spec.createRunningSpec());
+
+    KafkaSupervisorSpec runningSpec = mapper.readValue(runningSerialized, KafkaSupervisorSpec.class);
+
+    Assert.assertFalse(runningSpec.isSuspended());
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 964f5f7cac6..978de2f6886 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -79,6 +79,7 @@
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -2293,6 +2294,149 @@ public void testCheckpointWithNullTaskGroupId()
     verifyAll();
   }
 
+  @Test
+  public void testSuspendedNoRunningTasks() throws Exception
+  {
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true);
+    addSomeEvents(1);
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    // this asserts that taskQueue.add does not in fact get called because supervisor should be suspended
+    expect(taskQueue.add(anyObject())).andAnswer((IAnswer) () -> {
+      Assert.fail();
+      return null;
+    }).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testSuspendedRunningTasks() throws Exception
+  {
+    // graceful shutdown is expected to be called on running tasks since state is suspended
+
+    final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
+    final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+    final DateTime startTime = DateTimes.nowUtc();
+
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true);
+    addSomeEvents(1);
+
+    Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+
+    // getCheckpoints will not be called for id1 as it is in publishing state
+    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+
+    expect(taskClient.pauseAsync("id2"))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
+    expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
+        .andReturn(Futures.immediateFuture(true));
+    taskQueue.shutdown("id3");
+    expectLastCall().times(2);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testResetSuspended() throws Exception
+  {
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true);
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    reset(indexerMetadataStorageCoordinator);
+    expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
+    replay(indexerMetadataStorageCoordinator);
+
+    supervisor.resetInternal(null);
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@@ -2320,6 +2464,29 @@ private KafkaSupervisor getSupervisor(
       Period earlyMessageRejectionPeriod,
       boolean skipOffsetGaps
   )
+  {
+    return getSupervisor(
+        replicas,
+        taskCount,
+        useEarliestOffset,
+        duration,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        skipOffsetGaps,
+        false
+    );
+  }
+
+  private KafkaSupervisor getSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean skipOffsetGaps,
+      boolean suspended
+  )
   {
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
         topic,
@@ -2368,6 +2535,7 @@ public KafkaIndexTaskClient build(
             tuningConfig,
             kafkaSupervisorIOConfig,
             null,
+            suspended,
             taskStorage,
             taskMaster,
             indexerMetadataStorageCoordinator,
@@ -2476,7 +2644,7 @@ public String getTaskType()
     }
 
     @Override
-    public String getDataSource() 
+    public String getDataSource()
     {
       return dataSource;
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index b5c24ef913f..cca6fe51bd8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -91,6 +91,19 @@ public boolean stopAndRemoveSupervisor(String id)
     }
   }
 
+  public boolean suspendOrResumeSupervisor(String id, boolean suspend)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
+    Preconditions.checkNotNull(pair.lhs, "spec");
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec();
+      possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
+      return createAndStartSupervisorInternal(nextState, true);
+    }
+  }
+
   @LifecycleStart
   public void start()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 9be9c9fb3b5..d3e19bbb4ab 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -44,6 +44,7 @@
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -52,6 +53,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Endpoints for submitting and starting a {@link SupervisorSpec}, getting running supervisors, stopping supervisors,
@@ -90,49 +92,55 @@ public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapp
   public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Preconditions.checkArgument(
-                spec.getDataSources() != null && spec.getDataSources().size() > 0,
-                "No dataSources found to perform authorization checks"
-            );
-
-            Access authResult = AuthorizationUtils.authorizeAllResourceActions(
-                req,
-                Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR),
-                authorizerMapper
-            );
+        manager -> {
+          Preconditions.checkArgument(
+              spec.getDataSources() != null && spec.getDataSources().size() > 0,
+              "No dataSources found to perform authorization checks"
+          );
 
-            if (!authResult.isAllowed()) {
-              throw new ForbiddenException(authResult.toString());
-            }
+          Access authResult = AuthorizationUtils.authorizeAllResourceActions(
+              req,
+              Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR),
+              authorizerMapper
+          );
 
-            manager.createOrUpdateAndStartSupervisor(spec);
-            return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+          if (!authResult.isAllowed()) {
+            throw new ForbiddenException(authResult.toString());
           }
+
+          manager.createOrUpdateAndStartSupervisor(spec);
+          return Response.ok(ImmutableMap.of("id", spec.getId())).build();
         }
     );
   }
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
-  public Response specGetAll(@Context final HttpServletRequest req)
+  public Response specGetAll(
+      @QueryParam("full") String full,
+      @Context final HttpServletRequest req
+  )
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(final SupervisorManager manager)
-          {
-            Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds(
-                req,
-                manager,
-                manager.getSupervisorIds()
-            );
+        manager -> {
+          Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds(
+              req,
+              manager,
+              manager.getSupervisorIds()
+          );
+
+          if (full == null) {
             return Response.ok(authorizedSupervisorIds).build();
+          } else {
+            List<Map<String, ?>> all =
+                authorizedSupervisorIds.stream()
+                                       .map(x -> ImmutableMap.<String, Object>builder()
+                                           .put("id", x)
+                                           .put("spec", manager.getSupervisorSpec(x).get())
+                                           .build()
+                                       )
+                                       .collect(Collectors.toList());
+            return Response.ok(all).build();
           }
         }
     );
@@ -145,20 +153,15 @@ public Response apply(final SupervisorManager manager)
   public Response specGet(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
-            if (!spec.isPresent()) {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
-
-            return Response.ok(spec.get()).build();
+        manager -> {
+          Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+          if (!spec.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
+
+          return Response.ok(spec.get()).build();
         }
     );
   }
@@ -170,20 +173,15 @@ public Response apply(SupervisorManager manager)
   public Response specGetStatus(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Optional<SupervisorReport> spec = manager.getSupervisorStatus(id);
-            if (!spec.isPresent()) {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
-
-            return Response.ok(spec.get()).build();
+        manager -> {
+          Optional<SupervisorReport> spec = manager.getSupervisorStatus(id);
+          if (!spec.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
+
+          return Response.ok(spec.get()).build();
         }
     );
   }
@@ -197,49 +195,66 @@ public Response getAllTaskStats(
   )
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Optional<Map<String, Map<String, Object>>> stats = manager.getSupervisorStats(id);
-            if (!stats.isPresent()) {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(
-                                 ImmutableMap.of(
-                                     "error",
-                                     StringUtils.format("[%s] does not exist", id)
-                                 )
-                             )
-                             .build();
-            }
-
-            return Response.ok(stats.get()).build();
+        manager -> {
+          Optional<Map<String, Map<String, Object>>> stats = manager.getSupervisorStats(id);
+          if (!stats.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(
+                               ImmutableMap.of(
+                                   "error",
+                                   StringUtils.format("[%s] does not exist", id)
+                               )
+                           )
+                           .build();
           }
+
+          return Response.ok(stats.get()).build();
         }
     );
   }
 
+  @POST
+  @Path("/{id}/resume")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response specResume(@PathParam("id") final String id)
+  {
+    return specSuspendOrResume(id, false);
+  }
+
+  @POST
+  @Path("/{id}/suspend")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response specSuspend(@PathParam("id") final String id)
+  {
+    return specSuspendOrResume(id, true);
+  }
 
+  @Deprecated
   @POST
   @Path("/{id}/shutdown")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(SupervisorResourceFilter.class)
   public Response shutdown(@PathParam("id") final String id)
+  {
+    return terminate(id);
+  }
+
+  @POST
+  @Path("/{id}/terminate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response terminate(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            if (manager.stopAndRemoveSupervisor(id)) {
-              return Response.ok(ImmutableMap.of("id", id)).build();
-            } else {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
+        manager -> {
+          if (manager.stopAndRemoveSupervisor(id)) {
+            return Response.ok(ImmutableMap.of("id", id)).build();
+          } else {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
         }
     );
@@ -251,21 +266,14 @@ public Response apply(SupervisorManager manager)
   public Response specGetAllHistory(@Context final HttpServletRequest req)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(final SupervisorManager manager)
-          {
-            return Response.ok(
-                AuthorizationUtils.filterAuthorizedResources(
-                    req,
-                    manager.getSupervisorHistory(),
-                    SPEC_DATASOURCE_READ_RA_GENERATOR,
-                    authorizerMapper
-                )
-            ).build();
-          }
-        }
+        manager -> Response.ok(
+            AuthorizationUtils.filterAuthorizedResources(
+                req,
+                manager.getSupervisorHistory(),
+                SPEC_DATASOURCE_READ_RA_GENERATOR,
+                authorizerMapper
+            )
+        ).build()
     );
   }
 
@@ -277,38 +285,33 @@ public Response specGetHistory(
       @PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Map<String, List<VersionedSupervisorSpec>> supervisorHistory = manager.getSupervisorHistory();
-            Iterable<VersionedSupervisorSpec> historyForId = supervisorHistory.get(id);
-            if (historyForId != null) {
-              final List<VersionedSupervisorSpec> authorizedHistoryForId =
-                  Lists.newArrayList(
-                      AuthorizationUtils.filterAuthorizedResources(
-                          req,
-                          historyForId,
-                          SPEC_DATASOURCE_READ_RA_GENERATOR,
-                          authorizerMapper
-                      )
-                  );
-              if (authorizedHistoryForId.size() > 0) {
-                return Response.ok(authorizedHistoryForId).build();
-              }
+        manager -> {
+          Map<String, List<VersionedSupervisorSpec>> supervisorHistory = manager.getSupervisorHistory();
+          Iterable<VersionedSupervisorSpec> historyForId = supervisorHistory.get(id);
+          if (historyForId != null) {
+            final List<VersionedSupervisorSpec> authorizedHistoryForId =
+                Lists.newArrayList(
+                    AuthorizationUtils.filterAuthorizedResources(
+                        req,
+                        historyForId,
+                        SPEC_DATASOURCE_READ_RA_GENERATOR,
+                        authorizerMapper
+                    )
+                );
+            if (authorizedHistoryForId.size() > 0) {
+              return Response.ok(authorizedHistoryForId).build();
             }
+          }
 
-            return Response.status(Response.Status.NOT_FOUND)
-                           .entity(
-                               ImmutableMap.of(
-                                   "error",
-                                   StringUtils.format("No history for [%s].", id)
-                               )
-                           )
-                           .build();
+          return Response.status(Response.Status.NOT_FOUND)
+                         .entity(
+                             ImmutableMap.of(
+                                 "error",
+                                 StringUtils.format("No history for [%s].", id)
+                             )
+                         )
+                         .build();
 
-          }
         }
     );
   }
@@ -320,18 +323,13 @@ public Response apply(SupervisorManager manager)
   public Response reset(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            if (manager.resetSupervisor(id, null)) {
-              return Response.ok(ImmutableMap.of("id", id)).build();
-            } else {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
+        manager -> {
+          if (manager.resetSupervisor(id, null)) {
+            return Response.ok(ImmutableMap.of("id", id)).build();
+          } else {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
         }
     );
@@ -375,4 +373,29 @@ private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Respo
         )
     );
   }
+
+  private Response specSuspendOrResume(final String id, boolean suspend)
+  {
+    return asLeaderWithSupervisorManager(
+        manager -> {
+          Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+          if (!spec.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+                           .build();
+          }
+
+          if (spec.get().isSuspended() == suspend) {
+            final String errMsg =
+                StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running");
+            return Response.status(Response.Status.BAD_REQUEST)
+                           .entity(ImmutableMap.of("error", errMsg))
+                           .build();
+          }
+          manager.suspendOrResumeSupervisor(id, suspend);
+          spec = manager.getSupervisorSpec(id);
+          return Response.ok(spec.get()).build();
+        }
+    );
+  }
 }
diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
index 5ec87c588fa..a3bf8e6a3f2 100644
--- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
+++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
@@ -6,7 +6,7 @@ var killTask = function(taskId) {
   if(confirm('Do you really want to kill: '+taskId)) {
     $.ajax({
       type:'POST',
-      url: '/druid/indexer/v1/task/'+ taskId +'/shutdown',
+      url: '/druid/indexer/v1/task/'+ taskId +'/terminate',
       data: ''
     }).done(function(data) {
       setTimeout(function() { location.reload(true) }, 750);
@@ -16,6 +16,42 @@ var killTask = function(taskId) {
   }
 }
 
+
+var suspendSupervisor = function(supervisorId) {
+  if(confirm('Do you really want to suspend: '+ supervisorId)) {
+    $.ajax({
+      type:'POST',
+      url: '/druid/indexer/v1/supervisor/' + supervisorId + '/suspend',
+      data: ''
+    }).done(function(data) {
+      setTimeout(function() { location.reload(true) }, 750);
+    }).fail(function(data) {
+      var errMsg = data && data.responseJSON && data.responseJSON.error ?
+           data.responseJSON.error :
+          'suspend request failed, please check overlord logs for details.';
+      alert(errMsg);
+    })
+  }
+}
+
+
+var resumeSupervisor = function(supervisorId) {
+  if(confirm('Do you really want to resume: '+ supervisorId)) {
+    $.ajax({
+      type:'POST',
+      url: '/druid/indexer/v1/supervisor/' + supervisorId + '/resume',
+      data: ''
+    }).done(function(data) {
+      setTimeout(function() { location.reload(true) }, 750);
+    }).fail(function(data) {
+      var errMsg = data && data.responseJSON && data.responseJSON.error ?
+       data.responseJSON.error :
+      'resume request failed, please check overlord logs for details.';
+      alert(errMsg);
+    })
+  }
+}
+
 var resetSupervisor = function(supervisorId) {
   if(confirm('Do you really want to reset: '+ supervisorId)) {
     $.ajax({
@@ -31,7 +67,7 @@ var resetSupervisor = function(supervisorId) {
 }
 
 var shutdownSupervisor = function(supervisorId) {
-  if(confirm('Do you really want to shutdown: '+ supervisorId)) {
+  if(confirm('Do you really want to terminate: '+ supervisorId)) {
     $.ajax({
       type:'POST',
       url: '/druid/indexer/v1/supervisor/' + supervisorId + '/shutdown',
@@ -39,7 +75,7 @@ var shutdownSupervisor = function(supervisorId) {
     }).done(function(data) {
       setTimeout(function() { location.reload(true) }, 750);
     }).fail(function(data) {
-      alert('Shutdown request failed, please check overlord logs for details.');
+      alert('Terminate request failed, please check overlord logs for details.');
     })
   }
 }
@@ -59,18 +95,28 @@ $(document).ready(function() {
     }
   }
 
-  $.get('/druid/indexer/v1/supervisor', function(dataList) {
+  $.get('/druid/indexer/v1/supervisor?full', function(dataList) {
+
     var data = []
     for (i = 0 ; i < dataList.length ; i++) {
-      var supervisorId = encodeURIComponent(dataList[i])
+      var supervisorId = encodeURIComponent(dataList[i].id)
+      var supervisorSpec = dataList[i].spec;
+      var statusText = supervisorSpec && supervisorSpec.suspended ?
+       '<span style="color:#FF6000">suspended</span>' :
+       '<span style="color:#08B157">running</span>';
       data[i] = {
         "dataSource" : supervisorId,
         "more" :
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '">payload</a>' +
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '/status">status</a>' +
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '/history">history</a>' +
+          (supervisorSpec.suspended ?
+           '<a style="padding-right:5px;" onclick="resumeSupervisor(\'' + supervisorId + '\');">resume</a>' :
+           '<a onclick="suspendSupervisor(\'' + supervisorId + '\');">suspend</a>'
+          ) +
           '<a onclick="resetSupervisor(\'' + supervisorId + '\');">reset</a>' +
-          '<a onclick="shutdownSupervisor(\'' + supervisorId + '\');">shutdown</a>'
+          '<a onclick="shutdownSupervisor(\'' + supervisorId + '\');">terminate</a>',
+        "status": statusText
       }
     }
     buildTable((data), $('#supervisorsTable'));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
index abdd60a9dcd..0a977093f50 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
@@ -131,6 +131,24 @@ public Supervisor createSupervisor()
         {
           return ImmutableList.of("test");
         }
+
+        @Override
+        public SupervisorSpec createSuspendedSpec()
+        {
+          return null;
+        }
+
+        @Override
+        public SupervisorSpec createRunningSpec()
+        {
+          return null;
+        }
+
+        @Override
+        public boolean isSuspended()
+        {
+          return false;
+        }
       };
       EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString()))
               .andReturn(Optional.of(supervisorSpec))
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index f91dc1adfcb..85cfd95c726 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -24,6 +24,7 @@
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -40,6 +41,7 @@
 import java.util.Map;
 
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 
 @RunWith(EasyMockRunner.class)
@@ -263,15 +265,114 @@ public void testResetSupervisor()
     verifyAll();
   }
 
+  @Test
+  public void testCreateSuspendResumeAndStopSupervisor()
+  {
+    Capture<TestSupervisorSpec> capturedInsert = Capture.newInstance();
+    SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1, false, supervisor2);
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id3", new TestSupervisorSpec("id3", supervisor3)
+    );
+
+    // mock adding a supervisor to manager with existing supervisor then suspending it
+    Assert.assertTrue(manager.getSupervisorIds().isEmpty());
+
+    EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    metadataSupervisorManager.insert("id1", spec);
+    supervisor3.start();
+    supervisor1.start();
+    replayAll();
+
+    manager.start();
+    Assert.assertEquals(1, manager.getSupervisorIds().size());
+
+    manager.createOrUpdateAndStartSupervisor(spec);
+    Assert.assertEquals(2, manager.getSupervisorIds().size());
+    Assert.assertEquals(spec, manager.getSupervisorSpec("id1").get());
+    verifyAll();
+
+    // mock suspend, which stops supervisor1 and sets suspended state in metadata, flipping to supervisor2
+    // in TestSupervisorSpec implementation of createSuspendedSpec
+    resetAll();
+    metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert));
+    supervisor2.start();
+    supervisor1.stop(true);
+    replayAll();
+
+    manager.suspendOrResumeSupervisor("id1", true);
+    Assert.assertEquals(2, manager.getSupervisorIds().size());
+    Assert.assertEquals(capturedInsert.getValue(), manager.getSupervisorSpec("id1").get());
+    Assert.assertTrue(capturedInsert.getValue().suspended);
+    verifyAll();
+
+    // mock resume, which stops supervisor2 and sets suspended to false in metadata, flipping to supervisor1
+    // in TestSupervisorSpec implementation of createRunningSpec
+    resetAll();
+    metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert));
+    supervisor2.stop(true);
+    supervisor1.start();
+    replayAll();
+
+    manager.suspendOrResumeSupervisor("id1", false);
+    Assert.assertEquals(2, manager.getSupervisorIds().size());
+    Assert.assertEquals(capturedInsert.getValue(), manager.getSupervisorSpec("id1").get());
+    Assert.assertFalse(capturedInsert.getValue().suspended);
+    verifyAll();
+
+    // mock stop of suspended then resumed supervisor
+    resetAll();
+    metadataSupervisorManager.insert(eq("id1"), anyObject(NoopSupervisorSpec.class));
+    supervisor1.stop(true);
+    replayAll();
+
+    boolean retVal = manager.stopAndRemoveSupervisor("id1");
+    Assert.assertTrue(retVal);
+    Assert.assertEquals(1, manager.getSupervisorIds().size());
+    Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1"));
+    verifyAll();
+
+    // mock manager shutdown to ensure supervisor 3 stops
+    resetAll();
+    supervisor3.stop(false);
+    replayAll();
+
+    manager.stop();
+    verifyAll();
+
+    Assert.assertTrue(manager.getSupervisorIds().isEmpty());
+  }
+
+
   private static class TestSupervisorSpec implements SupervisorSpec
   {
     private final String id;
     private final Supervisor supervisor;
+    private final boolean suspended;
+    private final Supervisor suspendedSupervisor;
+
 
     public TestSupervisorSpec(String id, Supervisor supervisor)
+    {
+      this(id, supervisor, false, null);
+    }
+
+    public TestSupervisorSpec(String id, Supervisor supervisor, boolean suspended, Supervisor suspendedSupervisor)
     {
       this.id = id;
       this.supervisor = supervisor;
+      this.suspended = suspended;
+      this.suspendedSupervisor = suspendedSupervisor;
+    }
+    @Override
+    public SupervisorSpec createSuspendedSpec()
+    {
+      return new TestSupervisorSpec(id, suspendedSupervisor, true, supervisor);
+    }
+
+    @Override
+    public SupervisorSpec createRunningSpec()
+    {
+      return new TestSupervisorSpec(id, suspendedSupervisor, false, supervisor);
     }
 
     @Override
@@ -286,11 +387,16 @@ public Supervisor createSupervisor()
       return supervisor;
     }
 
+    @Override
+    public boolean isSuspended()
+    {
+      return suspended;
+    }
+
     @Override
     public List<String> getDataSources()
     {
       return new ArrayList<>();
     }
-
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index a3434bc4518..9d6eab33e1e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -29,12 +29,10 @@
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -75,21 +73,14 @@ public void setUp()
           @Override
           public Authorizer getAuthorizer(String name)
           {
-            return new Authorizer()
-            {
-              @Override
-              public Access authorize(
-                  AuthenticationResult authenticationResult, Resource resource, Action action
-              )
-              {
-                if (authenticationResult.getIdentity().equals("druid")) {
-                  return Access.OK;
+            return (authenticationResult, resource, action) -> {
+              if (authenticationResult.getIdentity().equals("druid")) {
+                return Access.OK;
+              } else {
+                if (resource.getName().equals("datasource2")) {
+                  return new Access(false, "not authorized.");
                 } else {
-                  if (resource.getName().equals("datasource2")) {
-                    return new Access(false, "not authorized.");
-                  } else {
-                    return Access.OK;
-                  }
+                  return Access.OK;
                 }
               }
             };
@@ -171,7 +162,7 @@ public void testSpecGetAll()
     EasyMock.expectLastCall().anyTimes();
     replayAll();
 
-    Response response = supervisorResource.specGetAll(request);
+    Response response = supervisorResource.specGetAll(null, request);
     verifyAll();
 
     Assert.assertEquals(200, response.getStatus());
@@ -181,12 +172,61 @@ public void testSpecGetAll()
     EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.specGetAll(request);
+    response = supervisorResource.specGetAll(null, request);
     verifyAll();
 
     Assert.assertEquals(503, response.getStatus());
   }
 
+  @Test
+  public void testSpecGetAllFull()
+  {
+    Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
+
+    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource2");
+      }
+    };
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
+    EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2);
+    EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2);
+    EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
+    EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+    EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
+        new AuthenticationResult("druid", "druid", null, null)
+    ).atLeastOnce();
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specGetAll("", request);
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    List<Map<String, Object>> specs = (List<Map<String, Object>>) response.getEntity();
+    Assert.assertTrue(
+        specs.stream()
+             .allMatch(spec ->
+                           ("id1".equals(spec.get("id")) && spec1.equals(spec.get("spec"))) ||
+                           ("id2".equals(spec.get("id")) && spec2.equals(spec.get("spec")))
+             )
+    );
+  }
+
   @Test
   public void testSpecGet()
   {
@@ -249,6 +289,101 @@ public void testSpecGetStatus()
     Assert.assertEquals(503, response.getStatus());
   }
 
+  @Test
+  public void testSpecSuspend()
+  {
+
+    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
+            .andReturn(Optional.of(running)).times(1)
+            .andReturn(Optional.of(suspended)).times(1);
+    EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specSuspend("my-id");
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    TestSupervisorSpec responseSpec = (TestSupervisorSpec) response.getEntity();
+    Assert.assertEquals(suspended.id, responseSpec.id);
+    Assert.assertEquals(suspended.suspended, responseSpec.suspended);
+    resetAll();
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
+    replayAll();
+
+    response = supervisorResource.specSuspend("my-id");
+    verifyAll();
+
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity());
+  }
+
+
+
+  @Test
+  public void testSpecResume()
+  {
+    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
+            .andReturn(Optional.of(suspended)).times(1)
+            .andReturn(Optional.of(running)).times(1);
+    EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specResume("my-id");
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    TestSupervisorSpec responseSpec = (TestSupervisorSpec) response.getEntity();
+    Assert.assertEquals(running.id, responseSpec.id);
+    Assert.assertEquals(running.suspended, responseSpec.suspended);
+    resetAll();
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
+    replayAll();
+
+    response = supervisorResource.specResume("my-id");
+    verifyAll();
+
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already running"), response.getEntity());
+  }
+
   @Test
   public void testShutdown()
   {
@@ -762,9 +897,10 @@ public void testNoopSupervisorSpecSerde() throws Exception
 
   private static class TestSupervisorSpec implements SupervisorSpec
   {
-    private final String id;
-    private final Supervisor supervisor;
-    private final List<String> datasources;
+    protected final String id;
+    protected final Supervisor supervisor;
+    protected final List<String> datasources;
+    boolean suspended;
 
     public TestSupervisorSpec(String id, Supervisor supervisor, List<String> datasources)
     {
@@ -773,6 +909,12 @@ public TestSupervisorSpec(String id, Supervisor supervisor, List<String> datasou
       this.datasources = datasources;
     }
 
+    public TestSupervisorSpec(String id, Supervisor supervisor, List<String> datasources, boolean suspended)
+    {
+      this(id, supervisor, datasources);
+      this.suspended = suspended;
+    }
+
     @Override
     public String getId()
     {
@@ -791,6 +933,25 @@ public Supervisor createSupervisor()
       return datasources;
     }
 
+
+    @Override
+    public SupervisorSpec createSuspendedSpec()
+    {
+      return new TestSupervisorSpec(id, supervisor, datasources, true);
+    }
+
+    @Override
+    public SupervisorSpec createRunningSpec()
+    {
+      return new TestSupervisorSpec(id, supervisor, datasources, false);
+    }
+
+    @Override
+    public boolean isSuspended()
+    {
+      return suspended;
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -809,7 +970,10 @@ public boolean equals(Object o)
       if (supervisor != null ? !supervisor.equals(that.supervisor) : that.supervisor != null) {
         return false;
       }
-      return datasources != null ? datasources.equals(that.datasources) : that.datasources == null;
+      if (datasources != null ? !datasources.equals(that.datasources) : that.datasources != null) {
+        return false;
+      }
+      return isSuspended() == that.isSuspended();
 
     }
 
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index c32ea7af0b2..6935a62f545 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 
 import javax.annotation.Nullable;
@@ -44,14 +45,28 @@
   @JsonProperty("id")
   private String id;
 
+  @JsonProperty("suspended")
+  private boolean suspended; //ignored
+
+  @VisibleForTesting
+  public NoopSupervisorSpec(
+      String id,
+      List<String> datasources
+  )
+  {
+    this(id, datasources, null);
+  }
+
   @JsonCreator
   public NoopSupervisorSpec(
       @Nullable @JsonProperty("id") String id,
-      @Nullable @JsonProperty("dataSources") List<String> datasources
+      @Nullable @JsonProperty("dataSources") List<String> datasources,
+      @Nullable @JsonProperty("suspended") Boolean suspended
   )
   {
     this.id = id;
     this.datasources = datasources == null ? new ArrayList<>() : datasources;
+    this.suspended = false; // ignore
   }
 
   @Override
@@ -61,6 +76,22 @@ public String getId()
     return id;
   }
 
+
+  @Override
+  @Nullable
+  @JsonProperty("dataSources")
+  public List<String> getDataSources()
+  {
+    return datasources;
+  }
+
+  @Override
+  @JsonProperty("suspended")
+  public boolean isSuspended()
+  {
+    return suspended;
+  }
+
   @Override
   public Supervisor createSupervisor()
   {
@@ -95,11 +126,15 @@ public void checkpoint(
   }
 
   @Override
-  @Nullable
-  @JsonProperty("dataSources")
-  public List<String> getDataSources()
+  public SupervisorSpec createRunningSpec()
   {
-    return datasources;
+    return new NoopSupervisorSpec(id, datasources);
+  }
+
+  @Override
+  public SupervisorSpec createSuspendedSpec()
+  {
+    return new NoopSupervisorSpec(id, datasources);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 01bcb4cea56..56717691fde 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 import java.util.List;
 
@@ -41,4 +42,19 @@
   Supervisor createSupervisor();
 
   List<String> getDataSources();
+
+  default SupervisorSpec createSuspendedSpec()
+  {
+    throw new NotImplementedException();
+  }
+
+  default SupervisorSpec createRunningSpec()
+  {
+    throw new NotImplementedException();
+  }
+
+  default boolean isSuspended()
+  {
+    return false;
+  }
 }
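
For readers skimming the diff, the suspend/resume flow it introduces can be summarised in a small standalone sketch. This is not code from the PR: `SuspendResumeSketch`, its nested `Spec` class, and the integer status codes are hypothetical stand-ins, assumed here only to illustrate the three branches of `specSuspendOrResume` (unknown id, already in the requested state, state flipped) and the copy-style `createSuspendedSpec` / `createRunningSpec` methods added to `SupervisorSpec`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the suspend/resume flow; not Druid code.
class SuspendResumeSketch
{
  // Stand-in for SupervisorSpec: immutable, state changes produce a new copy.
  static final class Spec
  {
    final String id;
    final boolean suspended;

    Spec(String id, boolean suspended)
    {
      this.id = id;
      this.suspended = suspended;
    }

    Spec createSuspendedSpec()
    {
      return new Spec(id, true);
    }

    Spec createRunningSpec()
    {
      return new Spec(id, false);
    }
  }

  // Stand-in for the spec storage held by the supervisor manager.
  private final Map<String, Spec> specs = new HashMap<>();

  void add(Spec spec)
  {
    specs.put(spec.id, spec);
  }

  // Returns an HTTP-like status code for each branch of the resource method.
  int suspendOrResume(String id, boolean suspend)
  {
    Spec spec = specs.get(id);
    if (spec == null) {
      return 404; // "[id] does not exist"
    }
    if (spec.suspended == suspend) {
      return 400; // "[id] is already suspended" or "[id] is already running"
    }
    // Replace the stored spec with a copy in the requested state.
    specs.put(id, suspend ? spec.createSuspendedSpec() : spec.createRunningSpec());
    return 200;
  }

  boolean isSuspended(String id)
  {
    return specs.get(id).suspended;
  }
}
```

In this sketch, suspending a running spec returns 200, repeating the same request returns 400, and an unknown id returns 404, which matches the status codes asserted in `testSpecSuspend` and `testSpecResume` above.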


 

