kfaraz commented on code in PR #16889:
URL: https://github.com/apache/druid/pull/16889#discussion_r1773405124


##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
 |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
 |`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
 | `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. For example, a value of 0.5 means that tasks of this type can occupy up to 50% of the task slots on a worker. A value of 0 means that tasks of this type can occupy no slots. A value of 1.0 means no restriction, allowing tasks of this type to occupy all available slots. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy. For example, a value of 5 means that tasks of this type can occupy up to 5 task slots on a worker.  `taskSlotLimits = {"index_parallel": 0.5, "query_controller": 3}`. In this example 'index_parallel' tasks can occupy up to 50% of task slots and 'query_controller' can occupy up to 3 task slots |Empty map|

Review Comment:
   I don't think it is intuitive to put absolute limits and ratios in the same property.
   It would be better to have two separate properties.
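To illustrate the split, here is a minimal sketch that resolves a per-type slot budget on one worker from two hypothetical maps: one of ratios and one of absolute slot counts. It assumes two rules the PR does not spell out: when a type appears in both maps, the stricter bound wins, and ratios are rounded down.

```java
import java.util.Map;

// Sketch only: the two-map layout, the "stricter bound wins" rule and the
// floor rounding are assumptions, not Druid's actual implementation.
final class TaskSlotLimitResolver
{
  private TaskSlotLimitResolver() {}

  static int maxSlotsForType(
      String taskType,
      int workerCapacity,
      Map<String, Double> slotRatios,   // hypothetical, e.g. {"index_parallel": 0.5}
      Map<String, Integer> slotLimits   // hypothetical, e.g. {"query_controller": 3}
  )
  {
    int max = workerCapacity; // types absent from both maps are unrestricted
    Double ratio = slotRatios.get(taskType);
    if (ratio != null) {
      // Round down so a ratio never grants more slots than its share of the worker.
      max = Math.min(max, (int) Math.floor(ratio * workerCapacity));
    }
    Integer limit = slotLimits.get(taskType);
    if (limit != null) {
      max = Math.min(max, limit);
    }
    return max;
  }
}
```

With a worker capacity of 10 and the example values from the docs row, `index_parallel` resolves to 5 slots, `query_controller` to 3, and any unlisted type to all 10.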



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -29,6 +34,10 @@ public class WorkerTaskRunnerConfig
   @JsonProperty
   private double parallelIndexTaskSlotRatio = 1;
 
+  @JsonProperty
+  @JsonDeserialize(using = TaskSlotLimitsDeserializer.class)
+  private Map<String, Number> taskSlotLimits = new HashMap<>();

Review Comment:
   Let's not try to merge task slot ratios and absolute limits into one field. They are best kept separate.
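The following sketch shows how the config side could keep the two fields separate. The class and field names are hypothetical. Jackson annotations are omitted so that the sketch compiles on its own. Because each map has a concrete value type (`Double` or `Integer`), Jackson should be able to bind it without a custom deserializer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: ratios and absolute limits as two separately typed fields.
// In Druid these would be @JsonProperty fields on the runner config.
final class SeparateSlotLimitsConfig
{
  private final Map<String, Double> taskSlotRatios;   // values in [0, 1]
  private final Map<String, Integer> taskSlotLimits;  // absolute slot counts, >= 0

  SeparateSlotLimitsConfig(Map<String, Double> taskSlotRatios, Map<String, Integer> taskSlotLimits)
  {
    this.taskSlotRatios = copyOrEmpty(taskSlotRatios);
    this.taskSlotLimits = copyOrEmpty(taskSlotLimits);
    // Each map has one simple validity rule instead of a type-dependent one.
    this.taskSlotRatios.forEach((type, ratio) -> {
      if (ratio == null || ratio < 0 || ratio > 1) {
        throw new IllegalArgumentException("Ratio for [" + type + "] must be in [0, 1], got " + ratio);
      }
    });
    this.taskSlotLimits.forEach((type, limit) -> {
      if (limit == null || limit < 0) {
        throw new IllegalArgumentException("Limit for [" + type + "] must be >= 0, got " + limit);
      }
    });
  }

  private static <V> Map<String, V> copyOrEmpty(Map<String, V> map)
  {
    return map == null ? Collections.<String, V>emptyMap() : Collections.unmodifiableMap(new HashMap<>(map));
  }

  Map<String, Double> getTaskSlotRatios()
  {
    return taskSlotRatios;
  }

  Map<String, Integer> getTaskSlotLimits()
  {
    return taskSlotLimits;
  }
}
```

In the combined `Map<String, Number>`, `1` and `1.0` mean very different things: one slot versus every slot. With separate maps that ambiguity disappears, and an out-of-range ratio such as `1.5` is rejected up front.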



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:
+   * <ul>
+   *   <li>Whether the worker has sufficient capacity to handle the task.</li>
+   *   <li>Whether the task can run under custom-defined limits for its type,
+   *   such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+   *   <li>Whether the availability group of the task is currently available.</li>
+   * </ul>
+   *
+   * @param task The {@link Task} to be executed. The task contains details such as required capacity
+   *             and its type.
+   * @param taskLimits A map containing custom limits for different task types. The key is a string
+   *                   representing the task type, and the value is a {@link Number} which can be:
+   *                   <ul>
+   *                     <li>A {@code Double} representing a ratio of available slots the task type can use (0 to 1).</li>
+   *                     <li>An {@code Integer} representing an absolute limit of slots the task type can occupy.</li>
+   *                   </ul>
+   *                   If the task type is not present in this map, the task can use all available slots.
+   * @return {@code true} if the task can run, meaning the worker has sufficient capacity,
+   *         the task type does not exceed custom limits, and the task's availability group is available.
+   *         Returns {@code false} otherwise.

Review Comment:
   This is not really needed; it seems too verbose.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -42,9 +46,11 @@
 @PublicApi
 public class ImmutableWorkerInfo
 {
+  private static final Logger logger = new Logger(ImmutableWorkerInfo.class);

Review Comment:
   We should not be doing any logging in this class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:
+   * <ul>
+   *   <li>Whether the worker has sufficient capacity to handle the task.</li>
+   *   <li>Whether the task can run under custom-defined limits for its type,
+   *   such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+   *   <li>Whether the availability group of the task is currently available.</li>
+   * </ul>
+   *
+   * @param task The {@link Task} to be executed. The task contains details such as required capacity
+   *             and its type.
+   * @param taskLimits A map containing custom limits for different task types. The key is a string
+   *                   representing the task type, and the value is a {@link Number} which can be:
+   *                   <ul>
+   *                     <li>A {@code Double} representing a ratio of available slots the task type can use (0 to 1).</li>
+   *                     <li>An {@code Integer} representing an absolute limit of slots the task type can occupy.</li>
+   *                   </ul>
+   *                   If the task type is not present in this map, the task can use all available slots.
+   * @return {@code true} if the task can run, meaning the worker has sufficient capacity,
+   *         the task type does not exceed custom limits, and the task's availability group is available.
+   *         Returns {@code false} otherwise.
+   */
+  public boolean canRunTask(Task task, Map<String, Number> taskLimits)

Review Comment:
   We don't need two separate `canRunTask` methods. You can just pass `WorkerTaskRunnerConfig` instead of `parallelIndexTaskSlotRatio` into the existing `canRunTask` method.
   
   Once we do that, some of the new private methods will go away, which simplifies the change set.
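The following is a simplified sketch of the single-method shape. It uses hypothetical stand-in types (`SimpleTask`, `SimpleRunnerConfig`, `SimpleWorkerInfo`) in place of Druid's `Task`, `WorkerTaskRunnerConfig` and `ImmutableWorkerInfo`. The checks mirror the ones listed in the PR's javadoc: free capacity, per-type limit and availability group, plus the existing parallel-index ratio. Both the floor rounding of the parallel-index share and the integer-only limit map are assumptions.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins; only the canRunTask(task, config) shape matters here.
final class CanRunTaskSketch
{
  private CanRunTaskSketch() {}

  static final class SimpleTask
  {
    final String type;
    final int requiredCapacity;
    final String availabilityGroup;

    SimpleTask(String type, int requiredCapacity, String availabilityGroup)
    {
      this.type = type;
      this.requiredCapacity = requiredCapacity;
      this.availabilityGroup = availabilityGroup;
    }
  }

  static final class SimpleRunnerConfig
  {
    final double parallelIndexTaskSlotRatio;
    final Map<String, Integer> taskSlotLimits; // absolute per-type limits (assumed layout)

    SimpleRunnerConfig(double parallelIndexTaskSlotRatio, Map<String, Integer> taskSlotLimits)
    {
      this.parallelIndexTaskSlotRatio = parallelIndexTaskSlotRatio;
      this.taskSlotLimits = taskSlotLimits;
    }
  }

  static final class SimpleWorkerInfo
  {
    final int capacity;
    final int currCapacityUsed;
    final int currParallelIndexCapacityUsed;
    final Map<String, Integer> currCapacityUsedByTaskType;
    final Set<String> availabilityGroups;

    SimpleWorkerInfo(int capacity, int currCapacityUsed, int currParallelIndexCapacityUsed,
                     Map<String, Integer> currCapacityUsedByTaskType, Set<String> availabilityGroups)
    {
      this.capacity = capacity;
      this.currCapacityUsed = currCapacityUsed;
      this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
      this.currCapacityUsedByTaskType = currCapacityUsedByTaskType;
      this.availabilityGroups = availabilityGroups;
    }

    // Single entry point: every limit is read from the config object.
    boolean canRunTask(SimpleTask task, SimpleRunnerConfig config)
    {
      int required = task.requiredCapacity;
      boolean hasCapacity = capacity - currCapacityUsed >= required;
      // Assumed rounding: the parallel-index share of the worker is floored.
      int parallelCapacity = (int) Math.floor(config.parallelIndexTaskSlotRatio * capacity);
      boolean parallelOk = !"index_parallel".equals(task.type)
          || parallelCapacity - currParallelIndexCapacityUsed >= required;
      Integer typeLimit = config.taskSlotLimits.get(task.type);
      boolean typeOk = typeLimit == null
          || typeLimit - currCapacityUsedByTaskType.getOrDefault(task.type, 0) >= required;
      boolean groupFree = !availabilityGroups.contains(task.availabilityGroup);
      return hasCapacity && parallelOk && typeOk && groupFree;
    }
  }
}
```

Because the config object is passed through whole, adding a later kind of limit changes only the config class and this one method body, not every call site.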



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -260,12 +399,24 @@ public boolean equals(Object o)
              : that.blacklistedUntil != null);
   }
 
+  public Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
+  {
+    Map<String, Integer> result = new HashMap<>(typeSpecificCapacityMap);
+    if (result.containsKey(type)) {
+      result.put(type, result.get(type) + capacityToAdd);
+    } else {
+      result.put(type, capacityToAdd);
+    }

Review Comment:
   This can probably be simplified with `result.merge()`.
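`Map.merge` (available since Java 8) covers both branches. It stores `capacityToAdd` when the key is absent; otherwise, it combines the old and new values with the given function. The sketch below is a static helper over a passed-in map, which keeps it self-contained. The PR's version is an instance method over `typeSpecificCapacityMap`, and the class name here is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class TypeCapacity
{
  private TypeCapacity() {}

  /** Returns a copy of {@code current} with {@code capacityToAdd} added to the entry for {@code type}. */
  static Map<String, Integer> incrementTypeSpecificCapacity(Map<String, Integer> current, String type, int capacityToAdd)
  {
    Map<String, Integer> result = new HashMap<>(current);
    // Inserts capacityToAdd if the key is absent, otherwise stores old + capacityToAdd.
    result.merge(type, capacityToAdd, Integer::sum);
    return result;
  }
}
```

The input map is copied first, so the caller's map is left unchanged, matching the copy-then-modify pattern in the diff.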



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:

Review Comment:
   ```suggestion
      * This method returns true only if:
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -48,4 +57,46 @@ public double getParallelIndexTaskSlotRatio()
   {
     return parallelIndexTaskSlotRatio;
   }
+
+  /**
+   * The `taskSlotLimits` configuration is a map where each key is a task type,
+   * and the corresponding value represents the limit on the number of task slots
+   * that a task of that type can occupy on a worker.
+   * <p>
+   * The key is a `String` that specifies the task type.
+   * The value can either be a Double or Integer:
+   * <p>
+   * 1. A `Double` in the range [0, 1], representing a ratio of the available task slots
+   * that tasks of this type can occupy. For example, a value of 0.5 means that tasks
+   * of this type can occupy up to 50% of the task slots on a worker.
+   * A value of 0 means that tasks of this type can occupy no slots (i.e., they are effectively disabled).
+   * A value of 1.0 means no restriction, allowing tasks of this type to occupy all available slots.
+   * <p>
+   * 2. An `Integer` that is greater than or equal to 0, representing an absolute limit
+   * on the number of task slots that tasks of this type can occupy. For example, a value of 5
+   * means that tasks of this type can occupy up to 5 task slots on a worker.
+   * <p>
+   * If a task type is not present in the `taskSlotLimits` map, there is no restriction
+   * on the number of task slots it can occupy, meaning it can use all available slots.
+   * <p>
+   * Example:
+   * <p>
+   * taskSlotLimits = {
+   * "index_parallel": 0.5,  // 'index_parallel' can occupy up to 50% of task slots
+   * "query_controller": 3     // 'query_controller' can occupy up to 3 task slots
+   * }
+   * <p>
+   * This configuration allows for granular control over the allocation of task slots
+   * based on the specific needs of different task types, helping to prevent any one type
+   * of task from monopolizing worker resources and reducing the risk of deadlocks.
+   *
+   * @return A map where the key is the task type (`String`), and the value is either a `Double` (0 to 1)
+   * representing the ratio of task slots available for that type, or an `Integer` (>= 0)
+   * representing the absolute limit of task slots for that type. If a task type is absent,
+   * it is not limited in terms of the number of task slots it can occupy.
+   */

Review Comment:
   Please try to simplify and shorten this javadoc, retaining only the necessary information.
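Below is one possible shortened form. It assumes that ratios and absolute limits end up as separate properties. The class and getter names are hypothetical. Each javadoc states the unit, the valid range, and what an absent entry means, and leaves examples to the user-facing docs.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical config holder, assuming the ratio and absolute-limit maps are split.
final class ConciseSlotConfigDocs
{
  private final Map<String, Double> taskSlotRatios;
  private final Map<String, Integer> taskSlotLimits;

  ConciseSlotConfigDocs(Map<String, Double> taskSlotRatios, Map<String, Integer> taskSlotLimits)
  {
    this.taskSlotRatios = taskSlotRatios == null ? Collections.<String, Double>emptyMap() : taskSlotRatios;
    this.taskSlotLimits = taskSlotLimits == null ? Collections.<String, Integer>emptyMap() : taskSlotLimits;
  }

  /**
   * Fraction of a worker's task slots, in [0, 1], that tasks of each type may occupy.
   * Task types absent from the map are not limited.
   */
  public Map<String, Double> getTaskSlotRatios()
  {
    return taskSlotRatios;
  }

  /**
   * Maximum number of task slots that tasks of each type may occupy on a single worker.
   * Task types absent from the map are not limited.
   */
  public Map<String, Integer> getTaskSlotLimits()
  {
    return taskSlotLimits;
  }
}
```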



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/util/TaskSlotLimitsDeserializer.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class TaskSlotLimitsDeserializer extends JsonDeserializer<Map<String, Number>>

Review Comment:
   This should not be needed once task slot ratio and task slot limits are separated.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java:
##########
@@ -201,6 +211,7 @@ public void testEqualsAndSerde()
         ),
         3,
         0,
+        new HashMap<>(),

Review Comment:
   Use `Collections.emptyMap()` instead.
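`Collections.emptyMap()` returns an immutable empty map. Using it signals that the argument is intentionally empty, and it fails fast if anything tries to write to it. The following is a small illustration with a hypothetical helper name:

```java
import java.util.Collections;
import java.util.Map;

final class EmptyCapacityMapExample
{
  private EmptyCapacityMapExample() {}

  // Hypothetical helper: the value a test would pass for "no per-type capacity in use".
  static Map<String, Integer> noCapacityUsedByTaskType()
  {
    // Immutable: put() throws UnsupportedOperationException instead of silently succeeding.
    return Collections.emptyMap();
  }
}
```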



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -57,6 +63,7 @@ public ImmutableWorkerInfo(
       @JsonProperty("worker") Worker worker,
       @JsonProperty("currCapacityUsed") int currCapacityUsed,
       @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
+      @JsonProperty("currTypeSpecificCapacityUsed") @Nullable Map<String, Integer> typeSpecificCapacityMap,

Review Comment:
   Please use the same name for the argument, the JSON serialized field, and the class member for readability.
   
   Also, this name is a little ambiguous; use something like `currCapacityUsedByTaskType` instead.
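The sketch below illustrates the naming suggestion. It uses `currCapacityUsedByTaskType` for the constructor argument, the member, and the getter. The class is a hypothetical stand-in for `ImmutableWorkerInfo`. The `@JsonProperty` and `@Nullable` annotations appear only as comments so that it compiles without Jackson. Treating a null map as empty is an assumption about how the `@Nullable` argument would be handled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ImmutableWorkerInfo; one name used everywhere.
final class WorkerInfoNamingSketch
{
  private final Map<String, Integer> currCapacityUsedByTaskType;

  WorkerInfoNamingSketch(
      // In Druid: @JsonProperty("currCapacityUsedByTaskType") @Nullable
      Map<String, Integer> currCapacityUsedByTaskType
  )
  {
    // Assumed handling: a missing value (e.g. field absent from the JSON) means nothing is running.
    this.currCapacityUsedByTaskType = currCapacityUsedByTaskType == null
        ? Collections.<String, Integer>emptyMap()
        : Collections.unmodifiableMap(new HashMap<>(currCapacityUsedByTaskType));
  }

  // In Druid: @JsonProperty("currCapacityUsedByTaskType")
  public Map<String, Integer> getCurrCapacityUsedByTaskType()
  {
    return currCapacityUsedByTaskType;
  }
}
```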



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:
+   * <ul>
+   *   <li>Whether the worker has sufficient capacity to handle the task.</li>

Review Comment:
   ```suggestion
      *   <li>The worker has sufficient capacity to handle the task.</li>
   ```



##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
 |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
 |`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
 | `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy.|Empty map|

Review Comment:
   I think the limit on compaction tasks (or kill tasks, for that matter) should not be a concern.
   This is a runtime property, typically controlled by an admin.
   So, if an admin wants to restrict the number of concurrent compaction tasks, it is fair to honor that irrespective of the value of `compactionTaskSlotRatio` or `maxCompactionTaskSlots` set in the coordinator dynamic configs.
   
   We just need to call this out clearly in the release notes and in the docs for the new property.
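The following rough sketch shows why the runtime limit acts as a hard cap. The Coordinator may submit any number of compaction (`compact`) tasks under `compactionTaskSlotRatio` or `maxCompactionTaskSlots`. Regardless of that number, at most `min(capacity, limit)` of them can run on each worker. The sketch assumes that every task needs one slot and that no other task types compete for the workers. It is not Druid's scheduling code, and the class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical back-of-the-envelope model; not Druid's scheduler.
final class CompactionCapSketch
{
  private CompactionCapSketch() {}

  static int concurrentlyRunnable(int submittedByCoordinator, List<Integer> workerCapacities, int perWorkerLimit)
  {
    int admissible = 0;
    for (int capacity : workerCapacities) {
      // Each worker admits at most min(its capacity, the admin's per-type limit).
      admissible += Math.min(capacity, perWorkerLimit);
    }
    // Anything beyond this stays pending until slots for the type free up.
    return Math.min(submittedByCoordinator, admissible);
  }
}
```

For example, suppose there are three workers, each with a capacity of 4, and the per-worker limit is 2. Then at most 6 compaction tasks run at once, even if the Coordinator submits 10.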



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java:
##########
@@ -167,6 +167,29 @@ private int getCurrParallelIndexCapacityUsed(Map<String, TaskAnnouncement> tasks
     return currParallelIndexCapacityUsed;
   }
 
+  @JsonProperty("currTypeSpecificCapacityUsed")
+  public Map<String, Integer> getCurrTypeSpecificCapacityUsed()

Review Comment:
   Yes, the ZK-based task runner is deprecated, and we should not support the new feature with ZK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

