This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b568241f0c Add field `taskLimits` to worker select strategies (#16889)
8b568241f0c is described below

commit 8b568241f0c8ee30d4846d28cd37235f5a34d7d5
Author: Misha <[email protected]>
AuthorDate: Thu Mar 6 16:57:35 2025 +0100

    Add field `taskLimits` to worker select strategies (#16889)
    
    Changes
    ---------
    - Add field `taskLimits` to the following worker select strategies
    `equalDistribution`, `equalDistributionWithCategorySpec`, 
`fillCapacityWithCategorySpec`, `fillCapacity`
    - Add sub-fields `maxSlotCountByType` and `maxSlotRatioByType` to 
`taskLimits`
    - Apply these limits per worker when assigning new tasks
    
    ---------
    Co-authored-by: sviatahorau <[email protected]>
    Co-authored-by: Benedict Jin <[email protected]>
    Co-authored-by: Kashif Faraz <[email protected]>
---
 docs/configuration/index.md                        |  14 ++
 .../overlord/setup/WorkerBehaviorConfigTest.java   |   3 +-
 .../indexing/overlord/ImmutableWorkerInfo.java     |  28 ++-
 ...PendingTaskBasedWorkerProvisioningStrategy.java |   6 +
 ...stributionWithAffinityWorkerSelectStrategy.java |   5 +-
 ...butionWithCategorySpecWorkerSelectStrategy.java |  21 ++-
 .../EqualDistributionWorkerSelectStrategy.java     |  22 ++-
 ...llCapacityWithAffinityWorkerSelectStrategy.java |   5 +-
 ...pacityWithCategorySpecWorkerSelectStrategy.java |  22 ++-
 .../setup/FillCapacityWorkerSelectStrategy.java    |  22 ++-
 .../druid/indexing/overlord/setup/TaskLimits.java  | 195 +++++++++++++++++++++
 .../overlord/setup/WorkerBehaviorConfig.java       |   2 +-
 .../indexing/overlord/setup/WorkerSelectUtils.java |  37 +++-
 .../indexing/overlord/ImmutableWorkerInfoTest.java |   5 +
 .../indexing/overlord/RemoteTaskRunnerTest.java    |   2 +-
 .../PendingTaskBasedProvisioningStrategyTest.java  |   2 +-
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    |   2 +-
 ...butionWithAffinityWorkerSelectStrategyTest.java | 149 +++++++++++++++-
 ...onWithCategorySpecWorkerSelectStrategyTest.java |   4 +-
 .../EqualDistributionWorkerSelectStrategyTest.java |  14 +-
 ...pacityWithAffinityWorkerSelectStrategyTest.java | 145 ++++++++++++++-
 ...tyWithCategorySpecWorkerSelectStrategyTest.java |   4 +-
 22 files changed, 660 insertions(+), 49 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 0ed82017c3d..d75309b7ac3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1277,6 +1277,7 @@ This evenly distributes work across your Middle Managers.
 |--------|-----------|-------|
 |`type`|`equalDistribution`|required; must be `equalDistribution`|
 |`affinityConfig`|[`AffinityConfig`](#affinityconfig) object|null (no 
affinity)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
 
 ###### `equalDistributionWithCategorySpec`
 
@@ -1288,6 +1289,7 @@ This strategy doesn't work with `AutoScaler` since the 
behavior is undefined.
 |--------|-----------|-------|
 |`type`|`equalDistributionWithCategorySpec`|required; must be 
`equalDistributionWithCategorySpec`|
 |`workerCategorySpec`|[`WorkerCategorySpec`](#workercategoryspec) object|null 
(no worker category spec)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
 
 The following example shows tasks of type `index_kafka` that default to 
running on Middle Managers of category `c1`, except for tasks that write to 
datasource `ds1`, which run on Middle Managers of category `c2`.
 
@@ -1323,6 +1325,7 @@ Middle Managers up to capacity simultaneously, rather 
than a single Middle Manag
 |--------|-----------|-------|
 |`type`| `fillCapacity`|required; must be `fillCapacity`|
 |`affinityConfig`| [`AffinityConfig`](#affinityconfig) object |null (no 
affinity)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
 
 ###### `fillCapacityWithCategorySpec`
 
@@ -1334,6 +1337,7 @@ This strategy doesn't work with `AutoScaler` since the 
behavior is undefined.
 |--------|-----------|-------|
 |`type`|`fillCapacityWithCategorySpec`.|required; must be 
`fillCapacityWithCategorySpec`|
 |`workerCategorySpec`|[`WorkerCategorySpec`](#workercategoryspec) object|null 
(no worker category spec)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
 
 <a name="javascript-worker-select-strategy"></a>
 
@@ -1383,6 +1387,16 @@ field. If not provided, the default is to not use it at 
all.
 |`categoryMap`|A JSON map object mapping a task type String name to a 
[CategoryConfig](#categoryconfig) object, by which you can specify category 
config for different task type.|`{}`|
 |`strong`|With weak workerCategorySpec (the default), tasks for a dataSource 
may be assigned to other Middle Managers if the Middle Managers specified in 
`categoryMap` are not able to run all pending tasks in the queue for that 
dataSource. With strong workerCategorySpec, tasks for a dataSource will only 
ever be assigned to their specified Middle Managers, and will wait in the 
pending queue if necessary.|false|
 
+###### `taskLimits`
+
+The `taskLimits` field can be used with the `equalDistribution`, 
`fillCapacity`, `equalDistributionWithCategorySpec` and 
`fillCapacityWithCategorySpec` strategies.
+If you don't provide it, it will default to not being used.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`maxSlotCountByType`|A map where each key is a task type (`String`), and the 
corresponding value represents the absolute limit on the number of task slots 
that tasks of this type can occupy. The value is an `Integer` that is greater 
than or equal to 0. For example, a value of 5 means that tasks of this type can 
occupy up to 5 task slots in total. If both absolute and ratio limits are 
specified for the same task type, the effective limit will be the smaller of 
the absolute limit and the  [...]
+|`maxSlotRatioByType`|A map where each key is a task type (`String`), and the 
corresponding value is a `Double` which should be in the range [0, 1], 
representing the ratio of task slots that tasks of this type can occupy. This 
ratio defines the proportion of total task slots a task type can use, 
calculated as `ratio * totalSlots`. If both absolute and ratio limits are 
specified for the same task type, the effective limit will be the smaller of 
the absolute limit and the limit derived fro [...]
+
 ###### CategoryConfig
 
 |Property|Description|Default|
diff --git 
a/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
 
b/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
index a47a79fa9bc..f777bad42e8 100644
--- 
a/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
+++ 
b/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
@@ -45,7 +45,8 @@ public class WorkerBehaviorConfigTest
             new AffinityConfig(
                 ImmutableMap.of("foo", ImmutableSet.of("localhost")),
                 false
-            )
+            ),
+            null
         ),
         new EC2AutoScaler(
             7,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index b1eafe0dc59..d0a938e7022 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -33,6 +34,8 @@ import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -45,6 +48,7 @@ public class ImmutableWorkerInfo
   private final Worker worker;
   private final int currCapacityUsed;
   private final int currParallelIndexCapacityUsed;
+  private final Map<String, Integer> currCapacityUsedByTaskType;
   private final ImmutableSet<String> availabilityGroups;
   private final ImmutableSet<String> runningTasks;
   private final DateTime lastCompletedTaskTime;
@@ -57,6 +61,7 @@ public class ImmutableWorkerInfo
       @JsonProperty("worker") Worker worker,
       @JsonProperty("currCapacityUsed") int currCapacityUsed,
       @JsonProperty("currParallelIndexCapacityUsed") int 
currParallelIndexCapacityUsed,
+      @JsonProperty("currCapacityUsedByTaskType") Map<String, Integer> 
currCapacityUsedByTaskType,
       @JsonProperty("availabilityGroups") Set<String> availabilityGroups,
       @JsonProperty("runningTasks") Collection<String> runningTasks,
       @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@@ -66,6 +71,7 @@ public class ImmutableWorkerInfo
     this.worker = worker;
     this.currCapacityUsed = currCapacityUsed;
     this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
+    this.currCapacityUsedByTaskType = 
Configs.valueOrDefault(currCapacityUsedByTaskType, Collections.emptyMap());
     this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
     this.runningTasks = ImmutableSet.copyOf(runningTasks);
     this.lastCompletedTaskTime = lastCompletedTaskTime;
@@ -76,12 +82,13 @@ public class ImmutableWorkerInfo
       Worker worker,
       int currCapacityUsed,
       int currParallelIndexCapacityUsed,
+      Map<String, Integer> currCapacityUsedByTaskType,
       Set<String> availabilityGroups,
       Collection<String> runningTasks,
       DateTime lastCompletedTaskTime
   )
   {
-    this(worker, currCapacityUsed, currParallelIndexCapacityUsed, 
availabilityGroups,
+    this(worker, currCapacityUsed, currParallelIndexCapacityUsed, 
currCapacityUsedByTaskType, availabilityGroups,
          runningTasks, lastCompletedTaskTime, null
     );
   }
@@ -94,7 +101,7 @@ public class ImmutableWorkerInfo
       DateTime lastCompletedTaskTime
   )
   {
-    this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, 
lastCompletedTaskTime, null);
+    this(worker, currCapacityUsed, 0, Collections.emptyMap(), 
availabilityGroups, runningTasks, lastCompletedTaskTime, null);
   }
 
   /**
@@ -111,6 +118,8 @@ public class ImmutableWorkerInfo
     int currParallelIndexCapacityUsed = 0;
     ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
     ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();
+    Map<String, Integer> currCapacityUsedByTaskType = new HashMap<>();
+
 
     for (final Map.Entry<String, TaskAnnouncement> entry : 
announcements.entrySet()) {
       final TaskAnnouncement announcement = entry.getValue();
@@ -126,6 +135,8 @@ public class ImmutableWorkerInfo
           currParallelIndexCapacityUsed += requiredCapacity;
         }
 
+        currCapacityUsedByTaskType.merge(announcement.getTaskType(), 1, 
Integer::sum);
+
         taskIds.add(taskId);
         availabilityGroups.add(taskResource.getAvailabilityGroup());
       }
@@ -135,6 +146,7 @@ public class ImmutableWorkerInfo
         worker,
         currCapacityUsed,
         currParallelIndexCapacityUsed,
+        currCapacityUsedByTaskType,
         availabilityGroups.build(),
         taskIds.build(),
         lastCompletedTaskTime,
@@ -160,6 +172,13 @@ public class ImmutableWorkerInfo
     return currParallelIndexCapacityUsed;
   }
 
+  @JsonProperty("currCapacityUsedByTaskType")
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public Map<String, Integer> getCurrCapacityUsedByTaskType()
+  {
+    return currCapacityUsedByTaskType;
+  }
+
   @JsonProperty("availabilityGroups")
   public Set<String> getAvailabilityGroups()
   {
@@ -243,6 +262,9 @@ public class ImmutableWorkerInfo
     if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) {
       return false;
     }
+    if (!currCapacityUsedByTaskType.equals(that.currCapacityUsedByTaskType)) {
+      return false;
+    }
     if (!worker.equals(that.worker)) {
       return false;
     }
@@ -266,6 +288,7 @@ public class ImmutableWorkerInfo
     int result = worker.hashCode();
     result = 31 * result + currCapacityUsed;
     result = 31 * result + currParallelIndexCapacityUsed;
+    result = 31 * result + currCapacityUsedByTaskType.hashCode();
     result = 31 * result + availabilityGroups.hashCode();
     result = 31 * result + runningTasks.hashCode();
     result = 31 * result + lastCompletedTaskTime.hashCode();
@@ -280,6 +303,7 @@ public class ImmutableWorkerInfo
            "worker=" + worker +
            ", currCapacityUsed=" + currCapacityUsed +
            ", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed +
+           ", currCapacityUsedByTaskType=" + currCapacityUsedByTaskType +
            ", availabilityGroups=" + availabilityGroups +
            ", runningTasks=" + runningTasks +
            ", lastCompletedTaskTime=" + lastCompletedTaskTime +
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 0d715b53549..10122678893 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -482,10 +482,16 @@ public class PendingTaskBasedWorkerProvisioningStrategy 
extends AbstractWorkerPr
     int parallelIndexTaskCapacity = 
task.getType().equals(ParallelIndexSupervisorTask.TYPE)
                                     ? 
task.getTaskResource().getRequiredCapacity()
                                     : 0;
+    int taskCapacity = task.getTaskResource().getRequiredCapacity();
+
+    final Map<String, Integer> typeSpecificCapacity = new 
HashMap<>(immutableWorker.getCurrCapacityUsedByTaskType());
+    typeSpecificCapacity.merge(task.getType(), taskCapacity, Integer::sum);
+
     return new ImmutableWorkerInfo(
         immutableWorker.getWorker(),
         immutableWorker.getCurrCapacityUsed() + 1,
         immutableWorker.getCurrParallelIndexCapacityUsed() + 
parallelIndexTaskCapacity,
+        typeSpecificCapacity,
         Sets.union(
             immutableWorker.getAvailabilityGroups(),
             Sets.newHashSet(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
index 6d28ae78fd0..e60aeae018a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
@@ -29,9 +29,10 @@ public class 
EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDist
 {
   @JsonCreator
   public EqualDistributionWithAffinityWorkerSelectStrategy(
-      @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+      @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+      @JsonProperty("taskLimits") TaskLimits taskLimits
   )
   {
-    super(affinityConfig);
+    super(affinityConfig, taskLimits);
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
index ec65693ac3d..b3715aec285 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.setup;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -32,13 +33,16 @@ import java.util.Objects;
 public class EqualDistributionWithCategorySpecWorkerSelectStrategy implements 
WorkerSelectStrategy
 {
   private final WorkerCategorySpec workerCategorySpec;
+  private final TaskLimits taskLimits;
 
   @JsonCreator
   public EqualDistributionWithCategorySpecWorkerSelectStrategy(
-      @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
+      @JsonProperty("workerCategorySpec") WorkerCategorySpec 
workerCategorySpec,
+      @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
   )
   {
     this.workerCategorySpec = workerCategorySpec;
+    this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
   }
 
   @JsonProperty
@@ -47,6 +51,12 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
     return workerCategorySpec;
   }
 
+  @JsonProperty
+  public TaskLimits getTaskLimits()
+  {
+    return taskLimits;
+  }
+
   @Nullable
   @Override
   public ImmutableWorkerInfo findWorkerForTask(
@@ -60,7 +70,8 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
         zkWorkers,
         config,
         workerCategorySpec,
-        EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
+        EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers,
+        taskLimits
     );
   }
 
@@ -74,13 +85,14 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
       return false;
     }
     final EqualDistributionWithCategorySpecWorkerSelectStrategy that = 
(EqualDistributionWithCategorySpecWorkerSelectStrategy) o;
-    return Objects.equals(workerCategorySpec, that.workerCategorySpec);
+    return Objects.equals(workerCategorySpec, that.workerCategorySpec)
+           && Objects.equals(taskLimits, that.taskLimits);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(workerCategorySpec);
+    return Objects.hash(workerCategorySpec, taskLimits);
   }
 
   @Override
@@ -88,6 +100,7 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
   {
     return "EqualDistributionWithCategorySpecWorkerSelectStrategy{" +
            "workerCategorySpec=" + workerCategorySpec +
+           ", taskLimits=" + taskLimits +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
index afb020e9f7a..f3775d91e52 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.overlord.setup;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
 
+import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
@@ -35,13 +37,16 @@ import java.util.Objects;
 public class EqualDistributionWorkerSelectStrategy implements 
WorkerSelectStrategy
 {
   private final AffinityConfig affinityConfig;
+  private final TaskLimits taskLimits;
 
   @JsonCreator
   public EqualDistributionWorkerSelectStrategy(
-      @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+      @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+      @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
   )
   {
     this.affinityConfig = affinityConfig;
+    this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
   }
 
   @JsonProperty
@@ -50,6 +55,12 @@ public class EqualDistributionWorkerSelectStrategy 
implements WorkerSelectStrate
     return affinityConfig;
   }
 
+  @JsonProperty
+  public TaskLimits getTaskLimits()
+  {
+    return taskLimits;
+  }
+
   @Override
   public ImmutableWorkerInfo findWorkerForTask(
       final WorkerTaskRunnerConfig config,
@@ -62,7 +73,8 @@ public class EqualDistributionWorkerSelectStrategy implements 
WorkerSelectStrate
         zkWorkers,
         config,
         affinityConfig,
-        EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
+        EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers,
+        taskLimits
     );
   }
 
@@ -83,13 +95,14 @@ public class EqualDistributionWorkerSelectStrategy 
implements WorkerSelectStrate
       return false;
     }
     final EqualDistributionWorkerSelectStrategy that = 
(EqualDistributionWorkerSelectStrategy) o;
-    return Objects.equals(affinityConfig, that.affinityConfig);
+    return Objects.equals(affinityConfig, that.affinityConfig)
+           && Objects.equals(taskLimits, that.taskLimits);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(affinityConfig);
+    return Objects.hash(affinityConfig, taskLimits);
   }
 
   @Override
@@ -97,6 +110,7 @@ public class EqualDistributionWorkerSelectStrategy 
implements WorkerSelectStrate
   {
     return "EqualDistributionWorkerSelectStrategy{" +
            "affinityConfig=" + affinityConfig +
+           ", taskLimits=" + taskLimits +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
index 46c303f9048..efa24b75b5f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
@@ -29,9 +29,10 @@ public class FillCapacityWithAffinityWorkerSelectStrategy 
extends FillCapacityWo
 {
   @JsonCreator
   public FillCapacityWithAffinityWorkerSelectStrategy(
-      @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+      @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+      @JsonProperty("taskLimits") TaskLimits taskLimits
   )
   {
-    super(affinityConfig);
+    super(affinityConfig, taskLimits);
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
index 3dcdfe9a540..d50b19824cf 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.setup;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -32,13 +33,17 @@ import java.util.Objects;
 public class FillCapacityWithCategorySpecWorkerSelectStrategy implements 
WorkerSelectStrategy
 {
   private final WorkerCategorySpec workerCategorySpec;
+  private final TaskLimits taskLimits;
+
 
   @JsonCreator
   public FillCapacityWithCategorySpecWorkerSelectStrategy(
-      @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
+      @JsonProperty("workerCategorySpec") WorkerCategorySpec 
workerCategorySpec,
+      @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
   )
   {
     this.workerCategorySpec = workerCategorySpec;
+    this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
   }
 
   @JsonProperty
@@ -47,6 +52,12 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerS
     return workerCategorySpec;
   }
 
+  @JsonProperty
+  public TaskLimits getTaskLimits()
+  {
+    return taskLimits;
+  }
+
   @Nullable
   @Override
   public ImmutableWorkerInfo findWorkerForTask(
@@ -60,7 +71,8 @@ public class FillCapacityWithCategorySpecWorkerSelectStrategy 
implements WorkerS
         zkWorkers,
         config,
         workerCategorySpec,
-        FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
+        FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers,
+        taskLimits
     );
   }
 
@@ -74,13 +86,14 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerS
       return false;
     }
     final FillCapacityWithCategorySpecWorkerSelectStrategy that = 
(FillCapacityWithCategorySpecWorkerSelectStrategy) o;
-    return Objects.equals(workerCategorySpec, that.workerCategorySpec);
+    return Objects.equals(workerCategorySpec, that.workerCategorySpec)
+           && Objects.equals(taskLimits, that.taskLimits);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(workerCategorySpec);
+    return Objects.hash(workerCategorySpec, taskLimits);
   }
 
   @Override
@@ -88,6 +101,7 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerS
   {
     return "FillCapacityWithCategorySpecWorkerSelectStrategy{" +
            "workerCategorySpec=" + workerCategorySpec +
+           ", taskLimits=" + taskLimits +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
index 47ac65b8271..bd8b53bacec 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.overlord.setup;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
 
+import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
@@ -33,13 +35,16 @@ import java.util.Objects;
 public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
 {
   private final AffinityConfig affinityConfig;
+  private final TaskLimits taskLimits;
 
   @JsonCreator
   public FillCapacityWorkerSelectStrategy(
-      @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+      @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+      @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
   )
   {
     this.affinityConfig = affinityConfig;
+    this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
   }
 
   @JsonProperty
@@ -48,6 +53,12 @@ public class FillCapacityWorkerSelectStrategy implements 
WorkerSelectStrategy
     return affinityConfig;
   }
 
+  @JsonProperty
+  public TaskLimits getTaskLimits()
+  {
+    return taskLimits;
+  }
+
   @Override
   public ImmutableWorkerInfo findWorkerForTask(
       final WorkerTaskRunnerConfig config,
@@ -60,7 +71,8 @@ public class FillCapacityWorkerSelectStrategy implements 
WorkerSelectStrategy
         zkWorkers,
         config,
         affinityConfig,
-        FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
+        FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers,
+        taskLimits
     );
   }
 
@@ -81,13 +93,14 @@ public class FillCapacityWorkerSelectStrategy implements 
WorkerSelectStrategy
       return false;
     }
     final FillCapacityWorkerSelectStrategy that = 
(FillCapacityWorkerSelectStrategy) o;
-    return Objects.equals(affinityConfig, that.affinityConfig);
+    return Objects.equals(affinityConfig, that.affinityConfig)
+           && Objects.equals(taskLimits, that.taskLimits);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(affinityConfig);
+    return Objects.hash(affinityConfig, taskLimits);
   }
 
   @Override
@@ -95,6 +108,7 @@ public class FillCapacityWorkerSelectStrategy implements 
WorkerSelectStrategy
   {
     return "FillCapacityWorkerSelectStrategy{" +
            "affinityConfig=" + affinityConfig +
+           ", taskLimits=" + taskLimits +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
new file mode 100644
index 00000000000..39e6333c62f
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.setup;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.task.Task;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Defines global limits for task execution using absolute slot counts and 
proportional ratios.
+ *
+ * <p>Task count limits ({@code maxSlotCountByType}) define the maximum number 
of slots per task type.
+ * Task ratios ({@code maxSlotRatioByType}) define the proportion of total 
slots a task type can use.
+ * If both are set for a task type, the lower limit applies.</p>
+ *
+ * <p>Example:
+ * {@code maxSlotCountByType = {"index_parallel": 3, "query_controller": 5}}
+ * {@code maxSlotRatioByType = {"index_parallel": 0.5, "query_controller": 
0.25}}</p>
+ */
+public class TaskLimits
+{
+  public static final TaskLimits EMPTY = new TaskLimits();
+  private final Map<String, Integer> maxSlotCountByType;
+  private final Map<String, Double> maxSlotRatioByType;
+
+  private TaskLimits()
+  {
+    this(Map.of(), Map.of());
+  }
+
+  @JsonCreator
+  public TaskLimits(
+      @JsonProperty("maxSlotCountByType") @Nullable Map<String, Integer> 
maxSlotCountByType,
+      @JsonProperty("maxSlotRatioByType") @Nullable Map<String, Double> 
maxSlotRatioByType
+  )
+  {
+    validateLimits(maxSlotCountByType, maxSlotRatioByType);
+    this.maxSlotCountByType = Configs.valueOrDefault(maxSlotCountByType, 
Collections.emptyMap());
+    this.maxSlotRatioByType = Configs.valueOrDefault(maxSlotRatioByType, 
Collections.emptyMap());
+  }
+
+  /**
+   * Determines whether the given task can be executed based on task limits 
and available capacity.
+   *
+   * @param task             The task to check.
+   * @param currentSlotsUsed The current capacity used by tasks of the same 
type.
+   * @param totalCapacity    The total available capacity across all workers.
+   * @return {@code true} if the task meets the defined limits and capacity 
constraints; {@code false} otherwise.
+   */
+  public boolean canRunTask(Task task, Integer currentSlotsUsed, Integer 
totalCapacity)
+  {
+    if (maxSlotRatioByType.isEmpty() && maxSlotCountByType.isEmpty()) {
+      return true;
+    }
+    return meetsTaskLimit(
+        task,
+        currentSlotsUsed,
+        totalCapacity
+    );
+  }
+
+  private boolean meetsTaskLimit(
+      Task task,
+      Integer currentSlotsUsed,
+      Integer totalCapacity
+  )
+  {
+    final Integer limit = getLimitForTask(task.getType(), totalCapacity);
+
+    if (limit == null) {
+      return true; // No limit specified, so task can run
+    }
+
+    int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+
+    return hasCapacityBasedOnLimit(limit, currentSlotsUsed, requiredCapacity);
+  }
+
+  private Integer getLimitForTask(
+      String taskType,
+      Integer totalCapacity
+  )
+  {
+    Integer absoluteLimit = maxSlotCountByType.get(taskType);
+    Double ratioLimit = maxSlotRatioByType.get(taskType);
+
+    if (absoluteLimit == null && ratioLimit == null) {
+      return null;
+    }
+
+    if (ratioLimit != null) {
+      int ratioBasedLimit = calculateTaskCapacityFromRatio(ratioLimit, 
totalCapacity);
+      if (absoluteLimit != null) {
+        return Math.min(absoluteLimit, ratioBasedLimit);
+      } else {
+        return ratioBasedLimit;
+      }
+    } else {
+      return absoluteLimit;
+    }
+  }
+
+  private boolean hasCapacityBasedOnLimit(int limit, int currentCapacityUsed, 
int requiredCapacity)
+  {
+    return limit - currentCapacityUsed >= requiredCapacity;
+  }
+
+  private int calculateTaskCapacityFromRatio(double taskSlotRatio, int 
totalCapacity)
+  {
+    int workerParallelIndexCapacity = (int) Math.floor(taskSlotRatio * 
totalCapacity);
+    return Math.max(1, Math.min(workerParallelIndexCapacity, totalCapacity));
+  }
+
+  private void validateLimits(Map<String, Integer> taskCountLimits, 
Map<String, Double> taskRatios)
+  {
+    if (taskCountLimits != null && 
taskCountLimits.values().stream().anyMatch(val -> val < 0)) {
+      throw InvalidInput.exception(
+          "Max task slot count limit for any type must be greater than zero. 
Found[%s].",
+          taskCountLimits
+      );
+    } else if (taskRatios != null && taskRatios.values().stream().anyMatch(val 
-> val < 0 || val > 1)) {
+      throw InvalidInput.exception(
+          "Max task slot ratios should be in the interval of [0, 1]. 
Found[%s].",
+          taskCountLimits
+      );
+    }
+  }
+
+  @JsonProperty
+  public Map<String, Integer> getMaxSlotCountByType()
+  {
+    return maxSlotCountByType;
+  }
+
+  @JsonProperty
+  public Map<String, Double> getMaxSlotRatioByType()
+  {
+    return maxSlotRatioByType;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TaskLimits that = (TaskLimits) o;
+    return Objects.equals(maxSlotCountByType, that.maxSlotCountByType) && 
Objects.equals(
+        maxSlotRatioByType,
+        that.maxSlotRatioByType
+    );
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxSlotCountByType, maxSlotRatioByType);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskLimits{" +
+           "maxSlotCountByType=" + maxSlotCountByType +
+           ", maxSlotRatioByType=" + maxSlotRatioByType +
+           '}';
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
index ffc30f55e64..4b867a4bb10 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
@@ -40,7 +40,7 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
 public interface WorkerBehaviorConfig
 {
   String CONFIG_KEY = "worker.config";
-  WorkerSelectStrategy DEFAULT_STRATEGY = new 
EqualDistributionWorkerSelectStrategy(null);
+  WorkerSelectStrategy DEFAULT_STRATEGY = new 
EqualDistributionWorkerSelectStrategy(null, null);
 
   WorkerSelectStrategy getSelectStrategy();
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
index c3832daae32..be4e8426008 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
@@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -54,10 +55,11 @@ public class WorkerSelectUtils
       final Map<String, ImmutableWorkerInfo> allWorkers,
       final WorkerTaskRunnerConfig workerTaskRunnerConfig,
       @Nullable final AffinityConfig affinityConfig,
-      final Function<ImmutableMap<String, ImmutableWorkerInfo>, 
ImmutableWorkerInfo> workerSelector
+      final Function<ImmutableMap<String, ImmutableWorkerInfo>, 
ImmutableWorkerInfo> workerSelector,
+      final TaskLimits taskLimits
   )
   {
-    final Map<String, ImmutableWorkerInfo> runnableWorkers = 
getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig);
+    final Map<String, ImmutableWorkerInfo> runnableWorkers = 
getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig, taskLimits);
 
     if (affinityConfig == null) {
       // All runnable workers are valid.
@@ -105,10 +107,11 @@ public class WorkerSelectUtils
       final Map<String, ImmutableWorkerInfo> allWorkers,
       final WorkerTaskRunnerConfig workerTaskRunnerConfig,
       @Nullable final WorkerCategorySpec workerCategorySpec,
-      final Function<ImmutableMap<String, ImmutableWorkerInfo>, 
ImmutableWorkerInfo> workerSelector
+      final Function<ImmutableMap<String, ImmutableWorkerInfo>, 
ImmutableWorkerInfo> workerSelector,
+      final TaskLimits taskLimits
   )
   {
-    final Map<String, ImmutableWorkerInfo> runnableWorkers = 
getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig);
+    final Map<String, ImmutableWorkerInfo> runnableWorkers = 
getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig, taskLimits);
 
     // select worker according to worker category spec
     if (workerCategorySpec != null) {
@@ -145,9 +148,17 @@ public class WorkerSelectUtils
   private static Map<String, ImmutableWorkerInfo> getRunnableWorkers(
       final Task task,
       final Map<String, ImmutableWorkerInfo> allWorkers,
-      final WorkerTaskRunnerConfig workerTaskRunnerConfig
+      final WorkerTaskRunnerConfig workerTaskRunnerConfig,
+      final TaskLimits taskLimits
   )
   {
+    if (!taskLimits.canRunTask(
+        task,
+        getTotalCapacityUsedByType(allWorkers, task.getType()),
+        getTotalCapacity(allWorkers)
+    )) {
+      return Collections.emptyMap();
+    }
     return allWorkers.values()
                      .stream()
                      .filter(worker -> worker.canRunTask(task, 
workerTaskRunnerConfig.getParallelIndexTaskSlotRatio())
@@ -193,4 +204,20 @@ public class WorkerSelectUtils
         )
     );
   }
+
+  private static int getTotalCapacity(final Map<String, ImmutableWorkerInfo> 
allWorkers)
+  {
+    return allWorkers.values().stream().mapToInt(workerInfo -> 
workerInfo.getWorker().getCapacity()).sum();
+  }
+
+  private static int getTotalCapacityUsedByType(
+      final Map<String, ImmutableWorkerInfo> allWorkers,
+      final String taskType
+  )
+  {
+    return allWorkers.values()
+                     .stream()
+                     .mapToInt(workerInfo -> 
workerInfo.getCurrCapacityUsedByTaskType().getOrDefault(taskType, 0))
+                     .sum();
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
index e373eb982a3..122e9892dc8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -201,6 +203,7 @@ public class ImmutableWorkerInfoTest
         ),
         3,
         0,
+        Collections.emptyMap(),
         ImmutableSet.of("grp1", "grp2"),
         ImmutableSet.of("task1", "task2"),
         DateTimes.of("2015-01-01T01:01:01Z"),
@@ -211,6 +214,7 @@ public class ImmutableWorkerInfoTest
         ),
         2,
         0,
+        Collections.emptyMap(),
         ImmutableSet.of("grp1", "grp2"),
         ImmutableSet.of("task1", "task2"),
         DateTimes.of("2015-01-01T01:01:02Z"),
@@ -225,6 +229,7 @@ public class ImmutableWorkerInfoTest
         new Worker("http", "testWorker2", "192.0.0.1", 10, "v1", 
WorkerConfig.DEFAULT_CATEGORY),
         6,
         0,
+        Collections.emptyMap(),
         ImmutableSet.of("grp1", "grp2"),
         ImmutableSet.of("task1", "task2"),
         DateTimes.of("2015-01-01T01:01:02Z")
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 8207584f213..7bbd6ef5e37 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -632,7 +632,7 @@ public class RemoteTaskRunnerTest
         new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
         new TestProvisioningStrategy<>(),
         httpClient,
-        new DefaultWorkerBehaviorConfig(new 
EqualDistributionWorkerSelectStrategy(null), null)
+        new DefaultWorkerBehaviorConfig(new 
EqualDistributionWorkerSelectStrategy(null, null), null)
     );
     Assert.assertEquals(-1, 
remoteTaskRunner.getMaximumCapacityWithAutoscale());
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
index 8a1232b9571..eaba6f9e6f9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
@@ -92,7 +92,7 @@ public class PendingTaskBasedProvisioningStrategyTest
 
     workerConfig = new AtomicReference<>(
         new DefaultWorkerBehaviorConfig(
-            new FillCapacityWorkerSelectStrategy(null),
+            new FillCapacityWorkerSelectStrategy(null, null),
             autoScaler
         )
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 91b0778c950..3e5c862d7a6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1818,7 +1818,7 @@ public class HttpRemoteTaskRunnerTest
         TestHelper.makeJsonMapper(),
         new HttpRemoteTaskRunnerConfig(),
         EasyMock.createNiceMock(HttpClient.class),
-        DSuppliers.of(new AtomicReference<>(new 
DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), 
null))),
+        DSuppliers.of(new AtomicReference<>(new 
DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null, 
null), null))),
         new TestProvisioningStrategy<>(),
         druidNodeDiscoveryProvider,
         EasyMock.createMock(TaskStorage.class),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
index 0641d3a8d91..1e81be9cc97 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
@@ -32,7 +32,10 @@ import org.apache.druid.segment.TestHelper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 public class EqualDistributionWithAffinityWorkerSelectStrategyTest
 {
@@ -40,7 +43,8 @@ public class 
EqualDistributionWithAffinityWorkerSelectStrategyTest
   public void testFindWorkerForTask()
   {
     EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWithAffinityWorkerSelectStrategy(
-        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost1", "localhost2", "localhost3")), false)
+        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost1", "localhost2", "localhost3")), false),
+        null
     );
 
     NoopTask noopTask = NoopTask.forDatasource("foo");
@@ -85,7 +89,8 @@ public class 
EqualDistributionWithAffinityWorkerSelectStrategyTest
   public void testFindWorkerForTaskWithNulls()
   {
     EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWithAffinityWorkerSelectStrategy(
-            new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false)
+            new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false),
+            null
     );
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -115,7 +120,8 @@ public class 
EqualDistributionWithAffinityWorkerSelectStrategyTest
   public void testIsolation()
   {
     EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWithAffinityWorkerSelectStrategy(
-            new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false)
+            new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false),
+            null
     );
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -139,7 +145,8 @@ public class 
EqualDistributionWithAffinityWorkerSelectStrategyTest
   {
     final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
     final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWithAffinityWorkerSelectStrategy(
-        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false)
+        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false),
+        null
     );
     final WorkerSelectStrategy strategy2 = objectMapper.readValue(
         objectMapper.writeValueAsBytes(strategy),
@@ -147,4 +154,138 @@ public class 
EqualDistributionWithAffinityWorkerSelectStrategyTest
     );
     Assert.assertEquals(strategy, strategy2);
   }
+
+  @Test
+  public void testFindWorkerForTaskWithGlobalLimits()
+  {
+    Map<String, Integer> taskLimits = new HashMap<>();
+    taskLimits.put("noop", 2);
+
+    Map<String, Integer> capacityUsed = new HashMap<>();
+    capacityUsed.put("noop", 1);
+    EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWithAffinityWorkerSelectStrategy(
+        null,
+        new TaskLimits(taskLimits, null)
+    );
+
+    NoopTask noopTask = NoopTask.forDatasource("foo");
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+    Assert.assertNotNull(worker);
+
+    ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+
+    Assert.assertNull(worker1);
+
+  }
+
+  @Test
+  public void testFindWorkerForTaskWithGlobalRatios()
+  {
+    Map<String, Double> taskRatios = new HashMap<>();
+    taskRatios.put("noop", 0.5);
+
+    Map<String, Integer> capacityUsed = new HashMap<>();
+    capacityUsed.put("noop", 1);
+    EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWithAffinityWorkerSelectStrategy(
+        null,
+        new TaskLimits(null, taskRatios)
+    );
+
+    NoopTask noopTask = NoopTask.forDatasource("foo");
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+    Assert.assertNotNull(worker);
+
+    ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                Set.of(),
+                Set.of(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+
+    Assert.assertNull(worker1);
+
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
index 99a690a738d..6ac04a8920f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
@@ -176,8 +176,8 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
 
   private ImmutableWorkerInfo selectWorker(WorkerCategorySpec 
workerCategorySpec)
   {
-    final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new 
EqualDistributionWithCategorySpecWorkerSelectStrategy(
-        workerCategorySpec);
+    final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+        new 
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
         new RemoteTaskRunnerConfig(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
index dac25a3c9e3..58280641645 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
@@ -69,7 +69,7 @@ public class EqualDistributionWorkerSelectStrategyTest
   @Test
   public void testFindWorkerForTask()
   {
-    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null);
+    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null, null);
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
         new RemoteTaskRunnerConfig(),
@@ -97,7 +97,7 @@ public class EqualDistributionWorkerSelectStrategyTest
   @Test
   public void testFindWorkerForTaskWhenSameCurrCapacityUsed()
   {
-    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null);
+    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null, null);
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
         new RemoteTaskRunnerConfig(),
@@ -126,7 +126,7 @@ public class EqualDistributionWorkerSelectStrategyTest
   public void testOneDisableWorkerDifferentUsedCapacity()
   {
     String disabledVersion = "";
-    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null);
+    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null, null);
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
         new RemoteTaskRunnerConfig(),
@@ -155,7 +155,7 @@ public class EqualDistributionWorkerSelectStrategyTest
   public void testOneDisableWorkerSameUsedCapacity()
   {
     String disabledVersion = "";
-    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null);
+    final EqualDistributionWorkerSelectStrategy strategy = new 
EqualDistributionWorkerSelectStrategy(null, null);
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
         new RemoteTaskRunnerConfig(),
@@ -190,7 +190,8 @@ public class EqualDistributionWorkerSelectStrategyTest
                 "bar", ImmutableSet.of("nonexistent-worker")
             ),
             false
-        )
+        ),
+        null
     );
 
     ImmutableWorkerInfo workerFoo = strategy.findWorkerForTask(
@@ -226,7 +227,8 @@ public class EqualDistributionWorkerSelectStrategyTest
                 "bar", ImmutableSet.of("nonexistent-worker")
             ),
             true
-        )
+        ),
+        null
     );
 
     ImmutableWorkerInfo workerFoo = strategy.findWorkerForTask(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
index 93f6c39b0f0..207c6e43dda 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
@@ -30,7 +30,9 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 
 public class FillCapacityWithAffinityWorkerSelectStrategyTest
 {
@@ -38,7 +40,8 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
   public void testFindWorkerForTask()
   {
     FillCapacityWorkerSelectStrategy strategy = new 
FillCapacityWithAffinityWorkerSelectStrategy(
-        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false)
+        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false),
+        null
     );
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -68,7 +71,8 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
   public void testFindWorkerForTaskWithNulls()
   {
     FillCapacityWorkerSelectStrategy strategy = new 
FillCapacityWithAffinityWorkerSelectStrategy(
-        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false)
+        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false),
+        null
     );
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -98,7 +102,8 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
   public void testIsolation()
   {
     FillCapacityWorkerSelectStrategy strategy = new 
FillCapacityWithAffinityWorkerSelectStrategy(
-        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false)
+        new AffinityConfig(ImmutableMap.of("foo", 
ImmutableSet.of("localhost")), false),
+        null
     );
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -116,4 +121,138 @@ public class 
FillCapacityWithAffinityWorkerSelectStrategyTest
     );
     Assert.assertNull(worker);
   }
+
+  @Test
+  public void testFindWorkerForTaskWithGlobalLimits()
+  {
+    Map<String, Integer> taskLimits = new HashMap<>();
+    taskLimits.put("noop", 2);
+
+    Map<String, Integer> capacityUsed = new HashMap<>();
+    capacityUsed.put("noop", 1);
+    FillCapacityWorkerSelectStrategy strategy = new 
FillCapacityWorkerSelectStrategy(
+        null,
+        new TaskLimits(taskLimits, null)
+    );
+
+    NoopTask noopTask = NoopTask.forDatasource("foo");
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+    Assert.assertNotNull(worker);
+
+    ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+
+    Assert.assertNull(worker1);
+
+  }
+
+  @Test
+  public void testFindWorkerForTaskWithGlobalRatios()
+  {
+    Map<String, Double> taskRatios = new HashMap<>();
+    taskRatios.put("noop", 0.5);
+
+    Map<String, Integer> capacityUsed = new HashMap<>();
+    capacityUsed.put("noop", 1);
+    FillCapacityWorkerSelectStrategy strategy = new 
FillCapacityWorkerSelectStrategy(
+        null,
+        new TaskLimits(null, taskRatios)
+    );
+
+    NoopTask noopTask = NoopTask.forDatasource("foo");
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+    Assert.assertNotNull(worker);
+
+    ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        ImmutableMap.of(
+            "localhost0",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost0", "localhost0", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            ),
+            "localhost1",
+            new ImmutableWorkerInfo(
+                new Worker("http", "localhost1", "localhost1", 2, "v1", 
WorkerConfig.DEFAULT_CATEGORY), 0,
+                0,
+                capacityUsed,
+                new HashSet<>(),
+                new HashSet<>(),
+                DateTimes.nowUtc()
+            )
+        ),
+        noopTask
+    );
+
+    Assert.assertNull(worker1);
+
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
index bb4795d029e..880ef743dca 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
@@ -176,8 +176,8 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
 
   private ImmutableWorkerInfo selectWorker(WorkerCategorySpec 
workerCategorySpec)
   {
-    final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = new 
FillCapacityWithCategorySpecWorkerSelectStrategy(
-        workerCategorySpec);
+    final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+        new 
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
 
     ImmutableWorkerInfo worker = strategy.findWorkerForTask(
         new RemoteTaskRunnerConfig(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to