kfaraz commented on code in PR #18082:
URL: https://github.com/apache/druid/pull/18082#discussion_r2139146461


##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -251,29 +251,47 @@ public void createTable(final String tableName, final 
Iterable<String> sql)
   /**
    * Execute the desired ALTER statement on the desired table
    *
-   * @param tableName The name of the table being altered
-   * @param sql ALTER statment to be executed
+   * @param tableName      The name of the table being altered
+   * @param sql            ALTER statements to be executed
+   * @param useTransaction Whether to use a transaction for this operation
    */
-  private void alterTable(final String tableName, final Iterable<String> sql)
+  private void alterTable(final String tableName, final Iterable<String> sql, 
final boolean useTransaction)
   {
     try {
       retryWithHandle(handle -> {
         if (tableExists(handle, tableName)) {
-          final Batch batch = handle.createBatch();
-          for (String s : sql) {
-            log.info("Altering table[%s], with command: %s", tableName, s);

Review Comment:
   We should retain this log line somewhere, maybe in `executeBatch` method.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java:
##########
@@ -26,14 +26,17 @@
 
 public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
 {
+  private final String supervisorId;
   private final String dataSource;
   private final DataSourceMetadata resetMetadata;
 
   public ResetDataSourceMetadataAction(
+      @JsonProperty("supervisorId") String supervisorId,
       @JsonProperty("dataSource") String dataSource,

Review Comment:
   We should probably make this nullable (and also deprecated?) because new 
code need not send this anymore.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -405,47 +408,48 @@ SegmentPublishResult commitReplaceSegments(
   );
 
   /**
-   * Retrieves data source's metadata from the metadata store. Returns null if 
there is no metadata.
+   * Retrieves supervisor's metadata from the datasource metadata store. 
Returns null if there is no metadata.

Review Comment:
   ```suggestion
      * Retrieves supervisor's metadata from the metadata store. Returns null 
if there is no metadata.
   ```



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -405,47 +408,48 @@ SegmentPublishResult commitReplaceSegments(
   );
 
   /**
-   * Retrieves data source's metadata from the metadata store. Returns null if 
there is no metadata.
+   * Retrieves supervisor's metadata from the datasource metadata store. 
Returns null if there is no metadata.
    */
-  @Nullable DataSourceMetadata retrieveDataSourceMetadata(String dataSource);
+  @Nullable DataSourceMetadata retrieveDataSourceMetadata(String supervisorId);
 
   /**
-   * Removes entry for 'dataSource' from the dataSource metadata table.
+   * Removes entry for 'supervisorId' from the dataSource metadata table.

Review Comment:
   ```suggestion
      * Removes entry for {@code supervisorId} from the dataSource metadata 
table.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java:
##########
@@ -44,6 +47,12 @@ public String getDataSource()
     return dataSource;
   }
 
+  @JsonProperty
+  public String getSupervisorId()

Review Comment:
   Please annotate as nullable.



##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceMultipleSupervisorTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = {TestNGGroup.KAFKA_INDEX})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceMultipleSupervisorTest extends 
AbstractKafkaIndexingServiceTest
+{
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_multi_supervisor";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  /**
+   * This test tests multiple (2) supervisors simultaneously ingesting into 
the same datasource
+   */
+  @Test
+  public void testKafkaIndexMultiSupervisorWithTransaction() throws Exception
+  {
+    doTestMultiSupervisorIndexDataStableState(
+        true,
+        2
+    );
+  }
+
+  /**
+   * This test tests multiple (2) supervisors simultaneously ingesting into 
the same datasource
+   */

Review Comment:
   Javadoc is not needed, the functionality should be apparent from the test 
name itself.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1418,6 +1429,7 @@ protected void sendResetRequestAndWait(
         .getTaskActionClient()
         .submit(
             new ResetDataSourceMetadataAction(
+                getSupervisorId(),
                 task.getDataSource(),

Review Comment:
   I think we can just pass `null` for datasource now since the task action 
doesn't use it if the `supervisorId` is non null.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -569,7 +575,7 @@ private boolean changeTaskCount(int desiredActiveTaskCount)
                                          AUTOSCALER_SCALING_TIME_METRIC,
                                          scaleActionStopwatch.millisElapsed()
                                      ));
-      log.info("Changed taskCount to [%s] for dataSource [%s].", 
desiredActiveTaskCount, dataSource);
+      log.info("Changed taskCount to [%s] for supervisor [%s] for dataSource 
[%s].", desiredActiveTaskCount, supervisorId, dataSource);

Review Comment:
   ```suggestion
         log.info("Changed taskCount to [%s] for supervisor[%s] of 
dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -918,7 +924,7 @@ public SeekableStreamSupervisor(
     this.autoScalerConfig = ioConfig.getAutoScalerConfig();
     this.tuningConfig = spec.getTuningConfig();
     this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
-    this.supervisorId = supervisorId;
+    this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId 
cannot be null");

Review Comment:
   The old code used a `supervisorId` in this class which had the value:
   
   ```java
   StringUtils.format("KafkaSupervisor-%s", 
spec.getDataSchema().getDataSource())
   ```
   (or equivalent for RabbitMQ and Kinesis)
   
   This supervisorId was being used only for logging and not sent over the wire 
or persisted to DB. So, users searching for a specific format of log messages 
might get incorrect results after this change.
   
   We should either call out this point in "Upgrade Notes" section in the PR 
description
   OR
   build a logging key here that retains the older naming scheme.
   
   One advantage of the old logging key was that the type of the supervisor was 
readily apparent. After this PR, it is possible to have a Kinesis supervisor 
and a Kafka supervisor ingesting into the same datasource. The type distinction 
in the logs would then come in handy.
   
   Let me know what you think.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -328,6 +346,27 @@ tableName, getPayloadType(), getCollation()
             )
         )
     );
+
+    if (!tableHasColumn(tableName, "supervisor_id")) {
+      final List<String> alterCommands = new ArrayList<>();
+      alterCommands.add(StringUtils.format(
+          "ALTER TABLE %s ADD COLUMN supervisor_id VARCHAR(255) NOT NULL 
DEFAULT 'NULL'",

Review Comment:
   `"null"` and `null` can be confusing. Maybe use a default value of empty 
string instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -66,25 +66,29 @@ public class SegmentTransactionalAppendAction implements 
TaskAction<SegmentPubli
   @Nullable
   private final DataSourceMetadata endMetadata;
   @Nullable
+  private final String supervisorId;
+  @Nullable
   private final SegmentSchemaMapping segmentSchemaMapping;
 
   public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> 
segments, SegmentSchemaMapping segmentSchemaMapping)
   {
-    return new SegmentTransactionalAppendAction(segments, null, null, 
segmentSchemaMapping);
+    return new SegmentTransactionalAppendAction(null, segments, null, null, 
segmentSchemaMapping);
   }
 
   public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
+      String supervisorId,

Review Comment:
   Please put this argument after `segments` arg so that the logical order of 
arguments is:
   segments -> metadata -> schema
   
   Some comment for other task places (like `IndexerMetadataCoordinator`) for 
consistency.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java:
##########
@@ -26,14 +26,17 @@
 
 public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
 {
+  private final String supervisorId;
   private final String dataSource;
   private final DataSourceMetadata resetMetadata;
 
   public ResetDataSourceMetadataAction(
+      @JsonProperty("supervisorId") String supervisorId,

Review Comment:
   Needs to be nullable for rolling upgrades. We can default to `dataSource` 
when null.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -335,7 +336,8 @@ SegmentPublishResult commitSegmentsAndMetadata(
       Set<DataSegment> segments,
       @Nullable DataSourceMetadata startMetadata,
       @Nullable DataSourceMetadata endMetadata,
-      @Nullable SegmentSchemaMapping segmentSchemaMapping
+      @Nullable SegmentSchemaMapping segmentSchemaMapping,
+      @Nullable String supervisorId

Review Comment:
   Please put this arg right before `startMetadata`. Similar comment for other 
methods.



##########
processing/src/main/java/org/apache/druid/query/DruidMetrics.java:
##########
@@ -49,6 +49,7 @@ public class DruidMetrics
   public static final String TASK_ACTION_TYPE = "taskActionType";
   public static final String STREAM = "stream";
   public static final String PARTITION = "partition";
+  public static final String SUPERVISOR_ID = "supervisor";

Review Comment:
   For consistency with other dimensions:
   ```suggestion
     public static final String SUPERVISOR_ID = "supervisorId";
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -132,7 +146,9 @@ public SegmentSchemaMapping getSegmentSchemaMapping()
   @Override
   public TypeReference<SegmentPublishResult> getReturnTypeReference()
   {
-    return new TypeReference<>() {};
+    return new TypeReference<SegmentPublishResult>()
+    {
+    };

Review Comment:
   Change not needed.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -328,6 +346,27 @@ tableName, getPayloadType(), getCollation()
             )
         )
     );
+
+    if (!tableHasColumn(tableName, "supervisor_id")) {
+      final List<String> alterCommands = new ArrayList<>();
+      alterCommands.add(StringUtils.format(
+          "ALTER TABLE %s ADD COLUMN supervisor_id VARCHAR(255) NOT NULL 
DEFAULT 'NULL'",

Review Comment:
   We should use collation here, same as dataSource column.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -328,6 +346,27 @@ tableName, getPayloadType(), getCollation()
             )
         )
     );
+
+    if (!tableHasColumn(tableName, "supervisor_id")) {

Review Comment:
   Please move this logic to a new method `alterDataSourceTable`.
   
   Also, please update the CREATE statement above to have the supervisor_id and 
the correct PRIMARY KEY. We shouldn't need to alter a table if it is being 
freshly created.



##########
extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DerivativeDataSourceManager.java:
##########
@@ -158,7 +158,7 @@ private void updateDerivatives()
           handle
               .createQuery(
                   StringUtils.format(
-                      "SELECT DISTINCT dataSource,commit_metadata_payload FROM 
%1$s",
+                      "SELECT DISTINCT 
supervisor_id,dataSource,commit_metadata_payload FROM %1$s",

Review Comment:
   Did you intend to read the `supervisor_id` from the result later in this 
method?
   Or is it being selected just for uniqueness?



##########
sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java:
##########
@@ -1392,6 +1392,7 @@ public void testSupervisorTable() throws Exception
 
     String json = "[{\n"
                   + "\t\"id\": \"wikipedia\",\n"
+                  + "\t\"dataSource\": \"wikipedia\",\n"

Review Comment:
   Try using an id that is different from the datasource name.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -94,14 +98,24 @@ private SegmentTransactionalAppendAction(
     this.segments = segments;
     this.startMetadata = startMetadata;
     this.endMetadata = endMetadata;
+    this.supervisorId = supervisorId;
 
     if ((startMetadata == null && endMetadata != null)
         || (startMetadata != null && endMetadata == null)) {
       throw InvalidInput.exception("startMetadata and endMetadata must either 
be both null or both non-null.");
+    } else if (startMetadata != null && supervisorId == null) {

Review Comment:
   We would need to handle null `supervisorId` during rolling upgrades where a 
task on old version sends a commit request to Overlord on new version.
   
   So we shouldn't throw an exception here.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -152,9 +151,13 @@ protected boolean 
checkSourceMetadataMatch(DataSourceMetadata metadata)
   }
 
   @Override
-  protected boolean doesTaskTypeMatchSupervisor(Task task)
+  protected boolean doesTaskMatchSupervisor(Task task)
   {
-    return task instanceof KafkaIndexTask;
+    if (!(task instanceof KafkaIndexTask)) {
+      return false;
+    }
+    final String supervisorId = ((KafkaIndexTask) task).getSupervisorId();
+    return supervisorId.equals(spec.getId());

Review Comment:
   Maybe use a positive condition in the `if` clause instead, and do a 
null-safe equals check.
   
   ```suggestion
       if (task instanceof KafkaIndexTask) {
         final String supervisorId = ((KafkaIndexTask) task).getSupervisorId();
         return Objects.equal(supervisorId, spec.getId());
       } else {
         return false;
       }
   ```
   
   Same comment for rabbit stream and kinesis (if applicable).



##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceMultipleSupervisorTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = {TestNGGroup.KAFKA_INDEX})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceMultipleSupervisorTest extends 
AbstractKafkaIndexingServiceTest
+{
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_multi_supervisor";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  /**
+   * This test tests multiple (2) supervisors simultaneously ingesting into 
the same datasource
+   */
+  @Test
+  public void testKafkaIndexMultiSupervisorWithTransaction() throws Exception
+  {
+    doTestMultiSupervisorIndexDataStableState(
+        true,
+        2
+    );
+  }
+
+  /**
+   * This test tests multiple (2) supervisors simultaneously ingesting into 
the same datasource
+   */
+  @Test
+  public void testKafkaIndexMultiSupervisorWithNoTransaction() throws Exception

Review Comment:
   I don't think this test warrants a separate class.
   
   Add the transactional test method to 
`ITKafkaIndexingServiceTransactionalParallelizedTest`
   and the non-transactional one to 
`ITKafkaIndexingServiceNonTransactionalParallelizedTest`.
   
   Please do a similar thing for the Kinesis ITs too.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java:
##########
@@ -99,6 +102,12 @@ public String getId()
     return id;
   }
 
+  @JsonProperty
+  public String getDatasource()

Review Comment:
   To align with the field name, I tried a `SELECT * FROM sys.supervisors` and 
got datasource field as null probably because of the field name mismatch.
   
   ```suggestion
     public String getDataSource()
   ```



##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java:
##########
@@ -63,6 +63,7 @@ public RabbitStreamSupervisorSpec(
       @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig)
   {
     super(
+        id,

Review Comment:
   We should not pass null to the super constructor here.
   Instead, we should default to `StringUtils.format("RabbitSupervisor-%s", 
spec.getDataSchema().getDataSource())` (previously done in the 
`RabbitStreamSupervisor` class). 
   
   Same comment for Kinesis and Kafka too.



##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java:
##########
@@ -170,8 +172,7 @@ protected static List<String> listDataFormatResources() 
throws IOException
   {
     return listResources(DATA_RESOURCE_ROOT)
         .stream()
-        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
-        .filter(resource -> 
!SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE.equals(resource))
+        .filter(r -> !r.endsWith(".json")) // filter out top-level spec files

Review Comment:
   Is this just a shorthand of the previous code here or is there some logical 
difference too?



##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java:
##########
@@ -53,7 +55,9 @@ public RabbitStreamIndexTask(
       @JsonProperty("tuningConfig") RabbitStreamIndexTaskTuningConfig 
tuningConfig,
       @JsonProperty("ioConfig") RabbitStreamIndexTaskIOConfig ioConfig,
       @JsonProperty("context") Map<String, Object> context,
-      @JacksonInject ObjectMapper configMapper)
+      @JacksonInject ObjectMapper configMapper,
+      @JsonProperty("supervisorId") @Nullable String supervisorId

Review Comment:
   Please make this the second arg here. Same comment for other for 
`*IndexTask`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -226,7 +226,8 @@ public Response specGetAll(
                   if (includeFull) {
                     Optional<SupervisorSpec> theSpec = 
manager.getSupervisorSpec(x);
                     if (theSpec.isPresent()) {
-                      theBuilder.withSpec(manager.getSupervisorSpec(x).get());
+                      theBuilder.withSpec(theSpec.get())
+                          
.withDataSource(theSpec.get().getDataSources().get(0));

Review Comment:
   Can the `getDataSources()` method ever return empty or null?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -81,7 +83,8 @@ public SeekableStreamIndexTask(
       final SeekableStreamIndexTaskTuningConfig tuningConfig,
       final SeekableStreamIndexTaskIOConfig<PartitionIdType, 
SequenceOffsetType> ioConfig,
       @Nullable final Map<String, Object> context,
-      @Nullable final String groupId
+      @Nullable final String groupId,
+      @Nullable final String supervisorId

Review Comment:
   I feel these args should come right after `id`. If we don't want to bloat up 
the diff, we can leave `groupId` where it is and put `supervisorId` after `id`.
   
   Also, given that `supervisorId` is now a core identifying field of the task, 
we should not make it nullable. It should be up to the sub-classes to pass the 
correct non-null value.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java:
##########
@@ -122,7 +130,15 @@ private SegmentTransactionalInsertAction(
     this.startMetadata = startMetadata;
     this.endMetadata = endMetadata;
     this.dataSource = dataSource;
+    this.supervisorId = supervisorId;
     this.segmentSchemaMapping = segmentSchemaMapping;
+
+    if ((startMetadata == null && endMetadata != null)
+        || (startMetadata != null && endMetadata == null)) {
+      throw InvalidInput.exception("startMetadata and endMetadata must either 
be both null or both non-null.");
+    } else if (startMetadata != null && supervisorId == null) {

Review Comment:
   Need to handle null `supervisorId` during rolling upgrades.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -479,13 +481,15 @@ public void handle()
           }
           final Integer desiredTaskCount = computeDesiredTaskCount.call();
           ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
-              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                                               
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+                                                               
.setDimension(DruidMetrics.DATASOURCE, dataSource)
               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
           if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
             log.info(
-                "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired 
task count is [%s], active task count is [%s]",
+                "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for supervisor [%s] for dataSource [%s], 
skipping it! desired task count is [%s], active task count is [%s]",

Review Comment:
   The double `for` (for supervisor for datasource) seems weird. Please 
rephrase here and other places.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -312,6 +312,17 @@ private Set<PartitionIdType> 
computeExclusiveStartPartitionsForSequence(
     }
   }
 
+  public String getSupervisorId()

Review Comment:
   Does this need to be public?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -312,6 +312,17 @@ private Set<PartitionIdType> 
computeExclusiveStartPartitionsForSequence(
     }
   }
 
+  public String getSupervisorId()
+  {
+    @Nullable
+    final String supervisorId = task.getSupervisorId();
+    // Backwards compatibility: if task spec from metadata has a null 
supervisorId field, fall back to dataSource

Review Comment:
   Please put this in the javadoc of this method.



##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java:
##########
@@ -62,7 +66,9 @@ public RabbitStreamIndexTask(
         tuningConfig,
         ioConfig,
         context,
-        getFormattedGroupId(dataSchema.getDataSource(), TYPE));
+        getFormattedGroupId(Configs.valueOrDefault(supervisorId, 
dataSchema.getDataSource()), TYPE),
+        supervisorId

Review Comment:
   We should not pass a `null` to the super. Default it to `dataSource` here 
itself.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -93,6 +96,7 @@ public SeekableStreamSupervisorSpec(
   )
   {
     this.ingestionSchema = checkIngestionSchema(ingestionSchema);
+    this.id = Preconditions.checkNotNull(Configs.valueOrDefault(id, 
ingestionSchema.getDataSchema().getDataSource()), "spec id cannot be null!");

Review Comment:
   We should check for null but not default to datasource here.
   The sub classes should pass in the correct value.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1703,15 +1709,15 @@ public void runInternal()
         // if suspended, ensure tasks have been requested to gracefully stop
         if 
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
 {
           // if we're already terminating, don't do anything here, the 
terminate already handles shutdown
-          log.debug("Supervisor for datasource[%s] is already stopping.", 
dataSource);
+          log.debug("Supervisor [%s] for datasource[%s] is already stopping.", 
supervisorId, dataSource);

Review Comment:
   Nit:
   ```suggestion
             log.debug("Supervisor[%s] for datasource[%s] is already 
stopping.", supervisorId, dataSource);
   ```



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -379,7 +381,8 @@ SegmentPublishResult commitAppendSegmentsAndMetadata(
       DataSourceMetadata startMetadata,
       DataSourceMetadata endMetadata,
       String taskAllocatorId,
-      @Nullable SegmentSchemaMapping segmentSchemaMapping
+      @Nullable SegmentSchemaMapping segmentSchemaMapping,
+      @Nullable String supervisorId

Review Comment:
   Since the `startMetadata` and `endMetadata` are not nullable, `supervisorId` 
should not be nullable either. Caller should ensure passing a non-null value. 
DB layer can just do a validation that it is non null, it shouldn't need to 
determine the default value.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java:
##########
@@ -92,13 +93,15 @@ private LockResult acquireTimeChunkLock(TaskLockType 
lockType, Task task, Interv
   }
 
   @Test
-  public void testTransactionalUpdateDataSourceMetadata() throws Exception
+  public void 
testTransactionalUpdateDataSourceMetadataWithDefaultSupervisorId() throws 
Exception

Review Comment:
   Maybe consider this naming convention for slightly better readability:
   
   ```suggestion
     public void test_updateDataSourceMetadata_withDefaultSupervisorId() throws 
Exception
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to