kfaraz commented on code in PR #16889: URL: https://github.com/apache/druid/pull/16889#discussion_r1773405124
##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
 |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
 |`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
 | `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. For example, a value of 0.5 means that tasks of this type can occupy up to 50% of the task slots on a worker. A value of 0 means that tasks of this type can occupy no slots. A value of 1.0 means no restriction, allowing tasks of this type to occupy all available slots. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy. For example, a value of 5 means that tasks of this type can occupy up to 5 task slots on a worker. `taskSlotLimits = {"index_parallel": 0.5, "query_controller": 3}`.
In this example 'index_parallel' tasks can occupy up to 50% of task slots and 'query_controller' can occupy up to 3 task slots |Empty map|

Review Comment:
I don't think it is intuitive to put absolute limits and ratios in the same property. It would be better to have two separate properties.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -29,6 +34,10 @@ public class WorkerTaskRunnerConfig
   @JsonProperty
   private double parallelIndexTaskSlotRatio = 1;
+  @JsonProperty
+  @JsonDeserialize(using = TaskSlotLimitsDeserializer.class)
+  private Map<String, Number> taskSlotLimits = new HashMap<>();

Review Comment:
Let's not try to merge task slot ratios and absolute limits into one field. They are best kept separate.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:
+   * <ul>
+   * <li>Whether the worker has sufficient capacity to handle the task.</li>
+   * <li>Whether the task can run under custom-defined limits for its type,
+   * such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+   * <li>Whether the availability group of the task is currently available.</li>
+   * </ul>
+   *
+   * @param task The {@link Task} to be executed. The task contains details such as required capacity
+   * and its type.
+   * @param taskLimits A map containing custom limits for different task types.
The key is a string
+   * representing the task type, and the value is a {@link Number} which can be:
+   * <ul>
+   * <li>A {@code Double} representing a ratio of available slots the task type can use (0 to 1).</li>
+   * <li>An {@code Integer} representing an absolute limit of slots the task type can occupy.</li>
+   * </ul>
+   * If the task type is not present in this map, the task can use all available slots.
+   * @return {@code true} if the task can run, meaning the worker has sufficient capacity,
+   * the task type does not exceed custom limits, and the task's availability group is available.
+   * Returns {@code false} otherwise.

Review Comment:
This is not really needed; it seems too verbose.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -42,9 +46,11 @@
 @PublicApi
 public class ImmutableWorkerInfo
 {
+  private static final Logger logger = new Logger(ImmutableWorkerInfo.class);

Review Comment:
We should not be doing any logging in this class.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:
+   * <ul>
+   * <li>Whether the worker has sufficient capacity to handle the task.</li>
+   * <li>Whether the task can run under custom-defined limits for its type,
+   * such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+   * <li>Whether the availability group of the task is currently available.</li>
+   * </ul>
+   *
+   * @param task The {@link Task} to be executed. The task contains details such as required capacity
+   * and its type.
+   * @param taskLimits A map containing custom limits for different task types.
The key is a string
+   * representing the task type, and the value is a {@link Number} which can be:
+   * <ul>
+   * <li>A {@code Double} representing a ratio of available slots the task type can use (0 to 1).</li>
+   * <li>An {@code Integer} representing an absolute limit of slots the task type can occupy.</li>
+   * </ul>
+   * If the task type is not present in this map, the task can use all available slots.
+   * @return {@code true} if the task can run, meaning the worker has sufficient capacity,
+   * the task type does not exceed custom limits, and the task's availability group is available.
+   * Returns {@code false} otherwise.
+   */
+  public boolean canRunTask(Task task, Map<String, Number> taskLimits)

Review Comment:
We don't need to have two separate `canRunTask` methods. You can just pass `WorkerTaskRunnerConfig` instead of `parallelIndexTaskSlotRatio` into the existing `canRunTask` method. Once we do that, some of the new private methods will go away, simplifying the change set.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -260,12 +399,24 @@ public boolean equals(Object o)
            : that.blacklistedUntil != null);
   }
+  public Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
+  {
+    Map<String, Integer> result = new HashMap<>(typeSpecificCapacityMap);
+    if (result.containsKey(type)) {
+      result.put(type, result.get(type) + capacityToAdd);
+    } else {
+      result.put(type, capacityToAdd);
+    }

Review Comment:
This can probably be simplified with `result.merge()`.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:

Review Comment:
```suggestion
   * This method returns true only if:
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -48,4 +57,46 @@ public double getParallelIndexTaskSlotRatio()
   {
     return parallelIndexTaskSlotRatio;
   }
+
+  /**
+   * The `taskSlotLimits` configuration is a map where each key is a task type,
+   * and the corresponding value represents the limit on the number of task slots
+   * that a task of that type can occupy on a worker.
+   * <p>
+   * The key is a `String` that specifies the task type.
+   * The value can either be a Double or Integer:
+   * <p>
+   * 1. A `Double` in the range [0, 1], representing a ratio of the available task slots
+   * that tasks of this type can occupy. For example, a value of 0.5 means that tasks
+   * of this type can occupy up to 50% of the task slots on a worker.
+   * A value of 0 means that tasks of this type can occupy no slots (i.e., they are effectively disabled).
+   * A value of 1.0 means no restriction, allowing tasks of this type to occupy all available slots.
+   * <p>
+   * 2. An `Integer` that is greater than or equal to 0, representing an absolute limit
+   * on the number of task slots that tasks of this type can occupy. For example, a value of 5
+   * means that tasks of this type can occupy up to 5 task slots on a worker.
+   * <p>
+   * If a task type is not present in the `taskSlotLimits` map, there is no restriction
+   * on the number of task slots it can occupy, meaning it can use all available slots.
+   * <p>
+   * Example:
+   * <p>
+   * taskSlotLimits = {
+   *   "index_parallel": 0.5, // 'index_parallel' can occupy up to 50% of task slots
+   *   "query_controller": 3 // 'query_controller' can occupy up to 3 task slots
+   * }
+   * <p>
+   * This configuration allows for granular control over the allocation of task slots
+   * based on the specific needs of different task types, helping to prevent any one type
+   * of task from monopolizing worker resources and reducing the risk of deadlocks.
+   *
+   * @return A map where the key is the task type (`String`), and the value is either a `Double` (0 to 1)
+   * representing the ratio of task slots available for that type, or an `Integer` (>= 0)
+   * representing the absolute limit of task slots for that type. If a task type is absent,
+   * it is not limited in terms of the number of task slots it can occupy.
+   */

Review Comment:
Please try to simplify and shorten this javadoc, retaining only the necessary information.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/util/TaskSlotLimitsDeserializer.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class TaskSlotLimitsDeserializer extends JsonDeserializer<Map<String, Number>>

Review Comment:
This should not be needed once task slot ratio and task slot limits are separated.

##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java:
##########
@@ -201,6 +211,7 @@ public void testEqualsAndSerde()
         ),
         3,
         0,
+        new HashMap<>(),

Review Comment:
Use `Collections.emptyMap()` instead.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -57,6 +63,7 @@ public ImmutableWorkerInfo(
       @JsonProperty("worker") Worker worker,
       @JsonProperty("currCapacityUsed") int currCapacityUsed,
       @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
+      @JsonProperty("currTypeSpecificCapacityUsed") @Nullable Map<String, Integer> typeSpecificCapacityMap,

Review Comment:
Please use the same name for the argument, the JSON serialized field, and the class member for readability. Also, this name is a little ambiguous; use something like `currCapacityUsedByTaskType` instead.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
+  /**
+   * Determines if a specific task can be executed on the worker based on
+   * various capacity, custom limits, and availability conditions.
+   * <p>
+   * This method checks:
+   * <ul>
+   * <li>Whether the worker has sufficient capacity to handle the task.</li>

Review Comment:
```suggestion
   * <li>The worker has sufficient capacity to handle the task.</li>
```

##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
 |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
 |`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
 | `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy.|Empty map|

Review Comment:
I think the limit on compaction tasks (or kill tasks for that matter) should not be a concern. This is a runtime property, typically controlled by an admin.
So, if an admin wants to restrict the number of concurrent compaction tasks, it is fair to honor that irrespective of the value of `compactionTaskSlotRatio` or `maxCompactionTaskSlots` set in the Coordinator dynamic configs. We just need to call it out clearly in the release notes and in the docs for the new property.

##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java:
##########
@@ -167,6 +167,29 @@ private int getCurrParallelIndexCapacityUsed(Map<String, TaskAnnouncement> tasks
     return currParallelIndexCapacityUsed;
   }
+  @JsonProperty("currTypeSpecificCapacityUsed")
+  public Map<String, Integer> getCurrTypeSpecificCapacityUsed()

Review Comment:
Yes, the ZK-based task runner is deprecated, and we should not support the new feature with ZK.

--
This is an automated message from the Apache Git Service.
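Several of the review comments above point in the same direction: keep ratios and absolute limits in two separate properties, fold the per-type check into a single `canRunTask`, name the usage map `currCapacityUsedByTaskType`, and replace the `containsKey`/`put` branching with `Map.merge`. The following is a minimal, hypothetical sketch of how those pieces could fit together. The class `TaskSlotLimitsSketch`, the `taskSlotRatios` map, flooring ratios to whole slots, and the rule that the stricter of a ratio and an absolute limit wins are all assumptions, not code from the PR or from Druid. The sketch also omits the availability-group check described in the PR's javadoc.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Druid's API): per-task-type slot limits with
 * ratios and absolute limits kept in two separate maps.
 */
final class TaskSlotLimitsSketch
{
  // Assumed property: fraction of a worker's capacity a task type may use, in [0, 1].
  private final Map<String, Double> taskSlotRatios;
  // Assumed property: absolute number of slots a task type may use, >= 0.
  private final Map<String, Integer> taskSlotLimits;

  TaskSlotLimitsSketch(Map<String, Double> taskSlotRatios, Map<String, Integer> taskSlotLimits)
  {
    this.taskSlotRatios = taskSlotRatios == null ? Collections.emptyMap() : taskSlotRatios;
    this.taskSlotLimits = taskSlotLimits == null ? Collections.emptyMap() : taskSlotLimits;
  }

  /** Maximum slots a task type may occupy; types absent from both maps are unrestricted. */
  int maxSlotsForType(String taskType, int workerCapacity)
  {
    int max = workerCapacity;
    Double ratio = taskSlotRatios.get(taskType);
    if (ratio != null) {
      // Assumption: ratios are floored to whole slots.
      max = Math.min(max, (int) Math.floor(ratio * workerCapacity));
    }
    Integer limit = taskSlotLimits.get(taskType);
    if (limit != null) {
      // Assumption: when both a ratio and a limit are set, the stricter one wins.
      max = Math.min(max, limit);
    }
    return max;
  }

  /** One capacity check covering both total worker capacity and the per-type limit. */
  boolean canRunTask(
      String taskType,
      int requiredCapacity,
      int workerCapacity,
      int currCapacityUsed,
      Map<String, Integer> currCapacityUsedByTaskType
  )
  {
    int usedByType = currCapacityUsedByTaskType.getOrDefault(taskType, 0);
    return currCapacityUsed + requiredCapacity <= workerCapacity
           && usedByType + requiredCapacity <= maxSlotsForType(taskType, workerCapacity);
  }

  /** Returns a copy of the usage map with capacityToAdd added to taskType's count. */
  static Map<String, Integer> incrementCapacityUsedByTaskType(
      Map<String, Integer> currCapacityUsedByTaskType,
      String taskType,
      int capacityToAdd
  )
  {
    Map<String, Integer> result = new HashMap<>(currCapacityUsedByTaskType);
    // Map.merge inserts capacityToAdd if the key is absent, otherwise sums it with the current value.
    result.merge(taskType, capacityToAdd, Integer::sum);
    return result;
  }
}
```

With the example values from the docs diff (`index_parallel` at a ratio of 0.5, `query_controller` at an absolute limit of 3) and a worker capacity of 10, `maxSlotsForType` gives 5 and 3 respectively, while a type listed in neither map can use all 10 slots. Keeping the two maps typed as `Double` and `Integer` also removes the need for a custom deserializer such as `TaskSlotLimitsDeserializer`.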