kfaraz commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r1984707287
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +415,17 @@ public boolean cancel(boolean interruptIfRunning)
return true;
}
}
+
+ private ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+ {
+ switch (loadingMode) {
+ case TURBO:
+ return turboExec;
+ case NORMAL:
+ return exec;
+ default:
+ throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+ }
Review Comment:
Nit: This probably can be simpler
```suggestion
return loadingMode == SegmentLoadingMode.TURBO ? turboExec : exec;
```
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -162,6 +166,7 @@ public CoordinatorDynamicConfig(
);
this.debugDimensions = debugDimensions;
this.validDebugDimensions = validateDebugDimensions(debugDimensions);
+ this.turboLoadingNodes = parseJsonStringOrArray(turboLoadingNodes);
Review Comment:
We need not support `String` since this is a new config. Let's just support
array only.
```suggestion
this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of());
```
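For context, the helper relied on here is a null-coalescing utility. A minimal standalone sketch of its semantics (illustrative only, not the actual Druid `Configs` source):

```java
// Minimal sketch of null-coalescing semantics: return the configured
// value when present, else the supplied default. The real helper lives
// in Druid's Configs utility class; this standalone version is illustrative.
final class ConfigsSketch
{
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value != null ? value : defaultValue;
  }
}
```

With this, an absent `turboLoadingNodes` config simply falls back to an empty set instead of requiring string parsing.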
##########
docs/configuration/index.md:
##########
@@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only applies when `druid.coordinator.kill.on=true`.|`P30D`|
|`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
|`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute|
-|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1|
+|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If the value is not provided, the coordinator automatically dynamically configures the value to the `numLoadingThreads` available on the Historical. | `druid.segmentCache.numLoadingThreads` |
Review Comment:
```suggestion
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on Historical service. If this value is not configured, the coordinator uses the value of the `numLoadingThreads` for the respective server. | `druid.segmentCache.numLoadingThreads` |
```
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -308,6 +311,9 @@ private void logRequestFailure(Throwable t)
processingExecutor
);
}
+ catch (MalformedURLException ex) {
+ throw new RuntimeException(ex);
+ }
Review Comment:
We probably don't need a separate catch since we are already catching all
throwables.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -200,13 +192,24 @@ private void doSegmentManagement()
return;
}
+ SegmentLoadingMode loadingMode = loadingModeSupplier.get();
Review Comment:
```suggestion
final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
```
##########
server/src/main/java/org/apache/druid/server/http/HistoricalLoadingCapabilities.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+
+public class HistoricalLoadingCapabilities
Review Comment:
We might use this for servers other than historicals.
After all, the `SegmentLoadDropHandler` is used on other server types too.
```suggestion
public class SegmentLoadingCapabilities
```
Please add a short javadoc too.
##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -327,6 +330,25 @@ public void onFailure(Throwable th)
asyncContext.setTimeout(timeout);
}
+ @GET
+ @Path("/segmentLoadingCapabilities")
Review Comment:
```suggestion
@Path("/loadCapabilities")
```
##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java:
##########
@@ -454,19 +454,27 @@ private Environment(
createBalancerStrategy(balancerStrategy),
new HttpLoadQueuePeonConfig(null, null, null)
);
+
+ JacksonConfigManager jacksonConfigManager = mockConfigManager();
+ setDynamicConfig(dynamicConfig);
+ CoordinatorConfigManager coordinatorConfigManager = new CoordinatorConfigManager(
+ jacksonConfigManager,
+ null,
+ null
+ );
+
this.loadQueueTaskMaster = new LoadQueueTaskMaster(
OBJECT_MAPPER,
executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
coordinatorConfig.getHttpLoadQueuePeonConfig(),
- httpClient
+ httpClient,
+ coordinatorConfigManager::getCurrentDynamicConfig
Review Comment:
```suggestion
() -> dynamicConfig
```
##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -76,6 +82,15 @@ public class HttpLoadQueuePeonTest
public void setUp()
{
httpClient = new TestHttpClient();
+ JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+ EasyMock.expect(
+ configManager.watch(
+ EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+ EasyMock.anyObject(Class.class),
+ EasyMock.anyObject()
+ )
).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
+ EasyMock.replay(configManager);
Review Comment:
I don't think `configManager` is being used anywhere. We can remove this.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/segmentLoadingCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ log.info(
"Historical capabilities endpoint not found at server[%s]. Using default values.",
Review Comment:
Include the URL in this log message.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/segmentLoadingCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ log.info(
Review Comment:
```suggestion
log.warn(
```
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -308,12 +380,29 @@ private void logRequestFailure(Throwable t)
processingExecutor
);
}
+ catch (MalformedURLException ex) {
+ throw new RuntimeException(ex);
+ }
catch (Throwable th) {
log.error(th, "Error sending load/drop request to [%s].", serverId);
mainLoopInProgress.set(false);
}
}
+ @VisibleForTesting
+ int calculateBatchSize(SegmentLoadingMode loadingMode)
+ {
+ if (config.getBatchSize() != null) {
+ return config.getBatchSize();
+ } else if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
+ return serverCapabilities.getNumTurboLoadingThreads();
+ } else if (SegmentLoadingMode.NORMAL.equals(loadingMode)) {
+ return serverCapabilities.getNumLoadingThreads();
+ } else {
+ throw DruidException.defensive().build("unsupported loading mode");
+ }
Review Comment:
Precedence is:
turbo config > batch size config > default
(you may call this out in the javadoc of this method)
```suggestion
if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
  return serverCapabilities.getNumTurboLoadingThreads();
} else {
  return Configs.valueOrDefault(config.getBatchSize(), serverCapabilities.getNumLoadingThreads());
}
```
(I assume that once we are in this method, `serverCapabilities` will never
be null.)
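The precedence described above can be sketched as a standalone method. This is only an illustration with hypothetical parameter names (`configuredBatchSize` stands in for `config.getBatchSize()`, the two thread counts for the server capabilities), not the actual `HttpLoadQueuePeon` code:

```java
// Standalone sketch of the suggested precedence:
// turbo mode > configured batchSize > server default.
enum SegmentLoadingMode { NORMAL, TURBO }

final class BatchSizeSketch
{
  /**
   * Turbo mode always uses the server's turbo thread count; otherwise a
   * configured batch size wins, falling back to the server's normal
   * thread count. configuredBatchSize is null when the config is unset.
   */
  static int calculateBatchSize(
      SegmentLoadingMode loadingMode,
      Integer configuredBatchSize,
      int numLoadingThreads,
      int numTurboLoadingThreads
  )
  {
    if (loadingMode == SegmentLoadingMode.TURBO) {
      return numTurboLoadingThreads;
    } else {
      return configuredBatchSize != null ? configuredBatchSize : numLoadingThreads;
    }
  }
}
```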
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/segmentLoadingCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ log.info(
+ "Historical capabilities endpoint not found at server[%s]. Using default values.",
+ new URL(serverId)
+ );
+ return new HistoricalLoadingCapabilities(1, 1);
+ }
+
+ if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ throw new RE("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());
+ }
+
+ return jsonMapper.readValue(
+ stream,
+ HistoricalLoadingCapabilities.class
);
}
- catch (MalformedURLException ex) {
- throw new RuntimeException(ex);
+ catch (Throwable th) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(th, "Received error while fetching historical capabilities from Server[%s].", serverId);
Review Comment:
This exception is just logged, you can continue using a `RuntimeException`
here.
##########
docs/configuration/index.md:
##########
@@ -953,6 +953,7 @@ The following table shows the dynamic configuration properties for the Coordinat
|`decommissioningNodes`|List of Historical servers to decommission. Coordinator will not assign new segments to decommissioning servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `maxSegmentsToMove`.|none|
|`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
+|`turboLoadingNodes`| List of Historical servers to place in turbo loading mode. These Historicals will load segments more quickly but at the cost of query performance. `turboLoadingNodes` requires dynamic configuration of batchSize to provide performance improvements. Hence, the runtime parameter `druid.coordinator.loadqueuepeon.http.batchSize` must not be configured. |none|
Review Comment:
We shouldn't require the users to not specify the `batchSize`. The dynamic
config `turboLoadingNodes` should simply override the behaviour of the
batchSize. In other words, if a server is in `turboLoadingNodes`, we will not
use the configured batchSize for it.
```suggestion
|`turboLoadingNodes`| List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead. |none|
```
##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java:
##########
@@ -454,19 +454,27 @@ private Environment(
createBalancerStrategy(balancerStrategy),
new HttpLoadQueuePeonConfig(null, null, null)
);
+
+ JacksonConfigManager jacksonConfigManager = mockConfigManager();
+ setDynamicConfig(dynamicConfig);
+ CoordinatorConfigManager coordinatorConfigManager = new CoordinatorConfigManager(
+ jacksonConfigManager,
+ null,
+ null
+ );
Review Comment:
I don't think we need this.
##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -327,6 +330,25 @@ public void onFailure(Throwable th)
asyncContext.setTimeout(timeout);
}
+ @GET
+ @Path("/segmentLoadingCapabilities")
+ @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+ public Response getSegmentLoadingCapabilities(
+ @Context final HttpServletRequest req
+ ) throws IOException
+ {
+ if (loadDropRequestHandler == null) {
+ sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "load/drop handler is not available.");
Review Comment:
I am not sure if this is needed since you are returning a response in the
next line itself.
##########
server/src/main/java/org/apache/druid/server/http/HistoricalLoadingCapabilities.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+
+public class HistoricalLoadingCapabilities
+{
+ private final int numLoadingThreads;
+ private final int numTurboLoadingThreads;
+
+ @JsonCreator
+ public HistoricalLoadingCapabilities(
+ @JsonProperty("numLoadingThreads") int numLoadingThreads,
+ @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
+ )
+ {
+ if (numLoadingThreads < 1 || numTurboLoadingThreads < 1) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
.build("numLoadingThreads and numTurboLoadingThreads must be greater than 0.");
+ }
Review Comment:
Is this really needed?
This object will never be constructed by users, afaict.
##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -219,10 +221,10 @@ public void onFailure(Throwable th)
* This endpoint is used by HttpLoadQueuePeon to assign segment load/drop
requests batch. This endpoint makes the
* client wait till one of the following events occur. Note that this is
implemented using async IO so no jetty
* threads are held while in wait.
- *
+ * <br>
* (1) Given timeout elapses.
* (2) Some load/drop request completed.
- *
+ * <br>
Review Comment:
Nit: Use `<ol>` and `<li>` tags instead.
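For instance, a hedged sketch of the reworked javadoc (the standard HTML list-item tag is `<li>`):

```java
/**
 * This endpoint is used by HttpLoadQueuePeon to assign segment load/drop request batches.
 * The client waits until one of the following events occurs:
 * <ol>
 *   <li>The given timeout elapses.</li>
 *   <li>Some load/drop request completes.</li>
 * </ol>
 */
```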
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -308,6 +320,19 @@ public boolean getReplicateAfterLoadTimeout()
return replicateAfterLoadTimeout;
}
+ /**
+ * List of historical servers to put into turboloading mode. These historicals will use a larger thread pool to load
+ * segments. This causes decreases the average time taken to load segments. However, this also means less resources
+ * given to query threads which causes a drop in query performance.
Review Comment:
```suggestion
* segments. This decreases the average time taken to load segments. However, this also means fewer resources
* available to query threads, which may cause a drop in query performance.
```
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -200,13 +192,24 @@ private void doSegmentManagement()
return;
}
+ SegmentLoadingMode loadingMode = loadingModeSupplier.get();
+
try {
- log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
+ log.trace("Sending [%d] load/drop requests to Server[%s] in loadingMode [%s].", newRequests.size(), serverId, loadingMode);
Review Comment:
```suggestion
log.trace("Sending [%d] load/drop requests to Server[%s] in loadingMode[%s].", newRequests.size(), serverId, loadingMode);
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +95,21 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
+ new ThreadPoolExecutor(
+ config.getNumLoadingThreads(), config.getNumLoadingThreads(),
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
Review Comment:
```suggestion
Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
```
##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -316,6 +333,30 @@ public void testLoadRateIsChangedWhenLoadSucceeds() throws InterruptedException
);
}
+ @Test
+ public void testBatchSize()
+ {
+ Assert.assertEquals(10, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
+
+ httpLoadQueuePeon = new HttpLoadQueuePeon(
+ "http://dummy:4000",
+ MAPPER,
+ httpClient,
+ new HttpLoadQueuePeonConfig(null, null, null),
+ new WrappingScheduledExecutorService(
+ "HttpLoadQueuePeonTest-%s",
+ httpClient.processingExecutor,
+ true
+ ),
+ httpClient.callbackExecutor,
+ () -> SegmentLoadingMode.NORMAL,
+ new HistoricalLoadingCapabilities(1, 3)
+ );
Review Comment:
Why create another object? It seems to have the same arguments as the value
already assigned to `httpLoadQueuePeon`.
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -308,6 +320,19 @@ public boolean getReplicateAfterLoadTimeout()
return replicateAfterLoadTimeout;
}
+ /**
+ * List of historical servers to put into turboloading mode. These historicals will use a larger thread pool to load
Review Comment:
```suggestion
* List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -63,7 +68,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final SegmentManager segmentManager;
- private final ScheduledExecutorService exec;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final ThreadPoolExecutor standardExec;
+ private final ThreadPoolExecutor turboExec;
Review Comment:
```suggestion
private final ThreadPoolExecutor normalLoadExec;
private final ThreadPoolExecutor turboLoadExec;
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +95,21 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
+ new ThreadPoolExecutor(
+ config.getNumLoadingThreads(), config.getNumLoadingThreads(),
+ 60L, TimeUnit.SECONDS,
Review Comment:
Please put args on separate lines.
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +449,22 @@ public boolean cancel(boolean interruptIfRunning)
return true;
}
}
+
+ private ExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+ {
+ switch (loadingMode) {
+ case TURBO:
+ return turboExec;
+ case NORMAL:
+ return standardExec;
+ default:
+ throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+ }
Review Comment:
This can be simplified:
```suggestion
return loadingMode == TURBO ? turboExec : normalExec;
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -88,9 +95,21 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
+ new ThreadPoolExecutor(
+ config.getNumLoadingThreads(), config.getNumLoadingThreads(),
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+ ),
+ new ThreadPoolExecutor(
+ config.getNumBootstrapThreads(), config.getNumBootstrapThreads(),
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ Execs.makeThreadFactory("TurboDataSegmentChangeHandler-%s")
Review Comment:
```suggestion
Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")
```
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -124,6 +150,11 @@ public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDataso
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
+ {
+ addSegment(segment, callback, SegmentLoadingMode.NORMAL);
+ }
+
+ public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback, SegmentLoadingMode loadingMode)
Review Comment:
Does this need to be public?
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -179,14 +210,21 @@ each time when addSegment() is called, it has to wait for the lock in order to m
@Override
public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
- removeSegment(segment, callback, true);
+ removeSegment(segment, callback, true, SegmentLoadingMode.NORMAL);
+ }
+
+ @VisibleForTesting
+ void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback, boolean scheduleDrop)
+ {
+ removeSegment(segment, callback, scheduleDrop, SegmentLoadingMode.NORMAL);
Review Comment:
For remove segment, loading mode should not matter.
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -100,13 +119,20 @@ public SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
SegmentManager segmentManager,
- ScheduledExecutorService exec
+ ThreadPoolExecutor standardExec,
+ ThreadPoolExecutor turboExec,
+ ScheduledExecutorService scheduledExecutorService
)
{
this.config = config;
this.announcer = announcer;
this.segmentManager = segmentManager;
- this.exec = exec;
+ this.standardExec = standardExec;
+ this.turboExec = turboExec;
+ this.scheduledExecutorService = scheduledExecutorService;
+
+ this.standardExec.allowCoreThreadTimeOut(true);
+ this.turboExec.allowCoreThreadTimeOut(true);
Review Comment:
We probably don't need this if we do the following.
Since non-core threads can always be terminated, how about we just use a single thread pool with
`coreSize = numLoadingThreads` and `maxSize = max(numLoadingThreads, numBootstrapThreads)`.
We would submit more requests only when a server is in turbo mode.
In normal mode, server would automatically terminate threads and use a
smaller thread pool size.
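A sketch of that single-pool idea, under the stated assumption that the core size covers normal-mode loads while non-core threads absorb turbo-mode bursts (hypothetical standalone factory, not the actual `SegmentLoadDropHandler` code):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SingleLoadPoolSketch
{
  // coreSize = numLoadingThreads, maxSize = max(numLoadingThreads, numBootstrapThreads).
  // With a SynchronousQueue, tasks are handed straight to threads, so the pool
  // grows toward maxSize only while extra (turbo-mode) work is being submitted;
  // idle non-core threads terminate after 60 seconds.
  static ThreadPoolExecutor create(int numLoadingThreads, int numBootstrapThreads)
  {
    return new ThreadPoolExecutor(
        numLoadingThreads,
        Math.max(numLoadingThreads, numBootstrapThreads),
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>()
    );
  }
}
```

In normal mode only up to `numLoadingThreads` tasks would be submitted concurrently, so the pool naturally shrinks back to its core size without needing `allowCoreThreadTimeOut`.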
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -63,7 +68,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final SegmentManager segmentManager;
- private final ScheduledExecutorService exec;
+ private final ScheduledExecutorService scheduledExecutorService;
Review Comment:
I don't think we really need a third thread pool, you can just make the
normal thread pool a `ScheduledThreadPoolExecutor` and always use it for drops.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/segmentLoadingCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ log.info(
+ "Historical capabilities endpoint not found at server[%s]. Using default values.",
+ new URL(serverId)
+ );
+ return new HistoricalLoadingCapabilities(1, 1);
+ }
+
+ if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+ throw new RE("Error when fetching capabilities from server[%s]. Received [%s]", new URL(serverId), responseHandler.getStatus());
Review Comment:
Throwing an exception here will just cause a generic alert with message
`Caught exception, ignoring so that schedule keeps going.` to be raised in
`DruidCoordinator`.
Maybe do a more specific `log.makeAlert` here before throwing the exception.
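A self-contained model of the suggested flow: 404 falls back to defaults, any other non-200 raises a targeted alert before throwing. The `Capabilities` record, the CSV body format, and the stderr "alert" stand in for Druid's `HistoricalLoadingCapabilities` and `log.makeAlert`; they are illustrative only.

```java
public class CapabilityFetchSketch
{
  record Capabilities(int loadingThreads, int turboLoadingThreads) { }

  static Capabilities fromResponse(int status, String body)
  {
    if (status == 404) {
      // Older servers don't expose the endpoint; fall back to defaults.
      return new Capabilities(1, 1);
    } else if (status != 200) {
      // Emit a specific alert before throwing, rather than letting the
      // coordinator raise its generic "Caught exception, ignoring..." alert.
      System.err.printf("ALERT: capability fetch failed, status=%d%n", status);
      throw new RuntimeException("Error fetching capabilities, received " + status);
    }
    // Parse the body (hypothetical "loading,turbo" CSV for this sketch).
    String[] parts = body.split(",");
    return new Capabilities(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
  }

  public static void main(String[] args)
  {
    System.out.println(fromResponse(404, "").loadingThreads());          // 1
    System.out.println(fromResponse(200, "4,16").turboLoadingThreads()); // 16
  }
}
```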
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
Review Comment:
I don't think we ever enable trace logs. Might as well not have this.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
Review Comment:
Can we avoid adding this constructor?
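One way to drop the test-only constructor is to make the capability fetch overridable so tests can stub it. This is a hypothetical shape with invented names, and note that calling an overridable method from a constructor carries its own well-known pitfalls:

```java
public class OverridableFetchSketch
{
  static class Peon
  {
    final int capabilities;

    Peon()
    {
      this.capabilities = fetchCapabilities();
    }

    // Production implementation would issue the HTTP call; tests override it.
    protected int fetchCapabilities()
    {
      return 8;
    }
  }

  public static void main(String[] args)
  {
    // Anonymous subclass stubs the fetch without any network access.
    Peon stubbed = new Peon() {
      @Override
      protected int fetchCapabilities()
      {
        return 1;
      }
    };
    System.out.println(stubbed.capabilities);  // 1
  }
}
```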
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -114,21 +119,23 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
- private final URL changeRequestURL;
private final String serverId;
private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
private final ExecutorService callBackExecutor;
+ private final Supplier<SegmentLoadingMode> loadingModeSupplier;
private final ObjectWriter requestBodyWriter;
+ private final HistoricalLoadingCapabilities serverCapabilities;
public HttpLoadQueuePeon(
String baseUrl,
ObjectMapper jsonMapper,
HttpClient httpClient,
HttpLoadQueuePeonConfig config,
ScheduledExecutorService processingExecutor,
- ExecutorService callBackExecutor
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier
Review Comment:
Nit: Put this arg after `HttpLoadQueuePeonConfig config`.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/segmentLoadingCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ log.info(
+ "Historical capabilities endpoint not found at server[%s]. Using default values.",
+ new URL(serverId)
+ );
+ return new HistoricalLoadingCapabilities(1, 1);
+ }
+
+ if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
Review Comment:
```suggestion
} else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
```
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -160,7 +222,8 @@ private void doSegmentManagement()
return;
}
- final int batchSize = config.getBatchSize();
+ final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
+ int batchSize = calculateBatchSize(loadingMode);
Review Comment:
```suggestion
final int batchSize = calculateBatchSize(loadingMode);
```
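`calculateBatchSize` itself is not shown in this hunk; a plausible, purely hypothetical shape would scale the configured batch size by the thread count the mode allows (real logic in the PR may differ):

```java
public class BatchSizeSketch
{
  enum SegmentLoadingMode { NORMAL, TURBO }

  // Hypothetical: pick the mode's thread count and scale the configured batch.
  static int calculateBatchSize(
      SegmentLoadingMode mode,
      int configuredBatchSize,
      int loadingThreads,
      int turboLoadingThreads
  )
  {
    int threads = mode == SegmentLoadingMode.TURBO ? turboLoadingThreads : loadingThreads;
    return configuredBatchSize * threads;
  }

  public static void main(String[] args)
  {
    System.out.println(calculateBatchSize(SegmentLoadingMode.NORMAL, 5, 2, 8)); // 10
    System.out.println(calculateBatchSize(SegmentLoadingMode.TURBO, 5, 2, 8));  // 40
  }
}
```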
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]