jon-wei closed pull request #6234: 'suspend' and 'resume' support for supervisors (kafka indexing service, materialized views)
URL: https://github.com/apache/incubator-druid/pull/6234
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index 7fbacf4c587..7b17c46a270 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -194,17 +194,73 @@ existing publishing tasks and will create new tasks starting at the offsets the
 Seamless schema migrations can thus be achieved by simply submitting the new schema using this endpoint.
-#### Shutdown Supervisor
+#### Suspend Supervisor
+
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/suspend
+```
+Suspend indexing tasks associated with a supervisor. Note that the supervisor itself will still be
+operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor
+is resumed. Responds with updated SupervisorSpec.
+
+#### Resume Supervisor
+
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/resume
+```
+Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
+
+#### Reset Supervisor
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/reset
+```
+The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion
+guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the
+generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka
+(typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will
+refuse to start and in-flight tasks will fail.
+
+This endpoint can be used to clear the stored offsets which will cause the supervisor to start reading from
+either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be
+running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill
+and re-create any active tasks so that tasks begin reading from valid offsets.
+
+Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint
+may cause some Kafka messages to be skipped or to be read twice.
+
+#### Terminate Supervisor
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/terminate
+```
+Terminate a supervisor and cause all associated indexing tasks managed by this supervisor to immediately stop and begin
+publishing their segments. This supervisor will still exist in the metadata store and it's history may be retrieved
+with the supervisor history api, but will not be listed in the 'get supervisors' api response nor can it's configuration
+or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor
+spec to the create api.
+
+#### Shutdown Supervisor
+_Deprecated: use the equivalent 'terminate' instead_
 ```
 POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
 ```
-Note that this will cause all indexing tasks managed by this supervisor to immediately stop and begin publishing their segments.
 #### Get Supervisor IDs
 ```
 GET /druid/indexer/v1/supervisor
 ```
-Returns a list of the currently active supervisors.
+Returns a list of strings of the currently active supervisor ids.
+
+#### Get Supervisors
+```
+GET /druid/indexer/v1/supervisor?full
+```
+Returns a list of objects of the currently active supervisors.
+
+|Field|Type|Description|
+|---|---|---|
+|`id`|String|supervisor unique identifier|
+|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|
+
 #### Get Supervisor Spec
 ```
@@ -233,24 +289,6 @@ GET /druid/indexer/v1/supervisor/<supervisorId>/history
 ```
 Returns an audit history of specs for the supervisor with the provided ID.
-#### Reset Supervisor
-```
-POST /druid/indexer/v1/supervisor/<supervisorId>/reset
-```
-The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion
-guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the
-generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka
-(typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will
-refuse to start and in-flight tasks will fail.
-
-This endpoint can be used to clear the stored offsets which will cause the supervisor to start reading from
-either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be
-running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill
-and re-create any active tasks so that tasks begin reading from valid offsets.
-
-Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint
-may cause some Kafka messages to be skipped or to be read twice.
- ## Capacity Planning Kafka indexing tasks run on middle managers and are thus limited by the resources available in the middle manager diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 56afe3efd5e..d4e7f08d657 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -135,31 +135,7 @@ public void start() exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId)); final Duration delay = config.getTaskCheckDuration().toStandardDuration(); future = exec.scheduleWithFixedDelay( - new Runnable() { - @Override - public void run() - { - try { - DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource); - if (metadata instanceof DerivativeDataSourceMetadata - && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource()) - && spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions()) - && spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) { - checkSegmentsAndSubmitTasks(); - } else { - log.error( - "Failed to start %s. 
Metadata in database(%s) is different from new dataSource metadata(%s)", - supervisorId, - metadata, - spec - ); - } - } - catch (Exception e) { - log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit(); - } - } - }, + MaterializedViewSupervisor.this::run, 0, delay.getMillis(), TimeUnit.MILLISECONDS @@ -167,7 +143,40 @@ public void run() started = true; } } - + + @VisibleForTesting + public void run() + { + try { + if (spec.isSuspended()) { + log.info( + "Materialized view supervisor[%s:%s] is suspended", + spec.getId(), + spec.getDataSourceName() + ); + return; + } + + DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource); + if (metadata instanceof DerivativeDataSourceMetadata + && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource()) + && spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions()) + && spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) { + checkSegmentsAndSubmitTasks(); + } else { + log.error( + "Failed to start %s. 
Metadata in database(%s) is different from new dataSource metadata(%s)", + supervisorId, + metadata, + spec + ); + } + } + catch (Exception e) { + log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit(); + } + } + @Override public void stop(boolean stopGracefully) { @@ -207,7 +216,8 @@ public SupervisorReport getStatus() { return new MaterializedViewSupervisorReport( dataSource, - DateTimes.nowUtc(), + DateTimes.nowUtc(), + spec.isSuspended(), spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics(), diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java index 107354fe08f..f05f8e0f6d1 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java @@ -29,10 +29,11 @@ public class MaterializedViewSupervisorReport extends SupervisorReport { - + public MaterializedViewSupervisorReport( String dataSource, DateTime generationTime, + boolean suspended, String baseDataSource, Set<String> dimensions, Set<String> metrics, @@ -42,6 +43,7 @@ public MaterializedViewSupervisorReport( super(dataSource, generationTime, "{" + "dataSource='" + dataSource + '\'' + ", baseDataSource='" + baseDataSource + '\'' + + ", suspended='" + suspended + "\'" + ", dimensions=" + dimensions + ", metrics=" + metrics + ", missTimeline" + Sets.newHashSet(missTimeline) + diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index b81d85e297a..29904a47635 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -81,6 +81,7 @@ private final MaterializedViewTaskConfig config; private final AuthorizerMapper authorizerMapper; private final ChatHandlerProvider chatHandlerProvider; + private final boolean suspended; public MaterializedViewSupervisorSpec( @JsonProperty("baseDataSource") String baseDataSource, @@ -92,6 +93,7 @@ public MaterializedViewSupervisorSpec( @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates, @JsonProperty("classpathPrefix") String classpathPrefix, @JsonProperty("context") Map<String, Object> context, + @JsonProperty("suspended") Boolean suspended, @JacksonInject ObjectMapper objectMapper, @JacksonInject TaskMaster taskMaster, @JacksonInject TaskStorage taskStorage, @@ -139,7 +141,8 @@ public MaterializedViewSupervisorSpec( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.config = config; - + this.suspended = suspended != null ? 
suspended : false; + this.metrics = Sets.newHashSet(); for (AggregatorFactory aggregatorFactory : aggregators) { metrics.add(aggregatorFactory.getName()); @@ -305,7 +308,14 @@ public String getClasspathPrefix() { return context; } - + + @Override + @JsonProperty("suspended") + public boolean isSuspended() + { + return suspended; + } + @Override public String getId() { @@ -331,7 +341,59 @@ public Supervisor createSupervisor() { return ImmutableList.of(dataSourceName); } - + + @Override + public SupervisorSpec createSuspendedSpec() + { + return new MaterializedViewSupervisorSpec( + baseDataSource, + dimensionsSpec, + aggregators, + tuningConfig, + dataSourceName, + hadoopCoordinates, + hadoopDependencyCoordinates, + classpathPrefix, + context, + true, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + segmentManager, + metadataStorageCoordinator, + config, + authorizerMapper, + chatHandlerProvider + ); + } + + @Override + public SupervisorSpec createRunningSpec() + { + return new MaterializedViewSupervisorSpec( + baseDataSource, + dimensionsSpec, + aggregators, + tuningConfig, + dataSourceName, + hadoopCoordinates, + hadoopDependencyCoordinates, + classpathPrefix, + context, + false, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + segmentManager, + metadataStorageCoordinator, + config, + authorizerMapper, + chatHandlerProvider + ); + } + @Override public String toString() { diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index b9c816083f8..40da151f1d4 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -75,6 +75,7 @@ public void setup() .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider()) ); } + @Test public void testSupervisorSerialization() throws IOException { @@ -132,6 +133,7 @@ public void testSupervisorSerialization() throws IOException null, null, null, + false, objectMapper, null, null, @@ -150,6 +152,51 @@ public void testSupervisorSerialization() throws IOException Assert.assertEquals(expected.getMetrics(), spec.getMetrics()); } + @Test + public void testSuspendResuume() throws IOException + { + String supervisorStr = "{\n" + + " \"type\" : \"derivativeDataSource\",\n" + + " \"baseDataSource\": \"wikiticker\",\n" + + " \"dimensionsSpec\":{\n" + + " \"dimensions\" : [\n" + + " \"isUnpatrolled\",\n" + + " \"metroCode\",\n" + + " \"namespace\",\n" + + " \"page\",\n" + + " \"regionIsoCode\",\n" + + " \"regionName\",\n" + + " \"user\"\n" + + " ]\n" + + " },\n" + + " \"metricsSpec\" : [\n" + + " {\n" + + " \"name\" : \"count\",\n" + + " \"type\" : \"count\"\n" + + " },\n" + + " {\n" + + " \"name\" : \"added\",\n" + + " \"type\" : \"longSum\",\n" + + " \"fieldName\" : \"added\"\n" + + " }\n" + + " ],\n" + + " \"tuningConfig\": {\n" + + " \"type\" : \"hadoop\"\n" + + " }\n" + + "}"; + + MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class); + Assert.assertFalse(spec.isSuspended()); + + String suspendedSerialized = objectMapper.writeValueAsString(spec.createSuspendedSpec()); + MaterializedViewSupervisorSpec suspendedSpec = objectMapper.readValue(suspendedSerialized, MaterializedViewSupervisorSpec.class); + Assert.assertTrue(suspendedSpec.isSuspended()); + + String runningSerialized = objectMapper.writeValueAsString(spec.createRunningSpec()); + MaterializedViewSupervisorSpec runningSpec = objectMapper.readValue(runningSerialized, 
MaterializedViewSupervisorSpec.class); + Assert.assertFalse(runningSpec.isSuspended()); + } + @Test public void testEmptyBaseDataSource() throws Exception { @@ -182,6 +229,7 @@ public void testEmptyBaseDataSource() throws Exception null, null, null, + false, objectMapper, null, null, @@ -226,6 +274,7 @@ public void testNullBaseDataSource() throws Exception null, null, null, + false, objectMapper, null, null, diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index a48f76fa9a4..4ece1f0c926 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -47,6 +47,9 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import static org.easymock.EasyMock.expect; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -109,6 +112,7 @@ public void setUp() throws IOException null, null, null, + false, objectMapper, taskMaster, taskStorage, @@ -121,9 +125,9 @@ public void setUp() throws IOException ); supervisor = (MaterializedViewSupervisor) spec.createSupervisor(); } - + @Test - public void testCheckSegments() throws IOException + public void testCheckSegments() throws IOException { Set<DataSegment> baseSegments = Sets.newHashSet( new DataSegment( @@ -156,7 +160,7 @@ public void testCheckSegments() throws IOException Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> toBuildInterval = supervisor.checkSegments(); 
Map<Interval, List<DataSegment>> expectedSegments = Maps.newHashMap(); expectedSegments.put( - Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), + Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), Collections.singletonList( new DataSegment( "base", @@ -173,4 +177,43 @@ public void testCheckSegments() throws IOException ); Assert.assertEquals(expectedSegments, toBuildInterval.rhs); } + + + @Test + public void testSuspendedDoesntRun() throws IOException + { + MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec( + "base", + new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null), + new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")}, + HadoopTuningConfig.makeDefaultTuningConfig(), + null, + null, + null, + null, + null, + true, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + sqlMetadataSegmentManager, + indexerMetadataStorageCoordinator, + new MaterializedViewTaskConfig(), + createMock(AuthorizerMapper.class), + createMock(ChatHandlerProvider.class) + ); + MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor(); + + // mock IndexerSQLMetadataStorageCoordinator to ensure that getDataSourceMetadata is not called + // which will be true if truly suspended, since this is the first operation of the 'run' method otherwise + IndexerSQLMetadataStorageCoordinator mock = createMock(IndexerSQLMetadataStorageCoordinator.class); + expect(mock.getDataSourceMetadata(suspended.getDataSourceName())).andAnswer((IAnswer) () -> { + Assert.fail(); + return null; + }).anyTimes(); + + EasyMock.replay(mock); + supervisor.run(); + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 78652a70b91..5087eef33c2 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -309,14 +309,8 @@ public TaskLocation getTaskLocation(final String id) Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner(); if (taskRunner.isPresent()) { Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind( - taskRunner.get().getRunningTasks(), new Predicate<TaskRunnerWorkItem>() - { - @Override - public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem) - { - return id.equals(taskRunnerWorkItem.getTaskId()); - } - } + taskRunner.get().getRunningTasks(), + (Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem -> id.equals(taskRunnerWorkItem.getTaskId()) ); if (item.isPresent()) { @@ -372,29 +366,24 @@ public void start() consumer = getKafkaConsumer(); exec.submit( - new Runnable() - { - @Override - public void run() - { - try { - while (!Thread.currentThread().isInterrupted()) { - final Notice notice = notices.take(); - - try { - notice.handle(); - } - catch (Throwable e) { - log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource) - .addData("noticeClass", notice.getClass().getSimpleName()) - .emit(); - } + () -> { + try { + while (!Thread.currentThread().isInterrupted()) { + final Notice notice = notices.take(); + + try { + notice.handle(); + } + catch (Throwable e) { + log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource) + .addData("noticeClass", notice.getClass().getSimpleName()) + .emit(); } - } - catch (InterruptedException e) { - log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource); } } + catch (InterruptedException e) { + log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource); + } } ); firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay()); @@ -898,7 +887,16 @@ void runInternal() throws ExecutionException, 
InterruptedException, TimeoutExcep checkTaskDuration(); checkPendingCompletionTasks(); checkCurrentTaskState(); - createNewTasks(); + + // if supervisor is not suspended, ensure required tasks are running + // if suspended, ensure tasks have been requested to gracefully stop + if (!spec.isSuspended()) { + log.info("[%s] supervisor is running.", dataSource); + createNewTasks(); + } else { + log.info("[%s] supervisor is suspended.", dataSource); + gracefulShutdownInternal(); + } if (log.isDebugEnabled()) { log.debug(generateReport(true).toString()); @@ -2096,7 +2094,8 @@ private boolean isTaskInPendingCompletionGroups(String taskId) includeOffsets ? latestOffsetsFromKafka : null, includeOffsets ? partitionLag : null, includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null, - includeOffsets ? offsetsLastUpdated : null + includeOffsets ? offsetsLastUpdated : null, + spec.isSuspended() ); SupervisorReport<KafkaSupervisorReportPayload> report = new SupervisorReport<>( dataSource, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java index 2a5a4829b5a..d9533a37fb2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java @@ -42,6 +42,7 @@ private final Map<Integer, Long> minimumLag; private final Long aggregateLag; private final DateTime offsetsLastUpdated; + private final boolean suspended; public KafkaSupervisorReportPayload( String dataSource, @@ -52,7 +53,8 @@ public KafkaSupervisorReportPayload( @Nullable Map<Integer, Long> latestOffsets, @Nullable Map<Integer, Long> minimumLag, @Nullable Long 
aggregateLag, - @Nullable DateTime offsetsLastUpdated + @Nullable DateTime offsetsLastUpdated, + boolean suspended ) { this.dataSource = dataSource; @@ -66,6 +68,7 @@ public KafkaSupervisorReportPayload( this.minimumLag = minimumLag; this.aggregateLag = aggregateLag; this.offsetsLastUpdated = offsetsLastUpdated; + this.suspended = suspended; } public void addTask(TaskReportData data) @@ -148,6 +151,12 @@ public DateTime getOffsetsLastUpdated() return offsetsLastUpdated; } + @JsonProperty + public boolean getSuspended() + { + return suspended; + } + @Override public String toString() { @@ -163,6 +172,7 @@ public String toString() (minimumLag != null ? ", minimumLag=" + minimumLag : "") + (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") + (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") + + ", suspended=" + suspended + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index fc620ea61ac..28dab02c56e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -55,6 +55,7 @@ private final ServiceEmitter emitter; private final DruidMonitorSchedulerConfig monitorSchedulerConfig; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final boolean suspended; @JsonCreator public KafkaSupervisorSpec( @@ -62,6 +63,7 @@ public KafkaSupervisorSpec( @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, @JsonProperty("context") Map<String, Object> context, + @JsonProperty("suspended") Boolean suspended, @JacksonInject 
TaskStorage taskStorage, @JacksonInject TaskMaster taskMaster, @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, @@ -111,6 +113,7 @@ public KafkaSupervisorSpec( this.emitter = emitter; this.monitorSchedulerConfig = monitorSchedulerConfig; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.suspended = suspended != null ? suspended : false; } @JsonProperty @@ -137,6 +140,13 @@ public KafkaSupervisorIOConfig getIoConfig() return context; } + @Override + @JsonProperty("suspended") + public boolean isSuspended() + { + return suspended; + } + public ServiceEmitter getEmitter() { return emitter; @@ -182,4 +192,35 @@ public String toString() ", ioConfig=" + ioConfig + '}'; } + + @Override + public KafkaSupervisorSpec createSuspendedSpec() + { + return toggleSuspend(true); + } + + @Override + public KafkaSupervisorSpec createRunningSpec() + { + return toggleSuspend(false); + } + + private KafkaSupervisorSpec toggleSuspend(boolean suspend) + { + return new KafkaSupervisorSpec( + dataSchema, + tuningConfig, + ioConfig, + context, + suspend, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + kafkaIndexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory + ); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java new file mode 100644 index 00000000000..b4c6dfd2997 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka.supervisor; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; +import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class KafkaSupervisorSpecTest +{ + private final ObjectMapper mapper; + + public KafkaSupervisorSpecTest() + { + mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(TaskStorage.class, null) + .addValue(TaskMaster.class, null) + 
.addValue(IndexerMetadataStorageCoordinator.class, null) + .addValue(KafkaIndexTaskClientFactory.class, null) + .addValue(ObjectMapper.class, mapper) + .addValue(ServiceEmitter.class, new NoopServiceEmitter()) + .addValue(DruidMonitorSchedulerConfig.class, null) + .addValue(RowIngestionMetersFactory.class, null) + .addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE) + ); + mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules()); + } + + @Test + public void testSerde() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = 
mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(serialized.contains("\"indexSpec\":{")); + Assert.assertTrue(serialized.contains("\"suspended\":false")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } + + @Test + public void testSuspendResume() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": 
\"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + + String suspendedSerialized = mapper.writeValueAsString(spec.createSuspendedSpec()); + + // expect default values populated in reserialized string + Assert.assertTrue(suspendedSerialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(suspendedSerialized.contains("\"indexSpec\":{")); + Assert.assertTrue(suspendedSerialized.contains("\"suspended\":true")); + + KafkaSupervisorSpec suspendedSpec = mapper.readValue(suspendedSerialized, KafkaSupervisorSpec.class); + + Assert.assertTrue(suspendedSpec.isSuspended()); + + String runningSerialized = mapper.writeValueAsString(spec.createRunningSpec()); + + KafkaSupervisorSpec runningSpec = mapper.readValue(runningSerialized, KafkaSupervisorSpec.class); + + Assert.assertFalse(runningSpec.isSuspended()); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 964f5f7cac6..978de2f6886 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -79,6 +79,7 @@ import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.easymock.IAnswer; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -2293,6 +2294,149 @@ public void testCheckpointWithNullTaskGroupId() verifyAll(); } + @Test + public void testSuspendedNoRunningTasks() throws Exception + { + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true); + addSomeEvents(1); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + // this asserts that taskQueue.add does not in fact get called because supervisor should be suspended + expect(taskQueue.add(anyObject())).andAnswer((IAnswer) () -> { + Assert.fail(); + return null; + }).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testSuspendedRunningTasks() throws Exception + { + // graceful shutdown is expected to be called on running tasks since state is suspended + + final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); + final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final DateTime startTime = DateTimes.nowUtc(); + + supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true); + addSomeEvents(1); + + 
Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Collection workItems = new ArrayList<>(); + workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); + workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + 
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); + expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + + // getCheckpoints will not be called for id1 as it is in publishing state + TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + + expect(taskClient.pauseAsync("id2")) + .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); + expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) + .andReturn(Futures.immediateFuture(true)); + taskQueue.shutdown("id3"); + expectLastCall().times(2); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testResetSuspended() throws Exception + { + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + 
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + reset(indexerMetadataStorageCoordinator); + expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); + replay(indexerMetadataStorageCoordinator); + + supervisor.resetInternal(null); + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) { @@ -2320,6 +2464,29 @@ private KafkaSupervisor getSupervisor( Period earlyMessageRejectionPeriod, boolean skipOffsetGaps ) + { + return getSupervisor( + replicas, + taskCount, + useEarliestOffset, + duration, + lateMessageRejectionPeriod, + earlyMessageRejectionPeriod, + skipOffsetGaps, + false + ); + } + + private KafkaSupervisor getSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod, + Period earlyMessageRejectionPeriod, + boolean skipOffsetGaps, + boolean suspended + ) { KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, @@ -2368,6 +2535,7 @@ public KafkaIndexTaskClient build( tuningConfig, kafkaSupervisorIOConfig, null, + suspended, taskStorage, taskMaster, indexerMetadataStorageCoordinator, @@ -2476,7 +2644,7 @@ public String getTaskType() } @Override - public String getDataSource() + public String getDataSource() { return dataSource; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index b5c24ef913f..cca6fe51bd8 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -91,6 +91,19 @@ public boolean stopAndRemoveSupervisor(String id) } } + public boolean suspendOrResumeSupervisor(String id, boolean suspend) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id); + Preconditions.checkNotNull(pair.lhs, "spec"); + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec(); + possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false); + return createAndStartSupervisorInternal(nextState, true); + } + } + @LifecycleStart public void start() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 9be9c9fb3b5..d3e19bbb4ab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -44,6 +44,7 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Endpoints for submitting and starting a {@link SupervisorSpec}, getting running supervisors, stopping supervisors, @@ -90,49 +92,55 @@ public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapp public Response specPost(final 
SupervisorSpec spec, @Context final HttpServletRequest req) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - Preconditions.checkArgument( - spec.getDataSources() != null && spec.getDataSources().size() > 0, - "No dataSources found to perform authorization checks" - ); - - Access authResult = AuthorizationUtils.authorizeAllResourceActions( - req, - Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR), - authorizerMapper - ); + manager -> { + Preconditions.checkArgument( + spec.getDataSources() != null && spec.getDataSources().size() > 0, + "No dataSources found to perform authorization checks" + ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); - } + Access authResult = AuthorizationUtils.authorizeAllResourceActions( + req, + Iterables.transform(spec.getDataSources(), AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR), + authorizerMapper + ); - manager.createOrUpdateAndStartSupervisor(spec); - return Response.ok(ImmutableMap.of("id", spec.getId())).build(); + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); } + + manager.createOrUpdateAndStartSupervisor(spec); + return Response.ok(ImmutableMap.of("id", spec.getId())).build(); } ); } @GET @Produces(MediaType.APPLICATION_JSON) - public Response specGetAll(@Context final HttpServletRequest req) + public Response specGetAll( + @QueryParam("full") String full, + @Context final HttpServletRequest req + ) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(final SupervisorManager manager) - { - Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds( - req, - manager, - manager.getSupervisorIds() - ); + manager -> { + Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds( + req, + manager, + 
manager.getSupervisorIds() + ); + + if (full == null) { return Response.ok(authorizedSupervisorIds).build(); + } else { + List<Map<String, ?>> all = + authorizedSupervisorIds.stream() + .map(x -> ImmutableMap.<String, Object>builder() + .put("id", x) + .put("spec", manager.getSupervisorSpec(x).get()) + .build() + ) + .collect(Collectors.toList()); + return Response.ok(all).build(); } } ); @@ -145,20 +153,15 @@ public Response apply(final SupervisorManager manager) public Response specGet(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id); - if (!spec.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } - - return Response.ok(spec.get()).build(); + manager -> { + Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id); + if (!spec.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } + + return Response.ok(spec.get()).build(); } ); } @@ -170,20 +173,15 @@ public Response apply(SupervisorManager manager) public Response specGetStatus(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - Optional<SupervisorReport> spec = manager.getSupervisorStatus(id); - if (!spec.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } - - return Response.ok(spec.get()).build(); + manager -> { + Optional<SupervisorReport> spec = manager.getSupervisorStatus(id); + if (!spec.isPresent()) { + return 
Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } + + return Response.ok(spec.get()).build(); } ); } @@ -197,49 +195,66 @@ public Response getAllTaskStats( ) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - Optional<Map<String, Map<String, Object>>> stats = manager.getSupervisorStats(id); - if (!stats.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity( - ImmutableMap.of( - "error", - StringUtils.format("[%s] does not exist", id) - ) - ) - .build(); - } - - return Response.ok(stats.get()).build(); + manager -> { + Optional<Map<String, Map<String, Object>>> stats = manager.getSupervisorStats(id); + if (!stats.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("[%s] does not exist", id) + ) + ) + .build(); } + + return Response.ok(stats.get()).build(); } ); } + @POST + @Path("/{id}/resume") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response specResume(@PathParam("id") final String id) + { + return specSuspendOrResume(id, false); + } + + @POST + @Path("/{id}/suspend") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response specSuspend(@PathParam("id") final String id) + { + return specSuspendOrResume(id, true); + } + @Deprecated @POST @Path("/{id}/shutdown") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) public Response shutdown(@PathParam("id") final String id) + { + return terminate(id); + } + + @POST + @Path("/{id}/terminate") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response terminate(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - 
new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - if (manager.stopAndRemoveSupervisor(id)) { - return Response.ok(ImmutableMap.of("id", id)).build(); - } else { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } + manager -> { + if (manager.stopAndRemoveSupervisor(id)) { + return Response.ok(ImmutableMap.of("id", id)).build(); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } } ); @@ -251,21 +266,14 @@ public Response apply(SupervisorManager manager) public Response specGetAllHistory(@Context final HttpServletRequest req) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(final SupervisorManager manager) - { - return Response.ok( - AuthorizationUtils.filterAuthorizedResources( - req, - manager.getSupervisorHistory(), - SPEC_DATASOURCE_READ_RA_GENERATOR, - authorizerMapper - ) - ).build(); - } - } + manager -> Response.ok( + AuthorizationUtils.filterAuthorizedResources( + req, + manager.getSupervisorHistory(), + SPEC_DATASOURCE_READ_RA_GENERATOR, + authorizerMapper + ) + ).build() ); } @@ -277,38 +285,33 @@ public Response specGetHistory( @PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - Map<String, List<VersionedSupervisorSpec>> supervisorHistory = manager.getSupervisorHistory(); - Iterable<VersionedSupervisorSpec> historyForId = supervisorHistory.get(id); - if (historyForId != null) { - final List<VersionedSupervisorSpec> authorizedHistoryForId = - Lists.newArrayList( - AuthorizationUtils.filterAuthorizedResources( - req, - historyForId, - SPEC_DATASOURCE_READ_RA_GENERATOR, - 
authorizerMapper - ) - ); - if (authorizedHistoryForId.size() > 0) { - return Response.ok(authorizedHistoryForId).build(); - } + manager -> { + Map<String, List<VersionedSupervisorSpec>> supervisorHistory = manager.getSupervisorHistory(); + Iterable<VersionedSupervisorSpec> historyForId = supervisorHistory.get(id); + if (historyForId != null) { + final List<VersionedSupervisorSpec> authorizedHistoryForId = + Lists.newArrayList( + AuthorizationUtils.filterAuthorizedResources( + req, + historyForId, + SPEC_DATASOURCE_READ_RA_GENERATOR, + authorizerMapper + ) + ); + if (authorizedHistoryForId.size() > 0) { + return Response.ok(authorizedHistoryForId).build(); } + } - return Response.status(Response.Status.NOT_FOUND) - .entity( - ImmutableMap.of( - "error", - StringUtils.format("No history for [%s].", id) - ) - ) - .build(); + return Response.status(Response.Status.NOT_FOUND) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("No history for [%s].", id) + ) + ) + .build(); - } } ); } @@ -320,18 +323,13 @@ public Response apply(SupervisorManager manager) public Response reset(@PathParam("id") final String id) { return asLeaderWithSupervisorManager( - new Function<SupervisorManager, Response>() - { - @Override - public Response apply(SupervisorManager manager) - { - if (manager.resetSupervisor(id, null)) { - return Response.ok(ImmutableMap.of("id", id)).build(); - } else { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } + manager -> { + if (manager.resetSupervisor(id, null)) { + return Response.ok(ImmutableMap.of("id", id)).build(); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); } } ); @@ -375,4 +373,29 @@ private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Respo ) ); } + + private Response specSuspendOrResume(final String id, 
boolean suspend) + { + return asLeaderWithSupervisorManager( + manager -> { + Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id); + if (!spec.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); + } + + if (spec.get().isSuspended() == suspend) { + final String errMsg = + StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running"); + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", errMsg)) + .build(); + } + manager.suspendOrResumeSupervisor(id, suspend); + spec = manager.getSupervisorSpec(id); + return Response.ok(spec.get()).build(); + } + ); + } } diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index 5ec87c588fa..a3bf8e6a3f2 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -6,7 +6,7 @@ var killTask = function(taskId) { if(confirm('Do you really want to kill: '+taskId)) { $.ajax({ type:'POST', - url: '/druid/indexer/v1/task/'+ taskId +'/shutdown', + url: '/druid/indexer/v1/task/'+ taskId +'/terminate', data: '' }).done(function(data) { setTimeout(function() { location.reload(true) }, 750); @@ -16,6 +16,42 @@ var killTask = function(taskId) { } } + +var suspendSupervisor = function(supervisorId) { + if(confirm('Do you really want to suspend: '+ supervisorId)) { + $.ajax({ + type:'POST', + url: '/druid/indexer/v1/supervisor/' + supervisorId + '/suspend', + data: '' + }).done(function(data) { + setTimeout(function() { location.reload(true) }, 750); + }).fail(function(data) { + var errMsg = data && data.responseJSON && data.responseJSON.error ? 
+ data.responseJSON.error : + 'suspend request failed, please check overlord logs for details.'; + alert(errMsg); + }) + } +} + + +var resumeSupervisor = function(supervisorId) { + if(confirm('Do you really want to resume: '+ supervisorId)) { + $.ajax({ + type:'POST', + url: '/druid/indexer/v1/supervisor/' + supervisorId + '/resume', + data: '' + }).done(function(data) { + setTimeout(function() { location.reload(true) }, 750); + }).fail(function(data) { + var errMsg = data && data.responseJSON && data.responseJSON.error ? + data.responseJSON.error : + 'resume request failed, please check overlord logs for details.'; + alert(errMsg); + }) + } +} + var resetSupervisor = function(supervisorId) { if(confirm('Do you really want to reset: '+ supervisorId)) { $.ajax({ @@ -31,7 +67,7 @@ var resetSupervisor = function(supervisorId) { } var shutdownSupervisor = function(supervisorId) { - if(confirm('Do you really want to shutdown: '+ supervisorId)) { + if(confirm('Do you really want to terminate: '+ supervisorId)) { $.ajax({ type:'POST', url: '/druid/indexer/v1/supervisor/' + supervisorId + '/shutdown', @@ -39,7 +75,7 @@ var shutdownSupervisor = function(supervisorId) { }).done(function(data) { setTimeout(function() { location.reload(true) }, 750); }).fail(function(data) { - alert('Shutdown request failed, please check overlord logs for details.'); + alert('Terminate request failed, please check overlord logs for details.'); }) } } @@ -59,18 +95,28 @@ $(document).ready(function() { } } - $.get('/druid/indexer/v1/supervisor', function(dataList) { + $.get('/druid/indexer/v1/supervisor?full', function(dataList) { + var data = [] for (i = 0 ; i < dataList.length ; i++) { - var supervisorId = encodeURIComponent(dataList[i]) + var supervisorId = encodeURIComponent(dataList[i].id) + var supervisorSpec = dataList[i].spec; + var statusText = supervisorSpec && supervisorSpec.suspended ? 
+ '<span style="color:#FF6000">suspended</span>' : + '<span style="color:#08B157">running</span>'; data[i] = { "dataSource" : supervisorId, "more" : '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '">payload</a>' + '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '/status">status</a>' + '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '/history">history</a>' + + (supervisorSpec.suspended ? + '<a style="padding-right:5px;" onclick="resumeSupervisor(\'' + supervisorId + '\');">resume</a>' : + '<a onclick="suspendSupervisor(\'' + supervisorId + '\');">suspend</a>' + ) + '<a onclick="resetSupervisor(\'' + supervisorId + '\');">reset</a>' + - '<a onclick="shutdownSupervisor(\'' + supervisorId + '\');">shutdown</a>' + '<a onclick="shutdownSupervisor(\'' + supervisorId + '\');">terminate</a>', + "status": statusText } } buildTable((data), $('#supervisorsTable')); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index abdd60a9dcd..0a977093f50 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -131,6 +131,24 @@ public Supervisor createSupervisor() { return ImmutableList.of("test"); } + + @Override + public SupervisorSpec createSuspendedSpec() + { + return null; + } + + @Override + public SupervisorSpec createRunningSpec() + { + return null; + } + + @Override + public boolean isSuspended() + { + return false; + } }; EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString())) .andReturn(Optional.of(supervisorSpec)) diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index f91dc1adfcb..85cfd95c726 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -40,6 +41,7 @@ import java.util.Map; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; @RunWith(EasyMockRunner.class) @@ -263,15 +265,114 @@ public void testResetSupervisor() verifyAll(); } + @Test + public void testCreateSuspendResumeAndStopSupervisor() + { + Capture<TestSupervisorSpec> capturedInsert = Capture.newInstance(); + SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1, false, supervisor2); + Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of( + "id3", new TestSupervisorSpec("id3", supervisor3) + ); + + // mock adding a supervisor to manager with existing supervisor then suspending it + Assert.assertTrue(manager.getSupervisorIds().isEmpty()); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + metadataSupervisorManager.insert("id1", spec); + supervisor3.start(); + supervisor1.start(); + replayAll(); + + manager.start(); + Assert.assertEquals(1, manager.getSupervisorIds().size()); + + manager.createOrUpdateAndStartSupervisor(spec); + Assert.assertEquals(2, manager.getSupervisorIds().size()); + Assert.assertEquals(spec, 
manager.getSupervisorSpec("id1").get()); + verifyAll(); + + // mock suspend, which stops supervisor1 and sets suspended state in metadata, flipping to supervisor2 + // in TestSupervisorSpec implementation of createSuspendedSpec + resetAll(); + metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert)); + supervisor2.start(); + supervisor1.stop(true); + replayAll(); + + manager.suspendOrResumeSupervisor("id1", true); + Assert.assertEquals(2, manager.getSupervisorIds().size()); + Assert.assertEquals(capturedInsert.getValue(), manager.getSupervisorSpec("id1").get()); + Assert.assertTrue(capturedInsert.getValue().suspended); + verifyAll(); + + // mock resume, which stops supervisor2 and sets suspended to false in metadata, flipping to supervisor1 + // in TestSupervisorSpec implementation of createRunningSpec + resetAll(); + metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert)); + supervisor2.stop(true); + supervisor1.start(); + replayAll(); + + manager.suspendOrResumeSupervisor("id1", false); + Assert.assertEquals(2, manager.getSupervisorIds().size()); + Assert.assertEquals(capturedInsert.getValue(), manager.getSupervisorSpec("id1").get()); + Assert.assertFalse(capturedInsert.getValue().suspended); + verifyAll(); + + // mock stop of suspended then resumed supervisor + resetAll(); + metadataSupervisorManager.insert(eq("id1"), anyObject(NoopSupervisorSpec.class)); + supervisor1.stop(true); + replayAll(); + + boolean retVal = manager.stopAndRemoveSupervisor("id1"); + Assert.assertTrue(retVal); + Assert.assertEquals(1, manager.getSupervisorIds().size()); + Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1")); + verifyAll(); + + // mock manager shutdown to ensure supervisor 3 stops + resetAll(); + supervisor3.stop(false); + replayAll(); + + manager.stop(); + verifyAll(); + + Assert.assertTrue(manager.getSupervisorIds().isEmpty()); + } + + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; 
private final Supervisor supervisor; + private final boolean suspended; + private final Supervisor suspendedSupervisor; + public TestSupervisorSpec(String id, Supervisor supervisor) + { + this(id, supervisor, false, null); + } + + public TestSupervisorSpec(String id, Supervisor supervisor, boolean suspended, Supervisor suspendedSupervisor) { this.id = id; this.supervisor = supervisor; + this.suspended = suspended; + this.suspendedSupervisor = suspendedSupervisor; + } + @Override + public SupervisorSpec createSuspendedSpec() + { + return new TestSupervisorSpec(id, suspendedSupervisor, true, supervisor); + } + + @Override + public SupervisorSpec createRunningSpec() + { + return new TestSupervisorSpec(id, suspendedSupervisor, false, supervisor); } @Override @@ -286,11 +387,16 @@ public Supervisor createSupervisor() return supervisor; } + @Override + public boolean isSuspended() + { + return suspended; + } + @Override public List<String> getDataSources() { return new ArrayList<>(); } - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index a3434bc4518..9d6eab33e1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -29,12 +29,10 @@ import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; -import org.apache.druid.server.security.Resource; import 
org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -75,21 +73,14 @@ public void setUp() @Override public Authorizer getAuthorizer(String name) { - return new Authorizer() - { - @Override - public Access authorize( - AuthenticationResult authenticationResult, Resource resource, Action action - ) - { - if (authenticationResult.getIdentity().equals("druid")) { - return Access.OK; + return (authenticationResult, resource, action) -> { + if (authenticationResult.getIdentity().equals("druid")) { + return Access.OK; + } else { + if (resource.getName().equals("datasource2")) { + return new Access(false, "not authorized."); } else { - if (resource.getName().equals("datasource2")) { - return new Access(false, "not authorized."); - } else { - return Access.OK; - } + return Access.OK; } } }; @@ -171,7 +162,7 @@ public void testSpecGetAll() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(request); + Response response = supervisorResource.specGetAll(null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -181,12 +172,61 @@ public void testSpecGetAll() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specGetAll(request); + response = supervisorResource.specGetAll(null, request); verifyAll(); Assert.assertEquals(503, response.getStatus()); } + @Test + public void testSpecGetAllFull() + { + Set<String> supervisorIds = ImmutableSet.of("id1", "id2"); + + SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) { + + @Override + public List<String> getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) { + + @Override + public List<String> getDataSources() + { + return Collections.singletonList("datasource2"); + } + }; + + 
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("druid", "druid", null, null) + ).atLeastOnce(); + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specGetAll("", request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + List<Map<String, Object>> specs = (List<Map<String, Object>>) response.getEntity(); + Assert.assertTrue( + specs.stream() + .allMatch(spec -> + ("id1".equals(spec.get("id")) && spec1.equals(spec.get("spec"))) || + ("id2".equals(spec.get("id")) && spec2.equals(spec.get("spec"))) + ) + ); + } + @Test public void testSpecGet() { @@ -249,6 +289,101 @@ public void testSpecGetStatus() Assert.assertEquals(503, response.getStatus()); } + @Test + public void testSpecSuspend() + { + + TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { + @Override + public List<String> getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { + @Override + public List<String> getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + 
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(running)).times(1) + .andReturn(Optional.of(suspended)).times(1); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specSuspend("my-id"); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + TestSupervisorSpec responseSpec = (TestSupervisorSpec) response.getEntity(); + Assert.assertEquals(suspended.id, responseSpec.id); + Assert.assertEquals(suspended.suspended, responseSpec.suspended); + resetAll(); + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce(); + replayAll(); + + response = supervisorResource.specSuspend("my-id"); + verifyAll(); + + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity()); + } + + + + @Test + public void testSpecResume() + { + TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { + @Override + public List<String> getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { + @Override + public List<String> getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(suspended)).times(1) + .andReturn(Optional.of(running)).times(1); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true); 
+ EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specResume("my-id"); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + TestSupervisorSpec responseSpec = (TestSupervisorSpec) response.getEntity(); + Assert.assertEquals(running.id, responseSpec.id); + Assert.assertEquals(running.suspended, responseSpec.suspended); + resetAll(); + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce(); + replayAll(); + + response = supervisorResource.specResume("my-id"); + verifyAll(); + + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already running"), response.getEntity()); + } + @Test public void testShutdown() { @@ -762,9 +897,10 @@ public void testNoopSupervisorSpecSerde() throws Exception private static class TestSupervisorSpec implements SupervisorSpec { - private final String id; - private final Supervisor supervisor; - private final List<String> datasources; + protected final String id; + protected final Supervisor supervisor; + protected final List<String> datasources; + boolean suspended; public TestSupervisorSpec(String id, Supervisor supervisor, List<String> datasources) { @@ -773,6 +909,12 @@ public TestSupervisorSpec(String id, Supervisor supervisor, List<String> datasou this.datasources = datasources; } + public TestSupervisorSpec(String id, Supervisor supervisor, List<String> datasources, boolean suspended) + { + this(id, supervisor, datasources); + this.suspended = suspended; + } + @Override public String getId() { @@ -791,6 +933,25 @@ public Supervisor createSupervisor() return datasources; } + + @Override + public SupervisorSpec createSuspendedSpec() + { + return new TestSupervisorSpec(id, supervisor, datasources, true); + } + + @Override + public SupervisorSpec createRunningSpec() + { + 
return new TestSupervisorSpec(id, supervisor, datasources, false); + } + + @Override + public boolean isSuspended() + { + return suspended; + } + @Override public boolean equals(Object o) { @@ -809,7 +970,10 @@ public boolean equals(Object o) if (supervisor != null ? !supervisor.equals(that.supervisor) : that.supervisor != null) { return false; } - return datasources != null ? datasources.equals(that.datasources) : that.datasources == null; + if (datasources != null ? !datasources.equals(that.datasources) : that.datasources != null) { + return false; + } + return isSuspended() == that.isSuspended(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index c32ea7af0b2..6935a62f545 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import org.apache.druid.indexing.overlord.DataSourceMetadata; import javax.annotation.Nullable; @@ -44,14 +45,28 @@ @JsonProperty("id") private String id; + @JsonProperty("suspended") + private boolean suspended; //ignored + + @VisibleForTesting + public NoopSupervisorSpec( + String id, + List<String> datasources + ) + { + this(id, datasources, null); + } + @JsonCreator public NoopSupervisorSpec( @Nullable @JsonProperty("id") String id, - @Nullable @JsonProperty("dataSources") List<String> datasources + @Nullable @JsonProperty("dataSources") List<String> datasources, + @Nullable @JsonProperty("suspended") Boolean suspended ) { this.id = id; this.datasources = datasources == null ? 
new ArrayList<>() : datasources; + this.suspended = false; // ignore } @Override @@ -61,6 +76,22 @@ public String getId() return id; } + + @Override + @Nullable + @JsonProperty("dataSources") + public List<String> getDataSources() + { + return datasources; + } + + @Override + @JsonProperty("suspended") + public boolean isSuspended() + { + return suspended; + } + @Override public Supervisor createSupervisor() { @@ -95,11 +126,15 @@ public void checkpoint( } @Override - @Nullable - @JsonProperty("dataSources") - public List<String> getDataSources() + public SupervisorSpec createRunningSpec() { - return datasources; + return new NoopSupervisorSpec(id, datasources); + } + + @Override + public SupervisorSpec createSuspendedSpec() + { + return new NoopSupervisorSpec(id, datasources); } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 01bcb4cea56..56717691fde 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.util.List; @@ -41,4 +42,19 @@ Supervisor createSupervisor(); List<String> getDataSources(); + + default SupervisorSpec createSuspendedSpec() + { + throw new NotImplementedException(); + } + + default SupervisorSpec createRunningSpec() + { + throw new NotImplementedException(); + } + + default boolean isSuspended() + { + return false; + } }
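Note on the diff above: the three `SupervisorSpec` additions (`createSuspendedSpec()`, `createRunningSpec()`, `isSuspended()`) read as an immutable state flip, where suspending or resuming produces a new spec rather than mutating the existing one. The following is a minimal standalone sketch of that pattern, not the Druid implementation. `Spec`, `DemoSpec`, `SuspendResumeSketch`, and `flip` are hypothetical names introduced only for illustration. The sketch also assumes `UnsupportedOperationException` as a stand-in for the `sun.reflect` `NotImplementedException` imported in the diff, since `sun.*` classes are JDK internals. The "already suspended" and "already running" messages mirror the strings asserted in `SupervisorResourceTest`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for SupervisorSpec; not the Druid interface itself.
interface Spec
{
  String getId();

  List<String> getDataSources();

  // Like the defaults in the diff: a spec that does not opt in cannot be flipped...
  default Spec createSuspendedSpec()
  {
    throw new UnsupportedOperationException("suspend not supported");
  }

  default Spec createRunningSpec()
  {
    throw new UnsupportedOperationException("resume not supported");
  }

  // ...and always reports itself as not suspended.
  default boolean isSuspended()
  {
    return false;
  }
}

// Hypothetical spec that supports suspension by returning a fresh copy with the flag changed.
final class DemoSpec implements Spec
{
  private final String id;
  private final boolean suspended;

  DemoSpec(String id, boolean suspended)
  {
    this.id = id;
    this.suspended = suspended;
  }

  @Override
  public String getId()
  {
    return id;
  }

  @Override
  public List<String> getDataSources()
  {
    return Collections.singletonList("datasource1");
  }

  @Override
  public Spec createSuspendedSpec()
  {
    return new DemoSpec(id, true);
  }

  @Override
  public Spec createRunningSpec()
  {
    return new DemoSpec(id, false);
  }

  @Override
  public boolean isSuspended()
  {
    return suspended;
  }
}

final class SuspendResumeSketch
{
  // Returns the spec that should replace `current`, rejecting a no-op transition.
  static Spec flip(Spec current, boolean suspend)
  {
    if (current.isSuspended() == suspend) {
      throw new IllegalStateException(
          "[" + current.getId() + "] is already " + (suspend ? "suspended" : "running")
      );
    }
    return suspend ? current.createSuspendedSpec() : current.createRunningSpec();
  }

  public static void main(String[] args)
  {
    Spec running = new DemoSpec("my-id", false);
    Spec suspended = flip(running, true);
    Spec resumed = flip(suspended, false);
    System.out.println(suspended.isSuspended() + " " + resumed.isSuspended());
  }
}
```

In this sketch the original object is never modified, so a caller holding the old spec still sees its old state; only the returned spec carries the new flag.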