This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 37db5d9b81 Reset offsets supervisor API (#14772)
37db5d9b81 is described below
commit 37db5d9b81be63848403d7ce5f7c0c5f090de437
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Thu Aug 17 17:13:10 2023 -0400
Reset offsets supervisor API (#14772)
* Add supervisor /resetOffsets API.
- Add a new endpoint
/druid/indexer/v1/supervisor/<supervisorId>/resetOffsets
which accepts DataSourceMetadata as a body parameter.
- Update logs, unit tests and docs.
* Add a new interface method for backwards compatibility.
* Rename
* Adjust tests and javadocs.
* Use CoreInjectorBuilder instead of deprecated makeInjectorWithModules
* UT fix
* Doc updates.
* remove extraneous debugging logs.
* Remove the boolean setting; only ResetHandle() and resetInternal()
* Relax constraints and add a new ResetOffsetsNotice; cleanup old logic.
* A separate ResetOffsetsNotice and some cleanup.
* Minor cleanup
* Add a check & test to verify that sequence numbers are only of type
SeekableStreamEndSequenceNumbers
* Add unit tests for the no op implementations for test coverage
* CodeQL fix
* checkstyle from merge conflict
* Doc changes
* DOCUSAURUS code tabs fix. Thanks, Brian!
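
In short, the new endpoint takes a `DataSourceMetadata` JSON body naming the stream and the partition-to-offset map to reset. A minimal sketch of driving it from the shell (ROUTER_IP/ROUTER_PORT are placeholders, and the payload mirrors the Kafka sample added in the docs below):

```shell
# Build the resetOffsets payload for a hypothetical Kafka supervisor
# named social_media: partitions 0 and 2 are reset to offsets 100 and
# 650; all other partitions keep their stored offsets.
PAYLOAD='{"type":"kafka","partitions":{"type":"end","stream":"ads_media_stream","partitionOffsetMap":{"0":100,"2":650}}}'

# POST it to the Overlord via the Router (uncomment to run against a
# live cluster):
# curl --request POST \
#   "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
#   --header 'Content-Type: application/json' \
#   --data-raw "$PAYLOAD"

echo "$PAYLOAD"
```

On success the API echoes back the supervisor ID, e.g. `{"id": "social_media"}`.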
---
docs/api-reference/supervisor-api.md | 100 +++-
.../extensions-core/kafka-supervisor-operations.md | 64 +++
.../extensions-core/kinesis-ingestion.md | 12 +-
.../MaterializedViewSupervisor.java | 6 +
.../MaterializedViewSupervisorTest.java | 33 ++
.../kafka/KafkaDataSourceMetadataTest.java | 56 +++
.../kinesis/KinesisDataSourceMetadataTest.java | 55 ++
.../kinesis/supervisor/KinesisSupervisorTest.java | 1 -
.../overlord/supervisor/SupervisorManager.java | 8 +-
.../overlord/supervisor/SupervisorResource.java | 25 +-
.../supervisor/SeekableStreamSupervisor.java | 144 +++++-
.../overlord/supervisor/SupervisorManagerTest.java | 30 ++
.../supervisor/SupervisorResourceTest.java | 49 ++
.../TestSeekableStreamDataSourceMetadata.java | 48 ++
.../SeekableStreamSupervisorStateTest.java | 552 ++++++++++++++++++++-
.../overlord/supervisor/NoopSupervisorSpec.java | 5 +
.../indexing/overlord/supervisor/Supervisor.java | 13 +
.../druid/indexing/NoopSupervisorSpecTest.java | 18 +
website/.spelling | 1 +
19 files changed, 1208 insertions(+), 12 deletions(-)
diff --git a/docs/api-reference/supervisor-api.md
b/docs/api-reference/supervisor-api.md
index 300e46ff1b..b315971ec2 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3065,7 +3065,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
### Reset a supervisor
-Resets the specified supervisor. This endpoint clears stored offsets in Kafka
or sequence numbers in Kinesis, prompting the supervisor to resume data
reading. The supervisor will start from the earliest or latest available
position, depending on the platform (offsets in Kafka or sequence numbers in
Kinesis). It kills and recreates active tasks to read from valid positions.
+Resets the specified supervisor. This endpoint clears _all_ stored offsets in
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data
reading. The supervisor will start from the earliest or latest available
position, depending on the platform (offsets in Kafka or sequence numbers in
Kinesis). It kills and recreates active tasks to read from valid positions.
Use this endpoint to recover from a stopped state due to missing offsets in
Kafka or sequence numbers in Kinesis. Use this endpoint with caution as it may
result in skipped messages and lead to data loss or duplicate data.
@@ -3130,6 +3130,104 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
</details>
+### Reset Offsets for a supervisor
+
+Resets the specified offsets for a supervisor. This endpoint clears _only_ the
specified offsets in Kafka or sequence numbers in Kinesis, prompting the
supervisor to resume data reading.
+If there are no stored offsets, the specified offsets will be set in the
metadata store. The supervisor will start from the reset offsets for the
specified partitions, and from the stored offsets for all other partitions.
+It kills and recreates active tasks pertaining to the partitions specified to
read from valid offsets.
+
+Use this endpoint to selectively reset offsets for partitions without
resetting the entire set.
+
+#### URL
+
+<code class="postAPI">POST</code>
<code>/druid/indexer/v1/supervisor/:supervisorId/resetOffsets</code>
+
+#### Responses
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--200 SUCCESS-->
+
+*Successfully reset offsets*
+
+<!--404 NOT FOUND-->
+
+*Invalid supervisor ID*
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+---
+#### Reset Offsets Metadata
+
+This section presents the structure and details of the reset offsets metadata
payload.
+
+| Field | Type | Description | Required |
+|---------|---------|---------|---------|
+| `type` | String | The type of reset offsets metadata payload. It must match
the supervisor's `type`. Possible values: `kafka` or `kinesis`. | Yes |
+| `partitions` | Object | An object representing the reset metadata. See below
for details. | Yes |
+
+#### Partitions
+
+The following table defines the fields within the `partitions` object in the
reset offsets metadata payload.
+
+| Field | Type | Description | Required |
+|---------|---------|---------|---------|
+| `type` | String | Must be set to `end`. Indicates the end sequence numbers
for the reset offsets. | Yes |
+| `stream` | String | The stream to be reset. It must be a valid stream
consumed by the supervisor. | Yes |
+| `partitionOffsetMap` | Object | A map of partitions to corresponding offsets
for the stream to be reset.| Yes |
+
+#### Sample request
+
+The following example shows how to reset offsets for a Kafka supervisor with
the name `social_media`. Let's say the supervisor is reading
+from a Kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0,
"1": 10, "2": 20, "3": 40}`.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--cURL-->
+
+```shell
+curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
+--header 'Content-Type: application/json' \
+--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_stream","partitionOffsetMap":{"0":100, "2": 650}}}'
+```
+
+<!--HTTP-->
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+Content-Type: application/json
+
+{
+ "type": "kafka",
+ "partitions": {
+ "type": "end",
+ "stream": "ads_media_stream",
+ "partitionOffsetMap": {
+ "0": 100,
+ "2": 650
+ }
+ }
+}
+```
+
+The above operation will reset offsets only for partitions 0 and 2 to 100 and
650 respectively. After a successful reset,
+when the supervisor's tasks restart, they will resume reading from `{"0": 100,
"1": 10, "2": 650, "3": 40}`.
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+#### Sample response
+
+<details>
+ <summary>Click to show sample response</summary>
+
+ ```json
+{
+ "id": "social_media"
+}
+ ```
+</details>
+
### Terminate a supervisor
Terminates a supervisor and its associated indexing tasks, triggering the
publishing of their segments. When terminated, a tombstone marker is placed in
the database to prevent reloading on restart.
diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md
b/docs/development/extensions-core/kafka-supervisor-operations.md
index e1de35eb2f..9b6265154c 100644
--- a/docs/development/extensions-core/kafka-supervisor-operations.md
+++ b/docs/development/extensions-core/kafka-supervisor-operations.md
@@ -131,6 +131,70 @@ to start and in flight tasks will fail. This operation
enables you to recover fr
Note that the supervisor must be running for this endpoint to be available.
+## Resetting Offsets for a Supervisor
+
+The supervisor must be running for this endpoint to be available.
+
+The `POST /druid/indexer/v1/supervisor/<supervisorId>/resetOffsets` operation
clears stored
+offsets, causing the supervisor to start reading from the specified offsets.
After resetting stored
+offsets, the supervisor kills and recreates any active tasks pertaining to the
specified partitions,
+so that tasks begin reading from specified offsets. For partitions that are
not specified in this operation, the supervisor
+will resume from the last stored offset.
+
+Use care when using this operation! Resetting offsets for a supervisor may
cause Kafka messages to be skipped or read
+twice, resulting in missing or duplicate data.
+
+#### Sample request
+
+The following example shows how to reset offsets for a Kafka supervisor with
the name `social_media`. Let's say the supervisor is reading
+from two Kafka topics, `ads_media_foo` and `ads_media_bar`, and has the stored
offsets: `{"ads_media_foo:0": 0, "ads_media_foo:1": 10, "ads_media_bar:0": 20,
"ads_media_bar:1": 40}`.
+
+<!--DOCUSAURUS_CODE_TABS-->
+
+<!--cURL-->
+
+```shell
+curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
+--header 'Content-Type: application/json' \
+--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_foo|ads_media_bar","partitionOffsetMap":{"ads_media_foo:0": 3, "ads_media_bar:1": 12}}}'
+```
+
+<!--HTTP-->
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+Content-Type: application/json
+
+{
+ "type": "kafka",
+ "partitions": {
+ "type": "end",
+ "stream": "ads_media_foo|ads_media_bar",
+ "partitionOffsetMap": {
+ "ads_media_foo:0": 3,
+ "ads_media_bar:1": 12
+ }
+ }
+}
+```
+The above operation will reset offsets for `ads_media_foo` partition 0 and
`ads_media_bar` partition 1 to offsets 3 and 12 respectively. After a
successful reset,
+when the supervisor's tasks restart, they will resume reading from
`{"ads_media_foo:0": 3, "ads_media_foo:1": 10, "ads_media_bar:0": 20,
"ads_media_bar:1": 12}`.
+
+<!--END_DOCUSAURUS_CODE_TABS-->
+
+#### Sample response
+
+<details>
+ <summary>Click to show sample response</summary>
+
+ ```json
+{
+ "id": "social_media"
+}
+ ```
+</details>
+
## Terminating Supervisors
The `POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` operation
terminates a supervisor and causes all
diff --git a/docs/development/extensions-core/kinesis-ingestion.md
b/docs/development/extensions-core/kinesis-ingestion.md
index 24038d7c9d..0c3066d2f7 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -45,7 +45,7 @@ See [Supervisor API](../../api-reference/supervisor-api.md)
for more information
|--------|----|-----------|--------|
|`type`|String|The supervisor type; this should always be `kinesis`.|Yes|
|`spec`|Object|The container object for the supervisor configuration.|Yes|
-|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration)
object for configuring Kafka connection and I/O-related settings for the
supervisor and indexing task.|Yes|
+|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration)
object for configuring Kinesis connection and I/O-related settings for the
supervisor and indexing task.|Yes|
|`dataSchema`|Object|The schema used by the Kinesis indexing task during
ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema) for
more information.|Yes|
|`tuningConfig`|Object|The [tuning
configuration](#supervisor-tuning-configuration) object for configuring
performance-related settings for the supervisor and indexing tasks.|No|
@@ -593,6 +593,16 @@ for the generated segments to be accepted. If the messages
at the expected start
no longer available in Kinesis (typically because the message retention period
has elapsed or the topic was
removed and re-created) the supervisor will refuse to start and in-flight
tasks will fail. This endpoint enables you to recover from this condition.
+### Resetting Offsets for a supervisor
+
+To reset partition offsets for a supervisor, send a `POST` request to the
`/druid/indexer/v1/supervisor/:supervisorId/resetOffsets` endpoint. This
endpoint clears stored
+sequence numbers, prompting the supervisor to start reading from the specified
offsets.
+After resetting stored offsets, the supervisor kills and recreates any active
tasks pertaining to the specified partitions,
+so that tasks begin reading from the specified offsets. For partitions that are
not specified in this operation, the supervisor will resume from the last
+stored offset.
+
+Use this endpoint with caution as it may result in skipped messages, leading
to data loss or duplicate data.
+
### Terminate a supervisor
To terminate a supervisor and its associated indexing tasks, send a `POST`
request to the `/druid/indexer/v1/supervisor/:supervisorId/terminate` endpoint.
diff --git
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index fd9303c17b..ac2738534d 100644
---
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -277,6 +277,12 @@ public class MaterializedViewSupervisor implements
Supervisor
}
}
+ @Override
+ public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
+ {
+ throw new UnsupportedOperationException("Reset offsets not supported in
MaterializedViewSupervisor");
+ }
+
@Override
public void checkpoint(int taskGroupId, DataSourceMetadata
checkpointMetadata)
{
diff --git
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index d822948881..e49b160c1e 100644
---
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -387,4 +387,37 @@ public class MaterializedViewSupervisorTest
EasyMock.replay(mock);
supervisor.run();
}
+
+ @Test
+ public void testResetOffsetsNotSupported()
+ {
+ MaterializedViewSupervisorSpec suspended = new
MaterializedViewSupervisorSpec(
+ "base",
+ new DimensionsSpec(Collections.singletonList(new
StringDimensionSchema("dim"))),
+ new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
+ HadoopTuningConfig.makeDefaultTuningConfig(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ objectMapper,
+ taskMaster,
+ taskStorage,
+ metadataSupervisorManager,
+ sqlSegmentsMetadataManager,
+ indexerMetadataStorageCoordinator,
+ new MaterializedViewTaskConfig(),
+ EasyMock.createMock(AuthorizerMapper.class),
+ EasyMock.createMock(ChatHandlerProvider.class),
+ new SupervisorStateManagerConfig()
+ );
+ MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor)
suspended.createSupervisor();
+ Assert.assertThrows(
+ "Reset offsets not supported in MaterializedViewSupervisor",
+ UnsupportedOperationException.class,
+ () -> supervisor.resetOffsets(null)
+ );
+ }
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
index 45f61b61b0..800a1fedf9 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
@@ -19,11 +19,19 @@
package org.apache.druid.indexing.kafka;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -163,6 +171,37 @@ public class KafkaDataSourceMetadataTest
);
}
+ @Test
+ public void testKafkaDataSourceMetadataSerdeRoundTrip() throws
JsonProcessingException
+ {
+ ObjectMapper jsonMapper = createObjectMapper();
+
+ KafkaDataSourceMetadata kdm1 = endMetadata(ImmutableMap.of());
+ String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
+ DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1,
DataSourceMetadata.class);
+ Assert.assertEquals(kdm1, dsMeta1);
+
+ KafkaDataSourceMetadata kdm2 = endMetadata(ImmutableMap.of(1, 3L));
+ String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
+ DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2,
DataSourceMetadata.class);
+ Assert.assertEquals(kdm2, dsMeta2);
+ }
+
+ @Test
+ public void testKafkaDataSourceMetadataSerde() throws JsonProcessingException
+ {
+ ObjectMapper jsonMapper = createObjectMapper();
+ KafkaDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of(1, 3L));
+ String kdmStr1 =
"{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":3},\"partitionOffsetMap\":{\"1\":3},\"exclusivePartitions\":[]}}\n";
+ DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1,
DataSourceMetadata.class);
+ Assert.assertEquals(dsMeta1, expectedKdm1);
+
+ KafkaDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of(1, 3L,
2, 1900L));
+ String kdmStr2 =
"{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":3,
\"2\":1900},\"partitionOffsetMap\":{\"1\":3,
\"2\":1900},\"exclusivePartitions\":[]}}\n";
+ DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2,
DataSourceMetadata.class);
+ Assert.assertEquals(dsMeta2, expectedKdm2);
+ }
+
private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long>
offsets)
{
Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
@@ -188,4 +227,21 @@ public class KafkaDataSourceMetadataTest
);
return new KafkaDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>("foo", newOffsets));
}
+
+ private static ObjectMapper createObjectMapper()
+ {
+ DruidModule module = new KafkaIndexTaskModule();
+ final Injector injector = new CoreInjectorBuilder(new
StartupInjectorBuilder().build())
+ .addModule(
+ binder -> {
+
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+ }
+ ).build();
+
+ ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+ module.getJacksonModules().forEach(objectMapper::registerModule);
+ return objectMapper;
+ }
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
index fbb3d6405f..8798153170 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java
@@ -20,10 +20,18 @@
package org.apache.druid.indexing.kinesis;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
import org.junit.Assert;
import org.junit.Test;
@@ -217,6 +225,37 @@ public class KinesisDataSourceMetadataTest
);
}
+ @Test
+ public void testKinesisDataSourceMetadataSerdeRoundTrip() throws
JsonProcessingException
+ {
+ ObjectMapper jsonMapper = createObjectMapper();
+
+ KinesisDataSourceMetadata kdm1 = startMetadata(ImmutableMap.of(),
ImmutableSet.of());
+ String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
+ DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1,
DataSourceMetadata.class);
+ Assert.assertEquals(kdm1, dsMeta1);
+
+ KinesisDataSourceMetadata kdm2 = startMetadata(ImmutableMap.of("1", "3"),
ImmutableSet.of());
+ String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
+ DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2,
DataSourceMetadata.class);
+ Assert.assertEquals(kdm2, dsMeta2);
+ }
+
+ @Test
+ public void testKinesisDataSourceMetadataSerde() throws
JsonProcessingException
+ {
+ ObjectMapper jsonMapper = createObjectMapper();
+ KinesisDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of("1",
"5"));
+ String kdmStr1 =
"{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":5},\"partitionOffsetMap\":{\"1\":5},\"exclusivePartitions\":[]}}\n";
+ DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1,
DataSourceMetadata.class);
+ Assert.assertEquals(dsMeta1, expectedKdm1);
+
+ KinesisDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of("1",
"10", "2", "19"));
+ String kdmStr2 =
"{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":10,
\"2\":19},\"partitionOffsetMap\":{\"1\":10,
\"2\":19},\"exclusivePartitions\":[]}}\n";
+ DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2,
DataSourceMetadata.class);
+ Assert.assertEquals(dsMeta2, expectedKdm2);
+ }
+
private static KinesisDataSourceMetadata simpleStartMetadata(Map<String,
String> sequences)
{
return startMetadata(sequences, sequences.keySet());
@@ -233,4 +272,20 @@ public class KinesisDataSourceMetadataTest
{
return new KinesisDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>("foo", sequences));
}
+
+ private static ObjectMapper createObjectMapper()
+ {
+ DruidModule module = new KinesisIndexingServiceModule();
+ final Injector injector = new CoreInjectorBuilder(new
StartupInjectorBuilder().build())
+ .addModule(
+ binder -> {
+
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+ }
+ ).build();
+ ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+ module.getJacksonModules().forEach(objectMapper::registerModule);
+ return objectMapper;
+ }
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 127683f8e4..39d943dbeb 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -2636,7 +2636,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.resetInternal(null);
verifyAll();
-
}
@Test
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index f582e9e9ae..2cd926bae9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -201,7 +201,7 @@ public class SupervisorManager
return supervisor == null ? Optional.absent() :
Optional.fromNullable(supervisor.lhs.isHealthy());
}
- public boolean resetSupervisor(String id, @Nullable DataSourceMetadata
dataSourceMetadata)
+ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata
resetDataSourceMetadata)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");
@@ -212,7 +212,11 @@ public class SupervisorManager
return false;
}
- supervisor.lhs.reset(dataSourceMetadata);
+ if (resetDataSourceMetadata == null) {
+ supervisor.lhs.reset(null);
+ } else {
+ supervisor.lhs.resetOffsets(resetDataSourceMetadata);
+ }
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.reset();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 4eaf5756c1..604c4073f3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import
org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +46,7 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
+import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
@@ -493,10 +495,31 @@ public class SupervisorResource
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response reset(@PathParam("id") final String id)
+ {
+ return handleResetRequest(id, null);
+ }
+
+ @POST
+ @Path("/{id}/resetOffsets")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response resetOffsets(
+ @PathParam("id") final String id,
+ final DataSourceMetadata resetDataSourceMetadata
+ )
+ {
+ return handleResetRequest(id, resetDataSourceMetadata);
+ }
+
+ private Response handleResetRequest(
+ final String id,
+ @Nullable final DataSourceMetadata resetDataSourceMetadata
+ )
{
return asLeaderWithSupervisorManager(
manager -> {
- if (manager.resetSupervisor(id, null)) {
+ if (manager.resetSupervisor(id, resetDataSourceMetadata)) {
return Response.ok(ImmutableMap.of("id", id)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f8b41fce61..29fd16d1a4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -42,6 +42,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -61,6 +63,7 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl;
@@ -95,6 +98,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
@@ -608,6 +612,31 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ private class ResetOffsetsNotice implements Notice
+ {
+ final DataSourceMetadata dataSourceMetadata;
+ private static final String TYPE = "reset_offsets_notice";
+
+ ResetOffsetsNotice(
+ final DataSourceMetadata dataSourceMetadata
+ )
+ {
+ this.dataSourceMetadata = dataSourceMetadata;
+ }
+
+ @Override
+ public void handle()
+ {
+ resetOffsetsInternal(dataSourceMetadata);
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+ }
+
protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
@@ -998,12 +1027,59 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@Override
- public void reset(DataSourceMetadata dataSourceMetadata)
+ public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
{
- log.info("Posting ResetNotice");
+ log.info("Posting ResetNotice with datasource metadata [%s]",
dataSourceMetadata);
addNotice(new ResetNotice(dataSourceMetadata));
}
+ /**
+ * Reset offsets with provided dataSource metadata. Validates {@code
resetDataSourceMetadata},
+ * creates a {@code ResetOffsetsNotice} with the metadata and adds it to the
notice queue. The resulting stored offsets
+ * are a union of the existing checkpointed offsets and the provided offsets.
+ * @param resetDataSourceMetadata required datasource metadata with offsets
to reset.
+ * @throws DruidException if any metadata attribute doesn't match the
supervisor's.
+ */
+ @Override
+ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
+ {
+ if (resetDataSourceMetadata == null) {
+ throw InvalidInput.exception("Reset dataSourceMetadata is required for
resetOffsets.");
+ }
+
+ if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
+ throw InvalidInput.exception(
+ "Datasource metadata instance does not match required, found
instance of [%s].",
+ resetDataSourceMetadata.getClass()
+ );
+ }
+ @SuppressWarnings("unchecked")
+    final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
+        (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) resetDataSourceMetadata;
+
+    final SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> streamSequenceNumbers = resetMetadata.getSeekableStreamSequenceNumbers();
+ if (!(streamSequenceNumbers instanceof SeekableStreamEndSequenceNumbers)) {
+ throw InvalidInput.exception(
+          "Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[%s], but found[%s].",
+ resetMetadata,
+ SeekableStreamEndSequenceNumbers.class.getSimpleName(),
+ streamSequenceNumbers.getClass().getSimpleName()
+ );
+ }
+
+ final String resetStream = streamSequenceNumbers.getStream();
+ if (!ioConfig.getStream().equals(resetStream)) {
+ throw InvalidInput.exception(
+          "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is consuming stream[%s].",
+ resetStream,
+ supervisorId,
+ ioConfig.getStream()
+ );
+ }
+    log.info("Posting ResetOffsetsNotice with reset dataSource metadata[%s]", resetDataSourceMetadata);
+ addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
+ }
+
public ReentrantLock getRecordSupplierLock()
{
return recordSupplierLock;
@@ -1693,6 +1769,70 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+  /**
+   * Reset offsets with the given datasource metadata. If checkpoints exist, the resulting stored offsets will be a union of
+   * existing checkpointed offsets and provided offsets; any checkpointed offsets not specified in the metadata will be
+   * preserved as-is. If checkpoints don't exist, the provided reset datasource metadata will be inserted into
+   * the metadata storage. Once the offsets are reset, any active tasks serving the partition offsets will be restarted.
+   * @param dataSourceMetadata required reset datasource metadata; assumed to have been validated already.
+   */
+  public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMetadata)
+ {
+    log.info("Reset offsets for dataSource[%s] with metadata[%s]", dataSource, dataSourceMetadata);
+
+ @SuppressWarnings("unchecked")
+    final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
+        (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;
+
+    final boolean metadataUpdateSuccess;
+    final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+    if (metadata == null) {
+      log.info("Checkpointed metadata is null for dataSource[%s] - inserting metadata[%s]", dataSource, resetMetadata);
+      metadataUpdateSuccess = indexerMetadataStorageCoordinator.insertDataSourceMetadata(dataSource, resetMetadata);
+ } else {
+ if (!checkSourceMetadataMatch(metadata)) {
+ throw InvalidInput.exception(
+            "Datasource metadata instance does not match required, found instance of [%s]",
+ metadata.getClass()
+ );
+ }
+ @SuppressWarnings("unchecked")
+      final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
+          (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) metadata;
+      final DataSourceMetadata newMetadata = currentMetadata.plus(resetMetadata);
+      log.info("Current checkpointed metadata[%s], new metadata[%s] for dataSource[%s]", currentMetadata, newMetadata, dataSource);
+ try {
+        metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata);
+ }
+ catch (IOException e) {
+        log.error("Reset offsets for dataSource[%s] with metadata[%s] failed [%s]", dataSource, newMetadata, e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (!metadataUpdateSuccess) {
+      throw new ISE("Unable to reset metadata[%s] for datasource[%s]", dataSourceMetadata, dataSource);
+ }
+
+ resetMetadata.getSeekableStreamSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .keySet()
+ .forEach(partition -> {
+ final int groupId = getTaskGroupIdForPartition(partition);
+ killTaskGroupForPartitions(
+ ImmutableSet.of(partition),
+                       "DataSourceMetadata is updated while reset offsets is called"
+ );
+ activelyReadingTaskGroups.remove(groupId);
+ // killTaskGroupForPartitions() cleans up partitionGroups.
+ // Add the removed groups back.
+                   partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>());
+ partitionOffsets.put(partition, getNotSetMarker());
+ });
+
+ }
+
+
private void killTask(final String id, String reasonFormat, Object... args)
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
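As an aside for readers of this commit mail: the union semantics that `resetOffsetsInternal()` implements via `currentMetadata.plus(resetMetadata)` can be sketched with plain maps. `ResetOffsetsUnion` and its `plus()` helper below are illustrative stand-ins, not Druid classes; only the merge behavior (reset partitions override, unspecified checkpointed partitions survive) comes from the patch above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the offset-union semantics in resetOffsetsInternal():
// partitions named in the reset metadata take the new offsets; checkpointed
// partitions that are not named are preserved as-is.
public class ResetOffsetsUnion
{
  public static Map<String, String> plus(Map<String, String> checkpointed, Map<String, String> reset)
  {
    final Map<String, String> merged = new HashMap<>(checkpointed);
    merged.putAll(reset); // reset offsets override matching partitions
    return merged;
  }

  public static void main(String[] args)
  {
    final Map<String, String> checkpointed = Map.of("0", "0", "1", "10", "2", "20", "3", "30");
    final Map<String, String> reset = Map.of("0", "1000", "2", "2500");
    // Yields the offsets {0=1000, 1=10, 2=2500, 3=30}, matching expectedOffsets
    // in testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints below.
    System.out.println(plus(checkpointed, reset));
  }
}
```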
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 822e035984..ba4b963e9b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -22,7 +22,10 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.easymock.Capture;
@@ -327,6 +330,33 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testResetSupervisorWithSpecificOffsets()
+ {
+ Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+ "id1", new TestSupervisorSpec("id1", supervisor1)
+ );
+
+    DataSourceMetadata datasourceMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of("0", "10", "1", "20", "2", "30"),
+ ImmutableSet.of()
+ )
+ );
+
+    EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+ supervisor1.start();
+ supervisor1.resetOffsets(datasourceMetadata);
+ replayAll();
+
+ manager.start();
+    Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1", datasourceMetadata));
+    Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home", datasourceMetadata));
+
+ verifyAll();
+ }
+
@Test
public void testCreateSuspendResumeAndStopSupervisor()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 60cd027c0d..f1793db633 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
@@ -1198,6 +1200,53 @@ public class SupervisorResourceTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testResetOffsets()
+ {
+ Capture<String> id1 = Capture.newInstance();
+ Capture<String> id2 = Capture.newInstance();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
+ EasyMock.expect(supervisorManager.resetSupervisor(
+ EasyMock.capture(id1),
+ EasyMock.anyObject(DataSourceMetadata.class)
+ )).andReturn(true);
+ EasyMock.expect(supervisorManager.resetSupervisor(
+ EasyMock.capture(id2),
+ EasyMock.anyObject(DataSourceMetadata.class)
+ )).andReturn(false);
+ replayAll();
+
+    DataSourceMetadata datasourceMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of("0", "10", "1", "20", "2", "30"),
+ ImmutableSet.of()
+ )
+ );
+
+    Response response = supervisorResource.resetOffsets("my-id", datasourceMetadata);
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+
+ response = supervisorResource.resetOffsets("my-id-2", datasourceMetadata);
+
+ Assert.assertEquals(404, response.getStatus());
+ Assert.assertEquals("my-id", id1.getValue());
+ Assert.assertEquals("my-id-2", id2.getValue());
+ verifyAll();
+
+ resetAll();
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+ replayAll();
+
+    response = supervisorResource.resetOffsets("my-id", datasourceMetadata);
+
+ Assert.assertEquals(503, response.getStatus());
+ verifyAll();
+ }
+
@Test
public void testNoopSupervisorSpecSerde() throws Exception
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java
new file mode 100644
index 0000000000..7d87427e45
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+
+public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
+{
+ @JsonCreator
+ public TestSeekableStreamDataSourceMetadata(
+      @JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers)
+ {
+ super(seekableStreamSequenceNumbers);
+ }
+
+ @Override
+  protected SeekableStreamDataSourceMetadata<String, String> createConcreteDataSourceMetaData(
+      SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers
+ )
+ {
+    return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
+ }
+
+ @Override
+ public DataSourceMetadata asStartMetadata()
+ {
+ return null;
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 46f86bacb4..7e8afdf6bc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -32,6 +32,8 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
@@ -56,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -81,6 +84,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
+import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -90,6 +94,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
@@ -171,8 +176,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expectLastCall().times(0, 1);
-    EasyMock
-        .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("10").anyTimes();
}
@@ -831,7 +835,6 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null
);
-
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
@@ -1039,6 +1042,523 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
);
}
+  @Test
+  public void testSupervisorResetAllWithCheckpoints() throws InterruptedException
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+    EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(
+        true
+    );
+    taskQueue.shutdown("task1", "DataSourceMetadata is not found while reset");
+    EasyMock.expectLastCall();
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "5"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "6"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+ supervisor.reset(null);
+ validateSupervisorStateAfterResetOffsets(supervisor, ImmutableMap.of(), 0);
+ }
+
+ @Test
+  public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws InterruptedException, IOException
+ {
+    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "0", "1", "10", "2", "20", "3", "30");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "1000", "2", "2500");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "1000", "1", "10", "2", "2500", "3", "30");
+
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ checkpointOffsets
+ )
+ )
+ );
+    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ expectedOffsets
+ ))
+ )).andReturn(
+ true
+ );
+
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ checkpointOffsets,
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ resetOffsets
+ )
+ );
+
+ Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+ supervisor.resetOffsets(resetMetadata);
+
+ validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
+ }
+
+ @Test
+  public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
+ {
+    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "100");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "10", "1", "8");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "10", "1", "8", "2", "100");
+
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ checkpointOffsets
+ )
+ )
+ );
+    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ "stream",
+ expectedOffsets
+ )
+ ))).andReturn(true);
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
+ EasyMock.expectLastCall();
+
+    taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called");
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    // Spin off three active tasks, with each task serving one partition.
+    supervisor.getIoConfig().setTaskCount(3);
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "5"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "6"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("2"),
+ ImmutableMap.of("2", "100"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task3"),
+ ImmutableSet.of()
+ );
+
+    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ resetOffsets
+ )
+ );
+
+ Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+ supervisor.resetOffsets(resetMetadata);
+
+ validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
+ }
+
+ @Test
+  public void testSupervisorResetOffsetsWithNoCheckpoints() throws InterruptedException
+ {
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "10", "1", "8");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.copyOf(resetOffsets);
+
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null);
+    EasyMock.expect(indexerMetadataStorageCoordinator.insertDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ "stream",
+ expectedOffsets
+ )
+ ))).andReturn(true);
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
+ EasyMock.expectLastCall();
+
+    taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called");
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ // Spin off three active tasks with each task serving one partition.
+ supervisor.getIoConfig().setTaskCount(3);
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "5"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "6"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("2"),
+ ImmutableMap.of("2", "100"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task3"),
+ ImmutableSet.of()
+ );
+
+    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ resetOffsets
+ )
+ );
+
+ Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+ supervisor.resetOffsets(resetMetadata);
+
+ validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
+ }
+
+
+ @Test
+  public void testSupervisorResetWithNoPartitions() throws IOException, InterruptedException
+ {
+    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of();
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "5", "1", "6");
+
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ checkpointOffsets
+ )
+ )
+ );
+    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ "stream",
+ expectedOffsets
+ )
+ ))).andReturn(true);
+
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ // Spin off two active tasks with each task serving one partition.
+ supervisor.getIoConfig().setTaskCount(2);
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "5"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "6"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ resetOffsets
+ )
+ );
+
+ Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+ supervisor.resetOffsets(resetMetadata);
+
+ validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 2);
+ }
+
+ @Test
+  public void testSupervisorResetWithNewPartition() throws IOException, InterruptedException
+ {
+    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6");
+    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("2", "20");
+    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "20");
+
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ checkpointOffsets
+ )
+ )
+ );
+    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ "stream",
+ expectedOffsets
+ )
+ ))).andReturn(true);
+    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ // Spin off two active tasks with each task serving one partition.
+ supervisor.getIoConfig().setTaskCount(2);
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "5"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "6"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ STREAM,
+ resetOffsets
+ )
+ );
+
+ Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
+
+ supervisor.resetOffsets(resetMetadata);
+
+ validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
+ }
+
+ @Test
+ public void testSupervisorNoResetDataSourceMetadata()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "0"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "0"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ verifyAll();
+
+ MatcherAssert.assertThat(
+ Assert.assertThrows(DruidException.class, () ->
+ supervisor.resetOffsets(null)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "Reset dataSourceMetadata is required for resetOffsets."
+ )
+ );
+ }
+
+ @Test
+ public void testSupervisorResetWithInvalidStartSequenceMetadata()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "0"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "0"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ verifyAll();
+
+    final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(
+ "i-am-not-real",
+ ImmutableMap.of("0", "10", "1", "20", "2", "30"),
+ ImmutableSet.of()
+ )
+ );
+
+ MatcherAssert.assertThat(
+ Assert.assertThrows(DruidException.class, () ->
+ supervisor.resetOffsets(dataSourceMetadata)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ StringUtils.format(
+              "Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[SeekableStreamEndSequenceNumbers], but found[SeekableStreamStartSequenceNumbers].",
+ dataSourceMetadata
+ )
+ )
+ );
+ }
+
+ @Test
+ public void testSupervisorResetInvalidStream()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+ replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "0"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "0"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ verifyAll();
+
+    final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(
+ "i-am-not-real",
+ ImmutableMap.of("0", "10", "1", "20", "2", "30")
+ )
+ );
+
+ MatcherAssert.assertThat(
+ Assert.assertThrows(DruidException.class, () ->
+ supervisor.resetOffsets(dataSourceMetadata)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+          "Stream[i-am-not-real] doesn't exist in the supervisor[testSupervisorId]. Supervisor is consuming stream[stream]."
+ )
+ );
+ }
+
@Test
public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
{
@@ -1063,6 +1583,25 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertEquals(0, emitter.getEvents().size());
}
+ private void validateSupervisorStateAfterResetOffsets(
+ final TestSeekableStreamSupervisor supervisor,
+ final ImmutableMap<String, String> expectedResetOffsets,
+ final int expectedActiveTaskCount
+ ) throws InterruptedException
+ {
+    // Wait for the notice queue to be drained asynchronously before we validate the supervisor's final state.
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+ Thread.sleep(1000);
+    Assert.assertEquals(expectedActiveTaskCount, supervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(expectedResetOffsets.size(), supervisor.getPartitionOffsets().size());
+ for (Map.Entry<String, String> entry : expectedResetOffsets.entrySet()) {
+      Assert.assertEquals(supervisor.getNotSetMarker(), supervisor.getPartitionOffsets().get(entry.getKey()));
+ }
+ verifyAll();
+ }
+
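The helper above polls the notice queue until it drains before asserting on the supervisor's final state. A standalone sketch of that wait loop (hypothetical `NoticeQueueAwait` name, not part of Druid; the queue size is abstracted behind an `IntSupplier`) could bound the polling with a timeout instead of an unconditional extra sleep:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

// Hypothetical helper mirroring the wait loop in validateSupervisorStateAfterResetOffsets:
// poll a queue-size supplier until it reports empty, or give up after a timeout.
public class NoticeQueueAwait
{
  public static boolean awaitEmpty(IntSupplier queueSize, long timeoutMillis)
  {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (queueSize.getAsInt() > 0) {
      if (System.nanoTime() > deadline) {
        return false; // queue did not drain in time
      }
      try {
        Thread.sleep(100); // same 100 ms polling interval as the test helper
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

Returning a boolean lets the caller fail the test with a clear message rather than hanging forever on a stuck queue.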
@Test
public void testScheduleReporting()
{
@@ -1419,7 +1958,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
protected int getTaskGroupIdForPartition(String partition)
{
- return 0;
+ try {
+ return Integer.parseInt(partition) % spec.getIoConfig().getTaskCount();
+ }
+ catch (NumberFormatException e) {
+ return 0;
+ }
}
@Override
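The test override above maps numeric partition ids onto task groups by modulo, falling back to group 0 for anything non-numeric. Extracted as a self-contained sketch (hypothetical class name; `taskCount` stands in for `spec.getIoConfig().getTaskCount()`):

```java
// Standalone sketch of the test's partition-to-task-group mapping:
// numeric partition ids hash onto groups via modulo, non-numeric ids fall back to group 0.
public class TaskGroupMapping
{
  public static int taskGroupIdForPartition(String partition, int taskCount)
  {
    try {
      return Integer.parseInt(partition) % taskCount;
    }
    catch (NumberFormatException e) {
      return 0; // non-numeric partition ids (e.g. Kinesis shard ids) fall back to group 0
    }
  }

  public static void main(String[] args)
  {
    System.out.println(taskGroupIdForPartition("0", 3));       // 0
    System.out.println(taskGroupIdForPartition("1", 3));       // 1
    System.out.println(taskGroupIdForPartition("shard-a", 3)); // 0 (non-numeric fallback)
  }
}
```

This is why the test partitions "0" and "1" land in distinct task groups once `taskCount` exceeds 1, which the reset tests rely on.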
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index cedf4677e5..e733ef6c23 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -163,6 +163,11 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
}
+ @Override
+ public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
+ {
+ }
+
@Override
  public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 10c48578a6..bcfc5ebe81 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -61,8 +62,20 @@ public interface Supervisor
    return null; // default implementation for interface compatibility; returning null since true or false is misleading
}
+ /**
+ * Resets all offsets for a dataSource.
+ * @param dataSourceMetadata optional dataSource metadata.
+ */
void reset(DataSourceMetadata dataSourceMetadata);
+  /**
+   * Resets offsets with the provided dataSource metadata. The resulting stored offsets should be a union of the
+   * existing checkpointed offsets and the provided offsets.
+   * @param resetDataSourceMetadata required datasource metadata with offsets to reset.
+   * @throws DruidException if any metadata attribute doesn't match the supervisor's state.
+   */
+ void resetOffsets(DataSourceMetadata resetDataSourceMetadata);
+
  /**
   * The definition of checkpoint is not very strict as currently it does not affect data or control path.
   * On this call Supervisor can potentially checkpoint data processed so far to some durable storage
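The `resetOffsets` javadoc describes a union semantics: stored offsets become the existing checkpointed offsets merged with the provided reset offsets, with the provided entries winning on conflict. An illustrative sketch of that contract (hypothetical class name, not Druid's implementation; partition ids and offsets are plain strings as in the tests above):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the resetOffsets contract: the result is a union of the
// checkpointed offsets and the provided reset offsets, provided entries overriding.
public class OffsetUnionSketch
{
  public static Map<String, String> union(Map<String, String> checkpointed, Map<String, String> provided)
  {
    final Map<String, String> merged = new HashMap<>(checkpointed);
    merged.putAll(provided); // provided reset offsets override the checkpointed ones
    return merged;
  }
}
```

Under this contract, resetting only partition "1" leaves the checkpointed offsets of all other partitions untouched, which is the key difference from the whole-datasource `reset` method above it.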
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index 742854b26d..a6e4b8fe09 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -19,8 +19,10 @@
package org.apache.druid.indexing;
+import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.junit.Assert;
@@ -71,4 +73,20 @@ public class NoopSupervisorSpecTest
    NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
Assert.assertTrue(noopSupervisorSpec.getInputSourceResources().isEmpty());
}
+
+ @Test
+  public void testNoopSupervisorResetOffsetsDoesNothing()
+ {
+ NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
+ Supervisor noOpSupervisor = expectedSpec.createSupervisor();
+ Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+ noOpSupervisor.resetOffsets(null);
+ Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState());
+
+ Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+ noOpSupervisor.resetOffsets(new ObjectMetadata("someObject"));
+ Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState());
+ }
}
diff --git a/website/.spelling b/website/.spelling
index 0f35993488..b34ad9c6e8 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1098,6 +1098,7 @@ numProcessors
q.size
repartitionTransitionDuration
replicastaskCounttaskCount
+resetOffsets
resetuseEarliestSequenceNumberPOST
resumePOST
statusrecentErrorsdruid.supervisor.maxStoredExceptionEventsstatedetailedStatestatedetailedStatestatestatePENDINGRUNNINGSUSPENDEDSTOPPINGUNHEALTHY_SUPERVISORUNHEALTHY_TASKSdetailedStatestatedruid.supervisor.unhealthinessThresholddruid.supervisor.taskUnhealthinessThresholdtaskDurationtaskCountreplicasdetailedStatedetailedStateRUNNINGPOST