kfaraz commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r1984707287


##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +415,17 @@ public boolean cancel(boolean interruptIfRunning)
       return true;
     }
   }
+
+  private ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+  {
+    switch (loadingMode) {
+      case TURBO:
+        return turboExec;
+      case NORMAL:
+        return exec;
+      default:
+        throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+    }

Review Comment:
   Nit: This probably can be simpler
   
   ```suggestion
        return loadingMode == SegmentLoadingMode.TURBO ? turboExec : exec;
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -162,6 +166,7 @@ public CoordinatorDynamicConfig(
     );
     this.debugDimensions = debugDimensions;
     this.validDebugDimensions = validateDebugDimensions(debugDimensions);
+    this.turboLoadingNodes = parseJsonStringOrArray(turboLoadingNodes);

Review Comment:
   We need not support `String` since this is a new config. Let's just support 
array only.
   
   ```suggestion
       this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of());
   ```



##########
docs/configuration/index.md:
##########
@@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
 |`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only applies when `druid.coordinator.kill.on=true`.|`P30D`|
 |`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
 |`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute|
-|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1|
+|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If the value is not provided, the coordinator automatically dynamically configures the value to the `numLoadingThreads` available on the Historical. | `druid.segmentCache.numLoadingThreads` |

Review Comment:
   ```suggestion
   |`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If this value is not configured, the coordinator uses the value of the `numLoadingThreads` for the respective server. | `druid.segmentCache.numLoadingThreads` |
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -308,6 +311,9 @@ private void logRequestFailure(Throwable t)
           processingExecutor
       );
     }
+    catch (MalformedURLException ex) {
+      throw new RuntimeException(ex);
+    }

Review Comment:
   We probably don't need a separate catch since we are already catching all 
throwables.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -200,13 +192,24 @@ private void doSegmentManagement()
       return;
     }
 
+    SegmentLoadingMode loadingMode = loadingModeSupplier.get();

Review Comment:
   ```suggestion
       final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
   ```



##########
server/src/main/java/org/apache/druid/server/http/HistoricalLoadingCapabilities.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+
+public class HistoricalLoadingCapabilities

Review Comment:
   We might use this for servers other than historicals.
   After all, the `SegmentLoadDropHandler` is used on other server types too.
   
   ```suggestion
   public class SegmentLoadingCapabilities
   ```
   
   Please add a short javadoc too.



##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -327,6 +330,25 @@ public void onFailure(Throwable th)
     asyncContext.setTimeout(timeout);
   }
 
+  @GET
+  @Path("/segmentLoadingCapabilities")

Review Comment:
   ```suggestion
     @Path("/loadCapabilities")
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java:
##########
@@ -454,19 +454,27 @@ private Environment(
           createBalancerStrategy(balancerStrategy),
           new HttpLoadQueuePeonConfig(null, null, null)
       );
+
+      JacksonConfigManager jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+      CoordinatorConfigManager coordinatorConfigManager = new CoordinatorConfigManager(
+          jacksonConfigManager,
+          null,
+          null
+      );
+
       this.loadQueueTaskMaster = new LoadQueueTaskMaster(
           OBJECT_MAPPER,
           executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
           executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
           coordinatorConfig.getHttpLoadQueuePeonConfig(),
-          httpClient
+          httpClient,
+          coordinatorConfigManager::getCurrentDynamicConfig

Review Comment:
   ```suggestion
             () -> dynamicConfig
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -76,6 +82,15 @@ public class HttpLoadQueuePeonTest
   public void setUp()
   {
     httpClient = new TestHttpClient();
+    JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+    EasyMock.expect(
+        configManager.watch(
+            EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+            EasyMock.anyObject(Class.class),
+            EasyMock.anyObject()
+        )
+    ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
+    EasyMock.replay(configManager);

Review Comment:
   I don't think `configManager` is being used anywhere. We can remove this.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }
+
+  private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/segmentLoadingCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)
+      ).get();
+
+      if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+        log.info(
+            "Historical capabilities endpoint not found at server[%s]. Using default values.",

Review Comment:
   Include the URL in this log message.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }
+
+  private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/segmentLoadingCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)
+      ).get();
+
+      if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+        log.info(

Review Comment:
   ```suggestion
           log.warn(
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -308,12 +380,29 @@ private void logRequestFailure(Throwable t)
           processingExecutor
       );
     }
+    catch (MalformedURLException ex) {
+      throw new RuntimeException(ex);
+    }
     catch (Throwable th) {
       log.error(th, "Error sending load/drop request to [%s].", serverId);
       mainLoopInProgress.set(false);
     }
   }
 
+  @VisibleForTesting
+  int calculateBatchSize(SegmentLoadingMode loadingMode)
+  {
+    if (config.getBatchSize() != null) {
+      return config.getBatchSize();
+    } else if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
+      return serverCapabilities.getNumTurboLoadingThreads();
+    } else if (SegmentLoadingMode.NORMAL.equals(loadingMode)) {
+      return serverCapabilities.getNumLoadingThreads();
+    } else {
+      throw DruidException.defensive().build("unsupported loading mode");
+    }

Review Comment:
   Precedence is:
   turbo config > batch size config > default
   
   (you may call this out in the javadoc of this method)
   
   ```suggestion
       if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
         return serverCapabilities.getNumTurboLoadingThreads();
       } else {
         return Configs.valueOrDefault(config.getBatchSize(), serverCapabilities.getNumLoadingThreads());
       }
   ```
   
   (I assume that once we are in this method, `serverCapabilities` will never 
be null.)
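   As a standalone illustration of that precedence (turbo mode first, then the configured batch size, then the server's default), here is a rough sketch; the class and parameter names are hypothetical, not the PR's actual code:

   ```java
   public class BatchSizeSketch
   {
     /**
      * Hypothetical standalone version of the precedence described above:
      * turbo mode > configured batchSize > server's numLoadingThreads.
      */
     public static int calculateBatchSize(
         boolean turboMode,
         Integer configuredBatchSize,
         int numLoadingThreads,
         int numTurboLoadingThreads
     )
     {
       if (turboMode) {
         // Turbo mode overrides any configured batch size
         return numTurboLoadingThreads;
       }
       // Otherwise an explicitly configured batch size wins over the server default
       return configuredBatchSize != null ? configuredBatchSize : numLoadingThreads;
     }

     public static void main(String[] args)
     {
       System.out.println(calculateBatchSize(true, 5, 2, 8));    // turbo wins
       System.out.println(calculateBatchSize(false, 5, 2, 8));   // configured batch size wins
       System.out.println(calculateBatchSize(false, null, 2, 8)); // server default
     }
   }
   ```

   In the actual peon, the turbo flag would come from the dynamic config and the thread counts from the fetched server capabilities.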



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }
+
+  private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/segmentLoadingCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)
+      ).get();
+
+      if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+        log.info(
+            "Historical capabilities endpoint not found at server[%s]. Using default values.",
+            new URL(serverId)
+        );
+        return new HistoricalLoadingCapabilities(1, 1);
+      }
+
+      if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        throw new RE("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());
+      }
+
+      return jsonMapper.readValue(
+          stream,
+          HistoricalLoadingCapabilities.class
       );
     }
-    catch (MalformedURLException ex) {
-      throw new RuntimeException(ex);
+    catch (Throwable th) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(th, "Received error while fetching historical capabilities from Server[%s].", serverId);

Review Comment:
   This exception is just logged, you can continue using a `RuntimeException` 
here.



##########
docs/configuration/index.md:
##########
@@ -953,6 +953,7 @@ The following table shows the dynamic configuration properties for the Coordinat
 |`decommissioningNodes`|List of Historical servers to decommission. Coordinator will not assign new segments to decommissioning servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `maxSegmentsToMove`.|none|
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
+|`turboLoadingNodes`| List of Historical servers to place in turbo loading mode. These Historicals will load segments more quickly but at the cost of query performance. `turboLoadingNodes` requires dynamic configuration of batchSize to provide performance improvements. Hence, the runtime parameter `druid.coordinator.loadqueuepeon.http.batchSize` must not be configured. |none|

Review Comment:
   We shouldn't require the users to not specify the `batchSize`. The dynamic 
config `turboLoadingNodes` should simply override the behaviour of the 
batchSize. In other words, if a server is in `turboLoadingNodes`, we will not 
use the configured batchSize for it.
   
   ```suggestion
   |`turboLoadingNodes`| List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead. |none|
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java:
##########
@@ -454,19 +454,27 @@ private Environment(
           createBalancerStrategy(balancerStrategy),
           new HttpLoadQueuePeonConfig(null, null, null)
       );
+
+      JacksonConfigManager jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+      CoordinatorConfigManager coordinatorConfigManager = new CoordinatorConfigManager(
+          jacksonConfigManager,
+          null,
+          null
+      );

Review Comment:
   I don't think we need this.



##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -327,6 +330,25 @@ public void onFailure(Throwable th)
     asyncContext.setTimeout(timeout);
   }
 
+  @GET
+  @Path("/segmentLoadingCapabilities")
+  @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  public Response getSegmentLoadingCapabilities(
+      @Context final HttpServletRequest req
+  ) throws IOException
+  {
+    if (loadDropRequestHandler == null) {
+      sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "load/drop handler is not available.");

Review Comment:
   I am not sure if this is needed since you are returning a response in the 
next line itself.



##########
server/src/main/java/org/apache/druid/server/http/HistoricalLoadingCapabilities.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+
+public class HistoricalLoadingCapabilities
+{
+  private final int numLoadingThreads;
+  private final int numTurboLoadingThreads;
+
+  @JsonCreator
+  public HistoricalLoadingCapabilities(
+      @JsonProperty("numLoadingThreads") int numLoadingThreads,
+      @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
+  )
+  {
+    if (numLoadingThreads < 1 || numTurboLoadingThreads < 1) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("numLoadingThreads and numTurboLoadingThreads must be greater than 0.");
+    }

Review Comment:
   Is this really needed?
   This object will never be constructed by users, afaict.



##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -219,10 +221,10 @@ public void onFailure(Throwable th)
    * This endpoint is used by HttpLoadQueuePeon to assign segment load/drop requests batch. This endpoint makes the
    * client wait till one of the following events occur. Note that this is implemented using async IO so no jetty
    * threads are held while in wait.
-   *
+   * <br>
    * (1) Given timeout elapses.
    * (2) Some load/drop request completed.
-   *
+   * <br>

Review Comment:
   Nit: Use `<ol>` and `<li>` tags instead.
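   For illustration, the restructured javadoc might read as follows (a hypothetical sketch, not the PR's text; note that the item tag is `<li>`):

   ```java
   public class JavadocListSketch
   {
     /**
      * The client waits until one of the following events occurs:
      * <ol>
      *   <li>Given timeout elapses.</li>
      *   <li>Some load/drop request completed.</li>
      * </ol>
      */
     public static String describe()
     {
       return "waits for timeout or request completion";
     }
   }
   ```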



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -308,6 +320,19 @@ public boolean getReplicateAfterLoadTimeout()
     return replicateAfterLoadTimeout;
   }
 
+  /**
+   * List of historical servers to put into turboloading mode. These historicals will use a larger thread pool to load
+   * segments. This causes decreases the average time taken to load segments. However, this also means less resources
+   * given to query threads which causes a drop in query performance.

Review Comment:
   ```suggestion
      * segments. This decreases the average time taken to load segments. However, this also means fewer resources
      * available to query threads which may cause a drop in query performance.
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -200,13 +192,24 @@ private void doSegmentManagement()
       return;
     }
 
+    SegmentLoadingMode loadingMode = loadingModeSupplier.get();
+
     try {
-      log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
+      log.trace("Sending [%d] load/drop requests to Server[%s] in loadingMode [%s].", newRequests.size(), serverId, loadingMode);

Review Comment:
   ```suggestion
      log.trace("Sending [%d] load/drop requests to Server[%s] in loadingMode[%s].", newRequests.size(), serverId, loadingMode);
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +95,21 @@ public SegmentLoadDropHandler(
         config,
         announcer,
         segmentManager,
+        new ThreadPoolExecutor(
+            config.getNumLoadingThreads(), config.getNumLoadingThreads(),
+            60L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")

Review Comment:
   ```suggestion
               Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -316,6 +333,30 @@ public void testLoadRateIsChangedWhenLoadSucceeds() throws InterruptedException
     );
   }
 
+  @Test
+  public void testBatchSize()
+  {
+    Assert.assertEquals(10, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
+
+    httpLoadQueuePeon = new HttpLoadQueuePeon(
+        "http://dummy:4000",
+        MAPPER,
+        httpClient,
+        new HttpLoadQueuePeonConfig(null, null, null),
+        new WrappingScheduledExecutorService(
+            "HttpLoadQueuePeonTest-%s",
+            httpClient.processingExecutor,
+            true
+        ),
+        httpClient.callbackExecutor,
+        () -> SegmentLoadingMode.NORMAL,
+        new HistoricalLoadingCapabilities(1, 3)
+    );

Review Comment:
   Why create another object? It seems to have the same arguments as the value 
already assigned to `httpLoadQueuePeon`.



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -308,6 +320,19 @@ public boolean getReplicateAfterLoadTimeout()
     return replicateAfterLoadTimeout;
   }
 
+  /**
+   * List of historical servers to put into turboloading mode. These historicals will use a larger thread pool to load

Review Comment:
   ```suggestion
      * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -63,7 +68,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
   private final SegmentLoaderConfig config;
   private final DataSegmentAnnouncer announcer;
   private final SegmentManager segmentManager;
-  private final ScheduledExecutorService exec;
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ThreadPoolExecutor standardExec;
+  private final ThreadPoolExecutor turboExec;

Review Comment:
   ```suggestion
     private final ThreadPoolExecutor normalLoadExec;
     private final ThreadPoolExecutor turboLoadExec;
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +95,21 @@ public SegmentLoadDropHandler(
         config,
         announcer,
         segmentManager,
+        new ThreadPoolExecutor(
+            config.getNumLoadingThreads(), config.getNumLoadingThreads(),
+            60L, TimeUnit.SECONDS,

Review Comment:
   Please put args on separate lines.



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +449,22 @@ public boolean cancel(boolean interruptIfRunning)
       return true;
     }
   }
+
+  private ExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+  {
+    switch (loadingMode) {
+      case TURBO:
+        return turboExec;
+      case NORMAL:
+        return standardExec;
+      default:
+        throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+    }

Review Comment:
   This can be simplified:
   
   ```suggestion
       return loadingMode == TURBO ? turboExec : normalExec;
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +95,21 @@ public SegmentLoadDropHandler(
         config,
         announcer,
         segmentManager,
+        new ThreadPoolExecutor(
+            config.getNumLoadingThreads(), config.getNumLoadingThreads(),
+            60L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+        ),
+        new ThreadPoolExecutor(
+            config.getNumBootstrapThreads(), config.getNumBootstrapThreads(),
+            60L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            Execs.makeThreadFactory("TurboDataSegmentChangeHandler-%s")

Review Comment:
   ```suggestion
               Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -124,6 +150,11 @@ public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDataso
 
   @Override
   public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
+  {
+    addSegment(segment, callback, SegmentLoadingMode.NORMAL);
+  }
+
+  public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback, SegmentLoadingMode loadingMode)

Review Comment:
   Does this need to be public?



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -179,14 +210,21 @@ each time when addSegment() is called, it has to wait for the lock in order to m
   @Override
   public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
   {
-    removeSegment(segment, callback, true);
+    removeSegment(segment, callback, true, SegmentLoadingMode.NORMAL);
+  }
+
+  @VisibleForTesting
+  void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback, boolean scheduleDrop)
+  {
+    removeSegment(segment, callback, scheduleDrop, SegmentLoadingMode.NORMAL);

Review Comment:
   For remove segment, loading mode should not matter.



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -100,13 +119,20 @@ public SegmentLoadDropHandler(
       SegmentLoaderConfig config,
       DataSegmentAnnouncer announcer,
       SegmentManager segmentManager,
-      ScheduledExecutorService exec
+      ThreadPoolExecutor standardExec,
+      ThreadPoolExecutor turboExec,
+      ScheduledExecutorService scheduledExecutorService
   )
   {
     this.config = config;
     this.announcer = announcer;
     this.segmentManager = segmentManager;
-    this.exec = exec;
+    this.standardExec = standardExec;
+    this.turboExec = turboExec;
+    this.scheduledExecutorService = scheduledExecutorService;
+
+    this.standardExec.allowCoreThreadTimeOut(true);
+    this.turboExec.allowCoreThreadTimeOut(true);

Review Comment:
   We probably don't need this if we do the following.
   
   Since non-core threads can always be terminated, how about we just use a single thread pool with `coreSize = numLoadingThreads` and `maxSize = max(numLoadingThreads, numBootstrapThreads)`.
   
   We would submit more requests only when a server is in turbo mode. In normal mode, the server would automatically terminate idle threads and use a smaller thread pool size.
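   For illustration, the single-pool idea could be sketched as below. This is a minimal sketch using plain JDK classes, not the PR's actual code; the parameter names follow the discussion, and the `SynchronousQueue` mirrors the pool construction in the quoted diff.
   
   ```java
   import java.util.concurrent.SynchronousQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   class LoadingPoolSketch
   {
     /**
      * One pool sized for both modes: core threads serve normal loading, and
      * turbo mode can grow the pool up to maxSize. With a SynchronousQueue, a
      * new thread is created whenever no idle thread can accept a task, so the
      * pool only grows when more requests are actually submitted.
      */
     static ThreadPoolExecutor create(int numLoadingThreads, int numBootstrapThreads)
     {
       ThreadPoolExecutor exec = new ThreadPoolExecutor(
           numLoadingThreads,
           Math.max(numLoadingThreads, numBootstrapThreads),
           60L, TimeUnit.SECONDS,
           new SynchronousQueue<>()
       );
       // Let idle core threads terminate too, so the pool shrinks back down
       // after a burst of turbo-mode loading.
       exec.allowCoreThreadTimeOut(true);
       return exec;
     }
   }
   ```
   
   One caveat with a `SynchronousQueue`: once all `maxSize` threads are busy, further submissions are rejected rather than queued, so the caller would still need to throttle how many requests are in flight per mode.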



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -63,7 +68,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
   private final SegmentLoaderConfig config;
   private final DataSegmentAnnouncer announcer;
   private final SegmentManager segmentManager;
-  private final ScheduledExecutorService exec;
+  private final ScheduledExecutorService scheduledExecutorService;

Review Comment:
   I don't think we really need a third thread pool; you can just make the normal thread pool a `ScheduledThreadPoolExecutor` and always use it for drops.
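   A minimal sketch of this idea (plain JDK, not the PR's code): a single `ScheduledThreadPoolExecutor` can run immediate load tasks via `submit` and delayed drops via `schedule`, so no third executor is needed.
   
   ```java
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   
   class DropSchedulerSketch
   {
     // One executor for both duties: loads go through submit()/execute(),
     // while drops that must wait (e.g. a drop delay) go through schedule().
     static ScheduledThreadPoolExecutor create(int numLoadingThreads)
     {
       return new ScheduledThreadPoolExecutor(numLoadingThreads);
     }
   }
   ```
   
   Note that a `ScheduledThreadPoolExecutor` behaves as an effectively fixed-size pool of `corePoolSize` threads (its unbounded delay queue means `maximumPoolSize` never comes into play), which may interact with the core/max sizing idea from the earlier comment.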



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }
+
+  private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/segmentLoadingCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)
+      ).get();
+
+      if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+        log.info(
+            "Historical capabilities endpoint not found at server[%s]. Using default values.",
+            new URL(serverId)
+        );
+        return new HistoricalLoadingCapabilities(1, 1);
+      }
+
+      if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        throw new RE("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());

Review Comment:
   Throwing an exception here will just cause a generic alert with message `Caught exception, ignoring so that schedule keeps going.` to be raised in `DruidCoordinator`.
   
   Maybe do a more specific `log.makeAlert` here before throwing the exception.
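   A sketch of the suggested pattern, using `java.util.logging` as a stand-in for Druid's `EmittingLogger.makeAlert(...).emit()`; the server id and message text here are illustrative, not taken from the PR:
   
   ```java
   import java.util.logging.Logger;
   
   class CapabilitiesAlertSketch
   {
     private static final Logger LOG = Logger.getLogger("HttpLoadQueuePeon");
   
     // Emit a specific, actionable message before propagating the failure, so
     // it is not reduced to the coordinator's generic "Caught exception,
     // ignoring so that schedule keeps going." alert.
     static RuntimeException capabilitiesError(String serverId, int status)
     {
       LOG.severe(String.format(
           "Could not fetch segment loading capabilities from server[%s], received status[%d]",
           serverId, status
       ));
       return new RuntimeException(String.format(
           "Error when fetching capabilities from server[%s]. Received [%d]",
           serverId, status
       ));
     }
   }
   ```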



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }
+
+  private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));

Review Comment:
   I don't think we ever enable trace logs. Might as well not have this.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }

Review Comment:
   Can we avoid adding this constructor?



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -114,21 +119,23 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
 
   private final ObjectMapper jsonMapper;
   private final HttpClient httpClient;
-  private final URL changeRequestURL;
   private final String serverId;
 
   private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
   private final ExecutorService callBackExecutor;
+  private final Supplier<SegmentLoadingMode> loadingModeSupplier;
 
   private final ObjectWriter requestBodyWriter;
+  private final HistoricalLoadingCapabilities serverCapabilities;
 
   public HttpLoadQueuePeon(
       String baseUrl,
       ObjectMapper jsonMapper,
       HttpClient httpClient,
       HttpLoadQueuePeonConfig config,
       ScheduledExecutorService processingExecutor,
-      ExecutorService callBackExecutor
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier

Review Comment:
   Nit: Put this arg after `HttpLoadQueuePeonConfig config`.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      HttpLoadQueuePeonConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
+      HistoricalLoadingCapabilities serverCapabilities
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = serverCapabilities;
+  }
+
+  private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/segmentLoadingCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(10000)
+      ).get();
+
+      if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+        log.info(
+            "Historical capabilities endpoint not found at server[%s]. Using default values.",
+            new URL(serverId)
+        );
+        return new HistoricalLoadingCapabilities(1, 1);
+      }
+
+      if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {

Review Comment:
   ```suggestion
         } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -160,7 +222,8 @@ private void doSegmentManagement()
       return;
     }
 
-    final int batchSize = config.getBatchSize();
+    final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
+    int batchSize = calculateBatchSize(loadingMode);

Review Comment:
   ```suggestion
       final int batchSize = calculateBatchSize(loadingMode);
   ```
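   A hypothetical sketch of what `calculateBatchSize` might reduce to; the enum values match the PR's `SegmentLoadingMode`, but the method body and the size parameters here are illustrative (the real method may also account for the server's advertised loading capabilities):
   
   ```java
   class BatchSizeSketch
   {
     enum SegmentLoadingMode { NORMAL, TURBO }
   
     // Use the larger batch only when the server is in turbo mode; otherwise
     // fall back to the configured normal batch size.
     static int calculateBatchSize(SegmentLoadingMode mode, int normalBatchSize, int turboBatchSize)
     {
       return mode == SegmentLoadingMode.TURBO ? turboBatchSize : normalBatchSize;
     }
   }
   ```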



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
