kfaraz commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r1978658199
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +416,31 @@ public boolean cancel(boolean interruptIfRunning)
return true;
}
}
+
+ public ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+ {
+ switch (loadingMode) {
+ case TURBO:
+ return turboExec;
+ case NORMAL:
+ return exec;
+ default:
+ throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+ }
+ }
+
+ public static SegmentLoadingMode getLoadingMode(CoordinatorDynamicConfig coordinatorDynamicConfig, String serverName)
Review Comment:
This method may live in `CoordinatorDynamicConfig` instead.
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +416,31 @@ public boolean cancel(boolean interruptIfRunning)
return true;
}
}
+
+ public ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+ {
+ switch (loadingMode) {
+ case TURBO:
+ return turboExec;
+ case NORMAL:
+ return exec;
+ default:
+ throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+ }
+ }
+
+ public static SegmentLoadingMode getLoadingMode(CoordinatorDynamicConfig coordinatorDynamicConfig, String serverName)
+ {
+ final Set<String> turboLoadHistoricals = coordinatorDynamicConfig.getTurboLoadHistoricals();
+ return turboLoadHistoricals.contains(serverName) ?
+ SegmentLoadingMode.TURBO :
+ SegmentLoadingMode.NORMAL;
+ }
+
+ public enum SegmentLoadingMode
Review Comment:
Please move this enum out into a separate file. It seems you had already
intended to do so.
##########
server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java:
##########
@@ -219,12 +219,12 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException
// Make sure adding segments beyond allowed size fails
DataSegment newSegment = TestSegmentUtils.makeSegment("test", "new-segment", SEGMENT_SIZE);
- loadDropHandler.addSegment(newSegment, null);
+ loadDropHandler.addSegment(newSegment, null, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(newSegment));
// Clearing some segment should allow for new segments
- loadDropHandler.removeSegment(expectedSegments.get(0), null, false);
- loadDropHandler.addSegment(newSegment, null);
+ loadDropHandler.removeSegment(expectedSegments.get(0), null, false, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
+ loadDropHandler.addSegment(newSegment, null, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
Review Comment:
The older variant of the method still exists. Why do we need to use the new
one here?
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -118,7 +120,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
@JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
- @JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions
+ @JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
+ @JsonProperty("turboLoadHistoricals") @Nullable Set<String> turboLoadHistoricals
Review Comment:
Should we name the field `turboLoadingNodes` instead?
The coordinator manages loading on non-historical nodes too.
Since nothing stops a user from putting, say, a broker in turbo mode, the
field name should reflect that.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -112,23 +115,27 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private final HttpLoadQueuePeonConfig config;
+ private final String serverName;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final URL changeRequestURL;
private final String serverId;
private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
private final ExecutorService callBackExecutor;
+ private final CoordinatorConfigManager coordinatorConfigManager;
private final ObjectWriter requestBodyWriter;
public HttpLoadQueuePeon(
String baseUrl,
+ String serverName,
ObjectMapper jsonMapper,
HttpClient httpClient,
HttpLoadQueuePeonConfig config,
ScheduledExecutorService processingExecutor,
- ExecutorService callBackExecutor
+ ExecutorService callBackExecutor,
+ CoordinatorConfigManager coordinatorConfigManager
Review Comment:
Don't pass the config manager in here. Pass a
`Supplier<CoordinatorDynamicConfig>` instead or, better yet, just a
`Supplier<SegmentLoadingMode>`. The supplier can hold the logic that
determines the loading mode from the dynamic config.
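A minimal sketch of the supplier approach, not the actual Druid code. `PeonSketch` and `PeonFactorySketch` are hypothetical stand-ins for `HttpLoadQueuePeon` and whatever constructs it, and the dynamic config is reduced to an `AtomicReference` holding the set of turbo-mode server names. The peon only ever sees a `Supplier<SegmentLoadingMode>`.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for the enum introduced in the PR.
enum SegmentLoadingMode
{
  NORMAL, TURBO
}

// Hypothetical, heavily trimmed stand-in for HttpLoadQueuePeon.
class PeonSketch
{
  private final Supplier<SegmentLoadingMode> loadingModeSupplier;

  PeonSketch(Supplier<SegmentLoadingMode> loadingModeSupplier)
  {
    this.loadingModeSupplier = loadingModeSupplier;
  }

  // Evaluated on every call, so a dynamic config change is picked up
  // without the peon knowing where the value comes from.
  SegmentLoadingMode loadingModeForNextBatch()
  {
    return loadingModeSupplier.get();
  }
}

// Hypothetical wiring code; the lookup logic lives here, outside the peon.
class PeonFactorySketch
{
  // Assumption: the AtomicReference stands in for the current dynamic config.
  static PeonSketch createPeon(String serverName, AtomicReference<Set<String>> turboServers)
  {
    return new PeonSketch(
        () -> turboServers.get().contains(serverName)
              ? SegmentLoadingMode.TURBO
              : SegmentLoadingMode.NORMAL
    );
  }
}
```

With this shape, a test can construct the peon with a fixed lambda such as `() -> SegmentLoadingMode.NORMAL` and needs no config manager at all.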
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +416,31 @@ public boolean cancel(boolean interruptIfRunning)
return true;
}
}
+
+ public ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
Review Comment:
Does this need to be public?
##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -308,6 +312,12 @@ public boolean getReplicateAfterLoadTimeout()
return replicateAfterLoadTimeout;
}
+ @JsonProperty
+ public Set<String> getTurboLoadHistoricals()
Review Comment:
Please add a javadoc here.
##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -215,23 +215,131 @@ public void onFailure(Throwable th)
return null;
}
+ /**
+ * Deprecated.
+ *
+ * @see SegmentListerResource#applyDataSegmentChangeRequests(long, HistoricalSegmentChangeRequest, HttpServletRequest)
+ */
+ @Deprecated
+ @POST
+ @Path("/changeRequests")
+ @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+ @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+ public void applyDataSegmentChangeRequests(
+ @QueryParam("timeout") long timeout,
+ List<DataSegmentChangeRequest> changeRequestList,
+ @Context final HttpServletRequest req
+ ) throws IOException
+ {
+ if (loadDropRequestHandler == null) {
+ sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "load/drop handler is not available.");
+ return;
+ }
+
+ if (timeout <= 0) {
+ sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "timeout must be positive.");
+ return;
+ }
+
+ if (changeRequestList == null || changeRequestList.isEmpty()) {
+ sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "No change requests provided.");
+ return;
+ }
+
+ final ResponseContext context = createContext(req.getHeader("Accept"));
+ final ListenableFuture<List<DataSegmentChangeResponse>> future =
+ loadDropRequestHandler.processBatch(changeRequestList, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
+
+ final AsyncContext asyncContext = req.startAsync();
+
+ asyncContext.addListener(
+ new AsyncListener()
+ {
+ @Override
+ public void onComplete(AsyncEvent event)
+ {
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent event)
+ {
+
+ // HTTP 204 NO_CONTENT is sent to the client.
+ future.cancel(true);
+ event.getAsyncContext().complete();
+ }
+
+ @Override
+ public void onError(AsyncEvent event)
+ {
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent event)
+ {
+ }
+ }
+ );
+
+ Futures.addCallback(
+ future,
+ new FutureCallback<>()
+ {
+ @Override
+ public void onSuccess(List<DataSegmentChangeResponse> result)
+ {
+ try {
+ HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+ response.setStatus(HttpServletResponse.SC_OK);
+ context.inputMapper.writerFor(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
+ .writeValue(asyncContext.getResponse().getOutputStream(), result);
+ asyncContext.complete();
+ }
+ catch (Exception ex) {
+ log.debug(ex, "Request timed out or closed already.");
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable th)
+ {
+ try {
+ HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+ if (th instanceof IllegalArgumentException) {
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, th.getMessage());
+ } else {
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, th.getMessage());
+ }
+ asyncContext.complete();
+ }
+ catch (Exception ex) {
+ log.debug(ex, "Request timed out or closed already.");
+ }
+ }
+ },
+ MoreExecutors.directExecutor()
+ );
+
+ asyncContext.setTimeout(timeout);
+ }
+
/**
* This endpoint is used by HttpLoadQueuePeon to assign segment load/drop requests batch. This endpoint makes the
* client wait till one of the following events occur. Note that this is implemented using async IO so no jetty
* threads are held while in wait.
- *
+ * <br>
* (1) Given timeout elapses.
* (2) Some load/drop request completed.
- *
+ * <br>
* It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch.
*/
@POST
- @Path("/changeRequests")
+ @Path("/segmentChangeRequests")
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public void applyDataSegmentChangeRequests(
Review Comment:
It seems that this new method could have used the existing one, thereby
simplifying the diff.
Please check if that is a possibility.
Looking at the APIs, I suppose the loadingMode could simply be a query param
too.
That would not break backward compat either and we wouldn't need a new API.
What do you think?
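To illustrate why a query parameter would stay backward compatible, here is a rough sketch of the parsing step only. `LoadingModeParam` and its `parse` method are hypothetical names, and the enum is a local stand-in. The assumption is that an older coordinator sends no `loadingMode` parameter at all, so an absent value falls back to `NORMAL`.

```java
import java.util.Locale;

// Hypothetical stand-in for the enum introduced in the PR.
enum SegmentLoadingMode
{
  NORMAL, TURBO
}

// Hypothetical helper; the real resource method would receive the raw
// query parameter value and could delegate to something like this.
final class LoadingModeParam
{
  private LoadingModeParam()
  {
  }

  // Absent or blank parameter: behave exactly as before the change.
  // Unknown value: Enum.valueOf throws IllegalArgumentException, which the
  // caller could translate into a 400 response.
  static SegmentLoadingMode parse(String rawValue)
  {
    if (rawValue == null || rawValue.trim().isEmpty()) {
      return SegmentLoadingMode.NORMAL;
    }
    return SegmentLoadingMode.valueOf(rawValue.trim().toUpperCase(Locale.ROOT));
  }
}
```

Under these assumptions the existing `/changeRequests` path keeps a single handler, and callers that never set the parameter see no change in behavior.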
##########
server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+/**
Review Comment:
empty file?
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -244,14 +266,14 @@ public Collection<DataSegment> getSegmentsToDelete()
return ImmutableList.copyOf(segmentsToDelete);
}
- public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests)
+ public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests, SegmentLoadingMode segmentLoadingMode)
Review Comment:
Please add a javadoc to this and other new methods.
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -200,8 +209,13 @@ private void doSegmentManagement()
return;
}
+ SegmentLoadDropHandler.SegmentLoadingMode loadingMode =
+ SegmentLoadDropHandler.getLoadingMode(coordinatorConfigManager.getCurrentDynamicConfig(), serverName);
Review Comment:
This class should not refer to `SegmentLoadDropHandler` in any way.
Add a non-static `getLoadingModeForServer(String)` method to
`CoordinatorDynamicConfig` instead.
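One possible shape for that method, sketched on a trimmed stand-in class rather than the real `CoordinatorDynamicConfig`. `DynamicConfigSketch` is a hypothetical name, the field uses the `turboLoadingNodes` name suggested earlier in this review, and treating a null set as empty is an assumption based on the constructor argument being `@Nullable` in the diff.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the enum introduced in the PR.
enum SegmentLoadingMode
{
  NORMAL, TURBO
}

// Hypothetical, trimmed stand-in for CoordinatorDynamicConfig.
class DynamicConfigSketch
{
  private final Set<String> turboLoadingNodes;

  // Assumption: a null argument means "no server is in turbo mode".
  DynamicConfigSketch(Set<String> turboLoadingNodes)
  {
    this.turboLoadingNodes = turboLoadingNodes == null
                             ? Collections.emptySet()
                             : Collections.unmodifiableSet(new HashSet<>(turboLoadingNodes));
  }

  /**
   * Returns TURBO if the given server is listed in the turbo loading set,
   * NORMAL otherwise.
   */
  SegmentLoadingMode getLoadingModeForServer(String serverName)
  {
    return turboLoadingNodes.contains(serverName)
           ? SegmentLoadingMode.TURBO
           : SegmentLoadingMode.NORMAL;
  }
}
```

A caller such as the load queue peon would then need only the config object (or a supplier of it) and its own server name, with no reference to `SegmentLoadDropHandler`.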