kfaraz commented on code in PR #18082:
URL: https://github.com/apache/druid/pull/18082#discussion_r2139146461
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -251,29 +251,47 @@ public void createTable(final String tableName, final Iterable<String> sql)
/**
* Execute the desired ALTER statement on the desired table
*
- * @param tableName The name of the table being altered
- * @param sql ALTER statment to be executed
+ * @param tableName The name of the table being altered
+ * @param sql ALTER statements to be executed
+ * @param useTransaction Whether to use a transaction for this operation
*/
- private void alterTable(final String tableName, final Iterable<String> sql)
+ private void alterTable(final String tableName, final Iterable<String> sql, final boolean useTransaction)
{
try {
retryWithHandle(handle -> {
if (tableExists(handle, tableName)) {
- final Batch batch = handle.createBatch();
- for (String s : sql) {
- log.info("Altering table[%s], with command: %s", tableName, s);
Review Comment:
We should retain this log line somewhere, maybe in `executeBatch` method.
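A minimal sketch of keeping the log line inside a shared helper. `executeBatch(tableName, statements, executor)` is a hypothetical signature: the real code would run statements through a JDBI handle and log via Druid's logger, which are replaced here by a `Consumer<String>` and an in-memory list so the snippet is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class AlterTableLoggingSketch
{
  // Hypothetical stand-in for the class logger: keeps formatted messages in memory.
  static final List<String> LOG = new ArrayList<>();

  private AlterTableLoggingSketch()
  {
  }

  /**
   * Hypothetical helper: logs each statement right before executing it, so the
   * "Altering table[...]" line is kept whether or not the caller uses a transaction.
   */
  static void executeBatch(String tableName, Iterable<String> sql, Consumer<String> executor)
  {
    for (String s : sql) {
      LOG.add(String.format("Altering table[%s], with command: %s", tableName, s));
      executor.accept(s);
    }
  }
}
```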
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java:
##########
@@ -26,14 +26,17 @@
public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
{
+ private final String supervisorId;
private final String dataSource;
private final DataSourceMetadata resetMetadata;
public ResetDataSourceMetadataAction(
+ @JsonProperty("supervisorId") String supervisorId,
@JsonProperty("dataSource") String dataSource,
Review Comment:
We should probably make this nullable (and also deprecated?) because new
code need not send this anymore.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -405,47 +408,48 @@ SegmentPublishResult commitReplaceSegments(
);
/**
- * Retrieves data source's metadata from the metadata store. Returns null if there is no metadata.
+ * Retrieves supervisor's metadata from the datasource metadata store. Returns null if there is no metadata.
Review Comment:
```suggestion
* Retrieves supervisor's metadata from the metadata store. Returns null if there is no metadata.
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -405,47 +408,48 @@ SegmentPublishResult commitReplaceSegments(
);
/**
- * Retrieves data source's metadata from the metadata store. Returns null if there is no metadata.
+ * Retrieves supervisor's metadata from the datasource metadata store. Returns null if there is no metadata.
*/
- @Nullable DataSourceMetadata retrieveDataSourceMetadata(String dataSource);
+ @Nullable DataSourceMetadata retrieveDataSourceMetadata(String supervisorId);
/**
- * Removes entry for 'dataSource' from the dataSource metadata table.
+ * Removes entry for 'supervisorId' from the dataSource metadata table.
Review Comment:
```suggestion
* Removes entry for {@code supervisorId} from the dataSource metadata table.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java:
##########
@@ -44,6 +47,12 @@ public String getDataSource()
return dataSource;
}
+ @JsonProperty
+ public String getSupervisorId()
Review Comment:
Please annotate as nullable.
##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceMultipleSupervisorTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = {TestNGGroup.KAFKA_INDEX})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceMultipleSupervisorTest extends AbstractKafkaIndexingServiceTest
+{
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "kafka_multi_supervisor";
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ doBeforeClass();
+ }
+
+ /**
+ * This test tests multiple (2) supervisors simultaneously ingesting into the same datasource
+ */
+ @Test
+ public void testKafkaIndexMultiSupervisorWithTransaction() throws Exception
+ {
+ doTestMultiSupervisorIndexDataStableState(
+ true,
+ 2
+ );
+ }
+
+ /**
+ * This test tests multiple (2) supervisors simultaneously ingesting into the same datasource
+ */
Review Comment:
Javadoc is not needed, the functionality should be apparent from the test
name itself.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1418,6 +1429,7 @@ protected void sendResetRequestAndWait(
.getTaskActionClient()
.submit(
new ResetDataSourceMetadataAction(
+ getSupervisorId(),
task.getDataSource(),
Review Comment:
I think we can just pass `null` for the datasource now, since the task action
doesn't use it if the `supervisorId` is non-null.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -569,7 +575,7 @@ private boolean changeTaskCount(int desiredActiveTaskCount)
AUTOSCALER_SCALING_TIME_METRIC,
scaleActionStopwatch.millisElapsed()
));
- log.info("Changed taskCount to [%s] for dataSource [%s].", desiredActiveTaskCount, dataSource);
+ log.info("Changed taskCount to [%s] for supervisor [%s] for dataSource [%s].", desiredActiveTaskCount, supervisorId, dataSource);
Review Comment:
```suggestion
log.info("Changed taskCount to [%s] for supervisor[%s] of dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -918,7 +924,7 @@ public SeekableStreamSupervisor(
this.autoScalerConfig = ioConfig.getAutoScalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
- this.supervisorId = supervisorId;
+ this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
Review Comment:
The old code used a `supervisorId` in this class which had the value:
```java
StringUtils.format("KafkaSupervisor-%s", spec.getDataSchema().getDataSource())
```
(or equivalent for RabbitMQ and Kinesis)
This supervisorId was being used only for logging and not sent over the wire
or persisted to DB. So, users searching for a specific format of log messages
might get incorrect results after this change.
We should either call out this point in "Upgrade Notes" section in the PR
description
OR
build a logging key here that retains the older naming scheme.
One advantage of the old logging key was that the type of the supervisor was
readily apparent. After this PR, it is possible to have a Kinesis supervisor
and a Kafka supervisor ingesting into the same datasource. The type distinction
in the logs would then come in handy.
Let me know what you think.
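One possible shape for the second option, sketched with a hypothetical helper: keep the old `<Type>Supervisor-` prefix but append the supervisor id instead of the datasource. The type stays visible in the logs, two supervisors on the same datasource still get distinct keys, and when the supervisor id happens to equal the datasource name the key is identical to the old one.

```java
final class SupervisorLogKeySketch
{
  private SupervisorLogKeySketch()
  {
  }

  /**
   * Hypothetical helper. typePrefix would be e.g. "KafkaSupervisor", "KinesisSupervisor"
   * or "RabbitSupervisor"; supervisorId is the spec id.
   */
  static String logKey(String typePrefix, String supervisorId)
  {
    return String.format("%s-%s", typePrefix, supervisorId);
  }
}
```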
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -328,6 +346,27 @@ tableName, getPayloadType(), getCollation()
)
)
);
+
+ if (!tableHasColumn(tableName, "supervisor_id")) {
+ final List<String> alterCommands = new ArrayList<>();
+ alterCommands.add(StringUtils.format(
+ "ALTER TABLE %s ADD COLUMN supervisor_id VARCHAR(255) NOT NULL DEFAULT 'NULL'",
Review Comment:
`"null"` and `null` can be confusing. Maybe use a default value of empty
string instead.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -66,25 +66,29 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
@Nullable
private final DataSourceMetadata endMetadata;
@Nullable
+ private final String supervisorId;
+ @Nullable
private final SegmentSchemaMapping segmentSchemaMapping;
public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments, SegmentSchemaMapping segmentSchemaMapping)
{
- return new SegmentTransactionalAppendAction(segments, null, null, segmentSchemaMapping);
+ return new SegmentTransactionalAppendAction(null, segments, null, null, segmentSchemaMapping);
}
public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
+ String supervisorId,
Review Comment:
Please put this argument after the `segments` arg so that the logical order of
arguments is:
segments -> metadata -> schema
Same comment for other places (like `IndexerMetadataStorageCoordinator`) for
consistency.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java:
##########
@@ -26,14 +26,17 @@
public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
{
+ private final String supervisorId;
private final String dataSource;
private final DataSourceMetadata resetMetadata;
public ResetDataSourceMetadataAction(
+ @JsonProperty("supervisorId") String supervisorId,
Review Comment:
Needs to be nullable for rolling upgrades. We can default to `dataSource`
when null.
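A minimal sketch of that fallback, assuming a simplified action with only the two id fields (Jackson annotations, `@Nullable` and the `resetMetadata` field are omitted to keep it self-contained). It also covers the earlier point that `dataSource` can become optional for new callers.

```java
import java.util.Objects;

// Simplified, hypothetical version of ResetDataSourceMetadataAction.
final class ResetActionSketch
{
  private final String supervisorId;
  private final String dataSource;

  ResetActionSketch(String supervisorId, String dataSource)
  {
    // A task on an older version does not send "supervisorId". Before this change the
    // metadata row was keyed by dataSource, so fall back to it during rolling upgrades.
    this.supervisorId = supervisorId != null
                        ? supervisorId
                        : Objects.requireNonNull(dataSource, "supervisorId and dataSource cannot both be null");
    // May be null when sent by new code, which only needs supervisorId.
    this.dataSource = dataSource;
  }

  String getSupervisorId()
  {
    return supervisorId;
  }

  String getDataSource()
  {
    return dataSource;
  }
}
```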
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -335,7 +336,8 @@ SegmentPublishResult commitSegmentsAndMetadata(
Set<DataSegment> segments,
@Nullable DataSourceMetadata startMetadata,
@Nullable DataSourceMetadata endMetadata,
- @Nullable SegmentSchemaMapping segmentSchemaMapping
+ @Nullable SegmentSchemaMapping segmentSchemaMapping,
+ @Nullable String supervisorId
Review Comment:
Please put this arg right before `startMetadata`. Similar comment for other
methods.
##########
processing/src/main/java/org/apache/druid/query/DruidMetrics.java:
##########
@@ -49,6 +49,7 @@ public class DruidMetrics
public static final String TASK_ACTION_TYPE = "taskActionType";
public static final String STREAM = "stream";
public static final String PARTITION = "partition";
+ public static final String SUPERVISOR_ID = "supervisor";
Review Comment:
For consistency with other dimensions:
```suggestion
public static final String SUPERVISOR_ID = "supervisorId";
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -132,7 +146,9 @@ public SegmentSchemaMapping getSegmentSchemaMapping()
@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
- return new TypeReference<>() {};
+ return new TypeReference<SegmentPublishResult>()
+ {
+ };
Review Comment:
Change not needed.
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -328,6 +346,27 @@ tableName, getPayloadType(), getCollation()
)
)
);
+
+ if (!tableHasColumn(tableName, "supervisor_id")) {
+ final List<String> alterCommands = new ArrayList<>();
+ alterCommands.add(StringUtils.format(
+ "ALTER TABLE %s ADD COLUMN supervisor_id VARCHAR(255) NOT NULL DEFAULT 'NULL'",
Review Comment:
We should use collation here, same as dataSource column.
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -328,6 +346,27 @@ tableName, getPayloadType(), getCollation()
)
)
);
+
+ if (!tableHasColumn(tableName, "supervisor_id")) {
Review Comment:
Please move this logic to a new method `alterDataSourceTable`.
Also, please update the CREATE statement above to have the supervisor_id and
the correct PRIMARY KEY. We shouldn't need to alter a table if it is being
freshly created.
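A rough sketch of a separate `alterDataSourceTable`-style method that folds in the two comments on the ALTER statement (empty-string default, same collation clause as the `dataSource` column). The method name, the boolean flag standing in for `tableHasColumn(...)`, and the collation string format are assumptions; dialect-specific work such as changing the PRIMARY KEY is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class AlterDataSourceTableSketch
{
  private AlterDataSourceTableSketch()
  {
  }

  /**
   * Hypothetical: returns the ALTER statements needed to add supervisor_id, or an
   * empty list when the column already exists (e.g. a freshly created table).
   *
   * @param collation the collation clause used for the dataSource column; may be empty
   */
  static List<String> alterDataSourceTableCommands(
      String tableName,
      String collation,
      boolean hasSupervisorIdColumn
  )
  {
    final List<String> alterCommands = new ArrayList<>();
    if (!hasSupervisorIdColumn) {
      final String collationClause = collation.isEmpty() ? "" : " " + collation;
      alterCommands.add(String.format(
          "ALTER TABLE %s ADD COLUMN supervisor_id VARCHAR(255)%s NOT NULL DEFAULT ''",
          tableName,
          collationClause
      ));
    }
    return alterCommands;
  }
}
```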
##########
extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DerivativeDataSourceManager.java:
##########
@@ -158,7 +158,7 @@ private void updateDerivatives()
handle
.createQuery(
StringUtils.format(
- "SELECT DISTINCT dataSource,commit_metadata_payload FROM %1$s",
+ "SELECT DISTINCT supervisor_id,dataSource,commit_metadata_payload FROM %1$s",
Review Comment:
Did you intend to read the `supervisor_id` from the result later in this
method?
Or is it being selected just for uniqueness?
##########
sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java:
##########
@@ -1392,6 +1392,7 @@ public void testSupervisorTable() throws Exception
String json = "[{\n"
+ "\t\"id\": \"wikipedia\",\n"
+ + "\t\"dataSource\": \"wikipedia\",\n"
Review Comment:
Try using an id that is different from the datasource name.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -94,14 +98,24 @@ private SegmentTransactionalAppendAction(
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
+ this.supervisorId = supervisorId;
if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
+ } else if (startMetadata != null && supervisorId == null) {
Review Comment:
We would need to handle a null `supervisorId` during rolling upgrades, where a
task on an old version sends a commit request to an Overlord on a new version.
So we shouldn't throw an exception here.
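A minimal sketch of validation that tolerates old tasks, assuming (as suggested for `ResetDataSourceMetadataAction`) that a missing `supervisorId` falls back to the datasource name. Metadata objects are typed as `Object`, and `IllegalArgumentException` stands in for Druid's `InvalidInput.exception`; the helper name is hypothetical.

```java
final class CommitMetadataValidationSketch
{
  private CommitMetadataValidationSketch()
  {
  }

  /**
   * Hypothetical helper: validates the metadata pair and returns the supervisor id
   * to use for the metadata commit.
   */
  static String resolveSupervisorId(Object startMetadata, Object endMetadata, String supervisorId, String dataSource)
  {
    if ((startMetadata == null) != (endMetadata == null)) {
      throw new IllegalArgumentException("startMetadata and endMetadata must either be both null or both non-null.");
    }
    if (startMetadata == null) {
      // No metadata commit, so the supervisor id is not needed.
      return supervisorId;
    }
    // A task on an older version does not send supervisorId: do not fail the commit,
    // fall back to the datasource, which was the metadata key before this change.
    return supervisorId != null ? supervisorId : dataSource;
  }
}
```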
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -152,9 +151,13 @@ protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
}
@Override
- protected boolean doesTaskTypeMatchSupervisor(Task task)
+ protected boolean doesTaskMatchSupervisor(Task task)
{
- return task instanceof KafkaIndexTask;
+ if (!(task instanceof KafkaIndexTask)) {
+ return false;
+ }
+ final String supervisorId = ((KafkaIndexTask) task).getSupervisorId();
+ return supervisorId.equals(spec.getId());
Review Comment:
Maybe use a positive condition in the `if` clause instead, and do a
null-safe equals check.
```suggestion
if (task instanceof KafkaIndexTask) {
final String supervisorId = ((KafkaIndexTask) task).getSupervisorId();
return Objects.equal(supervisorId, spec.getId());
} else {
return false;
}
```
Same comment for rabbit stream and kinesis (if applicable).
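The `Objects.equal` in the suggestion reads like Guava's `com.google.common.base.Objects.equal`; the JDK's `java.util.Objects.equals` gives the same null-safe comparison. A self-contained sketch with hypothetical stand-ins for `Task` and `KafkaIndexTask`:

```java
import java.util.Objects;

// Hypothetical minimal stand-ins for Task and KafkaIndexTask.
interface TaskSketch
{
}

final class KafkaIndexTaskSketch implements TaskSketch
{
  private final String supervisorId;

  KafkaIndexTaskSketch(String supervisorId)
  {
    this.supervisorId = supervisorId;
  }

  String getSupervisorId()
  {
    return supervisorId;
  }
}

final class SupervisorMatchSketch
{
  private SupervisorMatchSketch()
  {
  }

  static boolean doesTaskMatchSupervisor(TaskSketch task, String specId)
  {
    if (task instanceof KafkaIndexTaskSketch) {
      final String supervisorId = ((KafkaIndexTaskSketch) task).getSupervisorId();
      // Null-safe: a task with no supervisorId simply does not match instead of throwing an NPE.
      return Objects.equals(supervisorId, specId);
    } else {
      return false;
    }
  }
}
```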
##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceMultipleSupervisorTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = {TestNGGroup.KAFKA_INDEX})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceMultipleSupervisorTest extends AbstractKafkaIndexingServiceTest
+{
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "kafka_multi_supervisor";
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ doBeforeClass();
+ }
+
+ /**
+ * This test tests multiple (2) supervisors simultaneously ingesting into the same datasource
+ */
+ @Test
+ public void testKafkaIndexMultiSupervisorWithTransaction() throws Exception
+ {
+ doTestMultiSupervisorIndexDataStableState(
+ true,
+ 2
+ );
+ }
+
+ /**
+ * This test tests multiple (2) supervisors simultaneously ingesting into the same datasource
+ */
+ @Test
+ public void testKafkaIndexMultiSupervisorWithNoTransaction() throws Exception
Review Comment:
I don't think this test warrants a separate class.
Add the transactional test method to
`ITKafkaIndexingServiceTransactionalParallelizedTest`
and the non-transactional one to
`ITKafkaIndexingServiceNonTransactionalParallelizedTest`.
Please do a similar thing for the Kinesis ITs too.
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java:
##########
@@ -99,6 +102,12 @@ public String getId()
return id;
}
+ @JsonProperty
+ public String getDatasource()
Review Comment:
This getter should be named to align with the field name: I tried `SELECT * FROM sys.supervisors` and
got the datasource field as null, probably because of this field name mismatch.
```suggestion
public String getDataSource()
```
##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java:
##########
@@ -63,6 +63,7 @@ public RabbitStreamSupervisorSpec(
@JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig)
{
super(
+ id,
Review Comment:
We should not pass null to the super constructor here.
Instead, we should default to `StringUtils.format("RabbitSupervisor-%s",
spec.getDataSchema().getDataSource())` (previously done in the
`RabbitStreamSupervisor` class).
Same comment for Kinesis and Kafka too.
##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java:
##########
@@ -170,8 +172,7 @@ protected static List<String> listDataFormatResources()
throws IOException
{
return listResources(DATA_RESOURCE_ROOT)
.stream()
- .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
- .filter(resource -> !SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE.equals(resource))
+ .filter(r -> !r.endsWith(".json")) // filter out top-level spec files
Review Comment:
Is this just a shorthand of the previous code here or is there some logical
difference too?
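To make the question concrete, a small comparison of both filters on a hypothetical resource listing (the file names and the values of the two template constants are assumptions). The two filters agree only if the two templates are the only top-level `.json` files; any other top-level `.json` file is kept by the old filter and dropped by the new one.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ResourceFilterSketch
{
  // Hypothetical values for the two template constants.
  static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
  static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";

  private ResourceFilterSketch()
  {
  }

  // Previous behavior: exclude exactly the two known template files.
  static List<String> oldFilter(List<String> resources)
  {
    return resources.stream()
                    .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
                    .filter(resource -> !SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE.equals(resource))
                    .collect(Collectors.toList());
  }

  // New behavior: exclude every top-level entry ending in ".json".
  static List<String> newFilter(List<String> resources)
  {
    return resources.stream()
                    .filter(r -> !r.endsWith(".json"))
                    .collect(Collectors.toList());
  }
}
```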
##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java:
##########
@@ -53,7 +55,9 @@ public RabbitStreamIndexTask(
@JsonProperty("tuningConfig") RabbitStreamIndexTaskTuningConfig tuningConfig,
@JsonProperty("ioConfig") RabbitStreamIndexTaskIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
- @JacksonInject ObjectMapper configMapper)
+ @JacksonInject ObjectMapper configMapper,
+ @JsonProperty("supervisorId") @Nullable String supervisorId
Review Comment:
Please make this the second arg here. Same comment for the other
`*IndexTask` classes.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -226,7 +226,8 @@ public Response specGetAll(
if (includeFull) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
- theBuilder.withSpec(manager.getSupervisorSpec(x).get());
+ theBuilder.withSpec(theSpec.get())
+ .withDataSource(theSpec.get().getDataSources().get(0));
Review Comment:
Can the `getDataSources()` method ever return empty or null?
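If it can, `get(0)` would throw. A defensive variant, written as a hypothetical helper (whether `withDataSource(null)` is acceptable for the status builder is a separate question):

```java
import java.util.List;

final class FirstDataSourceSketch
{
  private FirstDataSourceSketch()
  {
  }

  // Hypothetical helper: returns the first datasource, or null when the spec reports none,
  // instead of throwing NullPointerException / IndexOutOfBoundsException from get(0).
  static String firstDataSourceOrNull(List<String> dataSources)
  {
    return (dataSources == null || dataSources.isEmpty()) ? null : dataSources.get(0);
  }
}
```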
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -81,7 +83,8 @@ public SeekableStreamIndexTask(
final SeekableStreamIndexTaskTuningConfig tuningConfig,
final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
@Nullable final Map<String, Object> context,
- @Nullable final String groupId
+ @Nullable final String groupId,
+ @Nullable final String supervisorId
Review Comment:
I feel these args should come right after `id`. If we don't want to bloat up
the diff, we can leave `groupId` where it is and put `supervisorId` after `id`.
Also, given that `supervisorId` is now a core identifying field of the task,
we should not make it nullable. It should be up to the sub-classes to pass the
correct non-null value.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java:
##########
@@ -122,7 +130,15 @@ private SegmentTransactionalInsertAction(
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
this.dataSource = dataSource;
+ this.supervisorId = supervisorId;
this.segmentSchemaMapping = segmentSchemaMapping;
+
+ if ((startMetadata == null && endMetadata != null)
+ || (startMetadata != null && endMetadata == null)) {
+ throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
+ } else if (startMetadata != null && supervisorId == null) {
Review Comment:
Need to handle null `supervisorId` during rolling upgrades.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -479,13 +481,15 @@ public void handle()
}
final Integer desiredTaskCount = computeDesiredTaskCount.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
- .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
- "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired task count is [%s], active task count is [%s]",
+ "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor [%s] for dataSource [%s], skipping it! desired task count is [%s], active task count is [%s]",
Review Comment:
The double `for` (for supervisor for datasource) seems weird. Please
rephrase here and other places.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -312,6 +312,17 @@ private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
}
}
+ public String getSupervisorId()
Review Comment:
Does this need to be public?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -312,6 +312,17 @@ private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
}
}
+ public String getSupervisorId()
+ {
+ @Nullable
+ final String supervisorId = task.getSupervisorId();
+ // Backwards compatibility: if task spec from metadata has a null supervisorId field, fall back to dataSource
Review Comment:
Please put this in the javadoc of this method.
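A sketch of the method with the compatibility note moved into its javadoc, also made package-private per the question about visibility. The class and the two fields standing in for `task.getSupervisorId()` and `task.getDataSource()` are hypothetical simplifications of the runner.

```java
final class TaskRunnerSupervisorIdSketch
{
  // Hypothetical stand-ins for task.getSupervisorId() and task.getDataSource().
  private final String taskSupervisorId;
  private final String taskDataSource;

  TaskRunnerSupervisorIdSketch(String taskSupervisorId, String taskDataSource)
  {
    this.taskSupervisorId = taskSupervisorId;
    this.taskDataSource = taskDataSource;
  }

  /**
   * Returns the id of the supervisor that launched this task.
   * <p>
   * Backwards compatibility: a task spec persisted in the metadata store before
   * {@code supervisorId} existed has a null value for it; in that case this
   * falls back to the task's dataSource.
   */
  String getSupervisorId()
  {
    return taskSupervisorId != null ? taskSupervisorId : taskDataSource;
  }
}
```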
##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java:
##########
@@ -62,7 +66,9 @@ public RabbitStreamIndexTask(
tuningConfig,
ioConfig,
context,
- getFormattedGroupId(dataSchema.getDataSource(), TYPE));
+ getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE),
+ supervisorId
Review Comment:
We should not pass a `null` to the super. Default it to `dataSource` here
itself.
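A minimal sketch of resolving the default in the subclass so the base class can treat `supervisorId` as a required, non-null field (and take it right after `id`). Both classes are heavily simplified hypothetical stand-ins; the JDK's `Objects.requireNonNullElse` (Java 9+) plays the role of `Configs.valueOrDefault` from the diff.

```java
import java.util.Objects;

// Hypothetical, heavily simplified base class.
abstract class StreamIndexTaskSketch
{
  private final String id;
  private final String supervisorId;

  StreamIndexTaskSketch(String id, String supervisorId)
  {
    this.id = Objects.requireNonNull(id, "id");
    // supervisorId is a core identifying field, so the base class rejects null.
    this.supervisorId = Objects.requireNonNull(supervisorId, "supervisorId");
  }

  String getId()
  {
    return id;
  }

  String getSupervisorId()
  {
    return supervisorId;
  }
}

final class RabbitStreamIndexTaskSketch extends StreamIndexTaskSketch
{
  RabbitStreamIndexTaskSketch(String id, String supervisorId, String dataSource)
  {
    // Resolve the default before calling super, so the base class never sees null.
    super(id, Objects.requireNonNullElse(supervisorId, dataSource));
  }
}
```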
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -93,6 +96,7 @@ public SeekableStreamSupervisorSpec(
)
{
this.ingestionSchema = checkIngestionSchema(ingestionSchema);
+ this.id = Preconditions.checkNotNull(Configs.valueOrDefault(id, ingestionSchema.getDataSchema().getDataSource()), "spec id cannot be null!");
Review Comment:
We should check for null but not default to datasource here.
The sub classes should pass in the correct value.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1703,15 +1709,15 @@ public void runInternal()
// if suspended, ensure tasks have been requested to gracefully stop
if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
{
// if we're already terminating, don't do anything here, the terminate already handles shutdown
- log.debug("Supervisor for datasource[%s] is already stopping.", dataSource);
+ log.debug("Supervisor [%s] for datasource[%s] is already stopping.", supervisorId, dataSource);
Review Comment:
Nit:
```suggestion
log.debug("Supervisor[%s] for datasource[%s] is already stopping.", supervisorId, dataSource);
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -379,7 +381,8 @@ SegmentPublishResult commitAppendSegmentsAndMetadata(
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata,
String taskAllocatorId,
- @Nullable SegmentSchemaMapping segmentSchemaMapping
+ @Nullable SegmentSchemaMapping segmentSchemaMapping,
+ @Nullable String supervisorId
Review Comment:
Since the `startMetadata` and `endMetadata` are not nullable, `supervisorId`
should not be nullable either. Caller should ensure passing a non-null value.
The DB layer can just validate that it is non-null; it shouldn't need to
determine the default value.
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java:
##########
@@ -92,13 +93,15 @@ private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interv
}
@Test
- public void testTransactionalUpdateDataSourceMetadata() throws Exception
+ public void testTransactionalUpdateDataSourceMetadataWithDefaultSupervisorId() throws Exception
Review Comment:
Maybe consider this naming convention for slightly better readability:
```suggestion
public void test_updateDataSourceMetadata_withDefaultSupervisorId() throws Exception
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.