kfaraz commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r1978658199


##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +416,31 @@ public boolean cancel(boolean interruptIfRunning)
       return true;
     }
   }
+
+  public ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+  {
+    switch (loadingMode) {
+      case TURBO:
+        return turboExec;
+      case NORMAL:
+        return exec;
+      default:
+        throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+    }
+  }
+
+  public static SegmentLoadingMode getLoadingMode(CoordinatorDynamicConfig coordinatorDynamicConfig, String serverName)

Review Comment:
   This method may live in `CoordinatorDynamicConfig` instead.



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +416,31 @@ public boolean cancel(boolean interruptIfRunning)
       return true;
     }
   }
+
+  public ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+  {
+    switch (loadingMode) {
+      case TURBO:
+        return turboExec;
+      case NORMAL:
+        return exec;
+      default:
+        throw DruidException.defensive("Unknown execution mode [%s]", loadingMode);
+    }
+  }
+
+  public static SegmentLoadingMode getLoadingMode(CoordinatorDynamicConfig coordinatorDynamicConfig, String serverName)
+  {
+    final Set<String> turboLoadHistoricals = coordinatorDynamicConfig.getTurboLoadHistoricals();
+    return turboLoadHistoricals.contains(serverName) ?
+           SegmentLoadingMode.TURBO :
+           SegmentLoadingMode.NORMAL;
+  }
+
+  public enum SegmentLoadingMode

Review Comment:
   Please move this enum out into a separate file. It seems you had already intended to do so.
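   For illustration, the standalone file could be a minimal top-level enum along these lines (the javadoc wording is only a suggestion, not the author's final code):

   ```java
   /**
    * Pace at which a server processes segment load/drop requests.
    */
   enum SegmentLoadingMode
   {
     /** Default pace, using the regular loading executor. */
     NORMAL,

     /** Accelerated pace, using a larger dedicated executor. */
     TURBO
   }
   ```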



##########
server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java:
##########
@@ -219,12 +219,12 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException
 
     // Make sure adding segments beyond allowed size fails
    DataSegment newSegment = TestSegmentUtils.makeSegment("test", "new-segment", SEGMENT_SIZE);
-    loadDropHandler.addSegment(newSegment, null);
+    loadDropHandler.addSegment(newSegment, null, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
    Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(newSegment));

    // Clearing some segment should allow for new segments
-    loadDropHandler.removeSegment(expectedSegments.get(0), null, false);
-    loadDropHandler.addSegment(newSegment, null);
+    loadDropHandler.removeSegment(expectedSegments.get(0), null, false, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
+    loadDropHandler.addSegment(newSegment, null, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);

Review Comment:
   The older variant of the method still exists; why do we need to use the new one here?



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -118,7 +120,8 @@ public CoordinatorDynamicConfig(
       @JsonProperty("replicateAfterLoadTimeout") boolean 
replicateAfterLoadTimeout,
       @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean 
useRoundRobinSegmentAssignment,
       @JsonProperty("smartSegmentLoading") @Nullable Boolean 
smartSegmentLoading,
-      @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions
+      @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions,
+      @JsonProperty("turboLoadHistoricals") @Nullable Set<String> 
turboLoadHistoricals

Review Comment:
   Should we name the field `turboLoadingNodes` instead?
   The coordinator does manage loading on non-historical nodes too.
   Since there is no check stopping a user from putting, say, a broker in turbo mode, we should name this field accordingly.
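   For example, the dynamic config payload under the suggested name might look like this (the field name and server entry are hypothetical):

   ```json
   {
     "turboLoadingNodes": ["historical-1.example.com:8083"]
   }
   ```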



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -112,23 +115,27 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
 
   private final HttpLoadQueuePeonConfig config;
 
+  private final String serverName;
   private final ObjectMapper jsonMapper;
   private final HttpClient httpClient;
   private final URL changeRequestURL;
   private final String serverId;
 
   private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
   private final ExecutorService callBackExecutor;
+  private final CoordinatorConfigManager coordinatorConfigManager;
 
   private final ObjectWriter requestBodyWriter;
 
   public HttpLoadQueuePeon(
       String baseUrl,
+      String serverName,
       ObjectMapper jsonMapper,
       HttpClient httpClient,
       HttpLoadQueuePeonConfig config,
       ScheduledExecutorService processingExecutor,
-      ExecutorService callBackExecutor
+      ExecutorService callBackExecutor,
+      CoordinatorConfigManager coordinatorConfigManager

Review Comment:
   Don't pass the config manager in here, pass a `Supplier<CoordinatorDynamicConfig>` instead,
   or better yet you may just pass a `Supplier<SegmentLoadingMode>`. The supplier can have the
   logic to determine the loading mode from the dynamic config.
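   A rough stand-in sketch of the supplier-based wiring (types are simplified placeholders, not the actual Druid classes):

   ```java
   import java.util.function.Supplier;

   // Simplified stand-ins for illustration only.
   enum SegmentLoadingMode { NORMAL, TURBO }

   class LoadQueuePeonSketch
   {
     private final Supplier<SegmentLoadingMode> loadingModeSupplier;

     LoadQueuePeonSketch(Supplier<SegmentLoadingMode> loadingModeSupplier)
     {
       this.loadingModeSupplier = loadingModeSupplier;
     }

     SegmentLoadingMode currentLoadingMode()
     {
       // Re-evaluated on each call, so dynamic config updates take effect
       // without recreating the peon.
       return loadingModeSupplier.get();
     }
   }
   ```

   The coordinator would build the supplier from the dynamic config, so the peon never needs to see the config manager at all.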



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -386,5 +416,31 @@ public boolean cancel(boolean interruptIfRunning)
       return true;
     }
   }
+
+  public ScheduledExecutorService getExecutorService(SegmentLoadingMode loadingMode)

Review Comment:
   Does this need to be public?



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -308,6 +312,12 @@ public boolean getReplicateAfterLoadTimeout()
     return replicateAfterLoadTimeout;
   }
 
+  @JsonProperty
+  public Set<String> getTurboLoadHistoricals()

Review Comment:
   Please add a javadoc here.
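   As a sketch, the javadoc could read something like the following (stand-in class; the wording is a suggestion, not the author's final doc):

   ```java
   import java.util.Set;

   // Simplified stand-in for illustration only.
   class CoordinatorDynamicConfigSketch
   {
     private final Set<String> turboLoadHistoricals;

     CoordinatorDynamicConfigSketch(Set<String> turboLoadHistoricals)
     {
       this.turboLoadHistoricals = turboLoadHistoricals;
     }

     /**
      * Names of the servers whose segment load queues should run in turbo
      * mode, i.e. use a larger loading executor. Servers not in this set
      * load segments in normal mode.
      */
     public Set<String> getTurboLoadHistoricals()
     {
       return turboLoadHistoricals;
     }
   }
   ```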



##########
server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java:
##########
@@ -215,23 +215,131 @@ public void onFailure(Throwable th)
     return null;
   }
 
+  /**
+   * Deprecated.
+   *
+   * @see SegmentListerResource#applyDataSegmentChangeRequests(long, HistoricalSegmentChangeRequest, HttpServletRequest)
+   */
+  @Deprecated
+  @POST
+  @Path("/changeRequests")
+  @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  public void applyDataSegmentChangeRequests(
+      @QueryParam("timeout") long timeout,
+      List<DataSegmentChangeRequest> changeRequestList,
+      @Context final HttpServletRequest req
+  ) throws IOException
+  {
+    if (loadDropRequestHandler == null) {
+      sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "load/drop handler is not available.");
+      return;
+    }
+
+    if (timeout <= 0) {
+      sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "timeout must be positive.");
+      return;
+    }
+
+    if (changeRequestList == null || changeRequestList.isEmpty()) {
+      sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "No change requests provided.");
+      return;
+    }
+
+    final ResponseContext context = createContext(req.getHeader("Accept"));
+    final ListenableFuture<List<DataSegmentChangeResponse>> future =
+        loadDropRequestHandler.processBatch(changeRequestList, SegmentLoadDropHandler.SegmentLoadingMode.NORMAL);
+
+    final AsyncContext asyncContext = req.startAsync();
+
+    asyncContext.addListener(
+        new AsyncListener()
+        {
+          @Override
+          public void onComplete(AsyncEvent event)
+          {
+          }
+
+          @Override
+          public void onTimeout(AsyncEvent event)
+          {
+
+            // HTTP 204 NO_CONTENT is sent to the client.
+            future.cancel(true);
+            event.getAsyncContext().complete();
+          }
+
+          @Override
+          public void onError(AsyncEvent event)
+          {
+          }
+
+          @Override
+          public void onStartAsync(AsyncEvent event)
+          {
+          }
+        }
+    );
+
+    Futures.addCallback(
+        future,
+        new FutureCallback<>()
+        {
+          @Override
+          public void onSuccess(List<DataSegmentChangeResponse> result)
+          {
+            try {
+              HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+              response.setStatus(HttpServletResponse.SC_OK);
+              context.inputMapper.writerFor(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
+                                 .writeValue(asyncContext.getResponse().getOutputStream(), result);
+              asyncContext.complete();
+            }
+            catch (Exception ex) {
+              log.debug(ex, "Request timed out or closed already.");
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable th)
+          {
+            try {
+              HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+              if (th instanceof IllegalArgumentException) {
+                response.sendError(HttpServletResponse.SC_BAD_REQUEST, th.getMessage());
+              } else {
+                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, th.getMessage());
+              }
+              asyncContext.complete();
+            }
+            catch (Exception ex) {
+              log.debug(ex, "Request timed out or closed already.");
+            }
+          }
+        },
+        MoreExecutors.directExecutor()
+    );
+
+    asyncContext.setTimeout(timeout);
+  }
+
   /**
    * This endpoint is used by HttpLoadQueuePeon to assign segment load/drop requests batch. This endpoint makes the
    * client wait till one of the following events occur. Note that this is implemented using async IO so no jetty
    * threads are held while in wait.
-   *
+   * <br>
    * (1) Given timeout elapses.
    * (2) Some load/drop request completed.
-   *
+   * <br>
    * It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch.
    */
   @POST
-  @Path("/changeRequests")
+  @Path("/segmentChangeRequests")
   @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
   @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
   public void applyDataSegmentChangeRequests(

Review Comment:
   It seems that this new method could have used the existing one, thereby simplifying the diff.
   Please check if that is a possibility.
   
   Looking at the APIs, I suppose the loadingMode could simply be a query param too.
   That would not break backward compat either and we wouldn't need a new API.
   What do you think?
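   If loadingMode did become a query param, the endpoint could default a missing value to NORMAL so existing clients keep working. A stand-in sketch of that parsing (the parameter handling is an assumption, not the final API):

   ```java
   import java.util.Locale;

   // Simplified stand-in for illustration only.
   enum SegmentLoadingMode { NORMAL, TURBO }

   final class LoadingModeParam
   {
     private LoadingModeParam() {}

     /** Missing or empty values fall back to NORMAL, keeping old clients working. */
     static SegmentLoadingMode parse(String rawQueryParam)
     {
       if (rawQueryParam == null || rawQueryParam.isEmpty()) {
         return SegmentLoadingMode.NORMAL;
       }
       return SegmentLoadingMode.valueOf(rawQueryParam.toUpperCase(Locale.ROOT));
     }
   }
   ```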



##########
server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+/**

Review Comment:
   empty file?



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -244,14 +266,14 @@ public Collection<DataSegment> getSegmentsToDelete()
     return ImmutableList.copyOf(segmentsToDelete);
   }
 
-  public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests)
+  public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests, SegmentLoadingMode segmentLoadingMode)

Review Comment:
   Please add a javadoc to this and other new methods.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -200,8 +209,13 @@ private void doSegmentManagement()
       return;
     }
 
+    SegmentLoadDropHandler.SegmentLoadingMode loadingMode =
+        SegmentLoadDropHandler.getLoadingMode(coordinatorConfigManager.getCurrentDynamicConfig(), serverName);

Review Comment:
   This class should not refer to `SegmentLoadDropHandler` in any way.
   Add a non-static `getLoadingModeForServer(String)` method to 
`CoordinatorDynamicConfig` instead.
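   A stand-in sketch of that method on the dynamic config (class and field names are assumed from the diff, not the final code):

   ```java
   import java.util.Set;

   // Simplified stand-ins for illustration only.
   enum SegmentLoadingMode { NORMAL, TURBO }

   class DynamicConfigSketch
   {
     private final Set<String> turboLoadHistoricals;

     DynamicConfigSketch(Set<String> turboLoadHistoricals)
     {
       this.turboLoadHistoricals = turboLoadHistoricals;
     }

     /** Returns the loading mode the given server should use. */
     SegmentLoadingMode getLoadingModeForServer(String serverName)
     {
       return turboLoadHistoricals.contains(serverName)
              ? SegmentLoadingMode.TURBO
              : SegmentLoadingMode.NORMAL;
     }
   }
   ```

   With this, the peon (or its supplier) asks the config directly, and the static helper on `SegmentLoadDropHandler` goes away.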
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

